/*-------------------------------------------------------------------------
 *
 * rewriteheap.c
 *	  Support functions to rewrite tables.
 *
 * These functions provide a facility to completely rewrite a heap, while
 * preserving visibility information and update chains.
 *
 * INTERFACE
 *
 * The caller is responsible for creating the new heap, all catalog
 * changes, supplying the tuples to be written to the new heap, and
 * rebuilding indexes.  The caller must hold AccessExclusiveLock on the
 * target table, because we assume no one else is writing into it.
 *
 * To use the facility:
 *
 * begin_heap_rewrite
 * while (fetch next tuple)
 * {
 *	   if (tuple is dead)
 *		   rewrite_heap_dead_tuple
 *	   else
 *	   {
 *		   // do any transformations here if required
 *		   rewrite_heap_tuple
 *	   }
 * }
 * end_heap_rewrite
 *
 * The contents of the new relation shouldn't be relied on until after
 * end_heap_rewrite is called.
 *
 *
 * IMPLEMENTATION
 *
 * This would be a fairly trivial affair, except that we need to maintain
 * the ctid chains that link versions of an updated tuple together.
 * Since the newly stored tuples will have tids different from the original
 * ones, if we just copied t_ctid fields to the new table the links would
 * be wrong.  When we are required to copy a (presumably recently-dead or
 * delete-in-progress) tuple whose ctid doesn't point to itself, we have
 * to substitute the correct ctid instead.
 *
 * For each ctid reference from A -> B, we might encounter either A first
 * or B first.  (Note that a tuple in the middle of a chain is both A and B
 * of different pairs.)
 *
 * If we encounter A first, we'll store the tuple in the unresolved_tups
 * hash table.  When we later encounter B, we remove A from the hash table,
 * fix the ctid to point to the new location of B, and insert both A and B
 * to the new heap.
 *
 * If we encounter B first, we can insert B to the new heap right away.
 * We then add an entry to the old_new_tid_map hash table showing B's
 * original tid (in the old heap) and new tid (in the new heap).
 * When we later encounter A, we get the new location of B from the table,
 * and can write A immediately with the correct ctid.
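 *
 * For illustration, with hypothetical TIDs: suppose version A at old TID
 * (5,1) was updated into version B at old TID (7,2), and B's copy lands at
 * new TID (2,4).  If we scan A first, A waits in unresolved_tups under the
 * key (B's xmin, (7,2)); when B is later inserted, we set A's t_ctid to
 * (2,4) and write A out.  If we scan B first, old_new_tid_map records
 * (7,2) -> (2,4), so A can be written immediately once we encounter it.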
 *
 * Entries in the hash tables can be removed as soon as the later tuple
 * is encountered.  That helps to keep the memory usage down.  At the end,
 * both tables are usually empty; we should have encountered both A and B
 * of each pair.  However, it's possible for A to be RECENTLY_DEAD and B
 * entirely DEAD according to HeapTupleSatisfiesVacuum, because the test
 * for deadness using OldestXmin is not exact.  In such a case we might
 * encounter B first, and skip it, and find A later.  Then A would be added
 * to unresolved_tups, and stay there until end of the rewrite.  Since
 * this case is very unusual, we don't worry about the memory usage.
 *
 * Using in-memory hash tables means that we use some memory for each live
 * update chain in the table, from the time we find one end of the
 * reference until we find the other end.  That shouldn't be a problem in
 * practice, but if you do something like an UPDATE without a where-clause
 * on a large table, and then run CLUSTER in the same transaction, you
 * could run out of memory.  It doesn't seem worthwhile to add support for
 * spill-to-disk, as there shouldn't be that many RECENTLY_DEAD tuples in a
 * table under normal circumstances.  Furthermore, in the typical scenario
 * of CLUSTERing on an unchanging key column, we'll see all the versions
 * of a given tuple together anyway, and so the peak memory usage is only
 * proportional to the number of RECENTLY_DEAD versions of a single row, not
 * to the number in the whole table.  Note that if we do fail halfway through
 * a CLUSTER, the old table is still valid, so failure is not catastrophic.
 *
 * We can't use the normal heap_insert function to insert into the new
 * heap, because heap_insert overwrites the visibility information.
 * We use a special-purpose raw_heap_insert function instead, which
 * is optimized for bulk inserting a lot of tuples, knowing that we have
 * exclusive access to the heap.  raw_heap_insert builds new pages in
 * local storage.  When a page is full, or at the end of the process,
 * we insert it to WAL as a single record and then write it to disk
 * directly through smgr.  Note, however, that any data sent to the new
 * heap's TOAST table will go through the normal bufmgr.
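 *
 * (Concretely, in the code below each filled page is WAL-logged as a
 * full-page image via log_newpage() and written out with smgrextend();
 * see raw_heap_insert() and end_heap_rewrite().)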
 *
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994-5, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/heap/rewriteheap.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>

#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "common/file_utils.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"

/*
 * State associated with a rewrite operation.  This is opaque to the user
 * of the rewrite facility.
 */
typedef struct RewriteStateData
{
	Relation	rs_old_rel;		/* source heap */
	Relation	rs_new_rel;		/* destination heap */
	Page		rs_buffer;		/* page currently being built */
	BlockNumber rs_blockno;		/* block where page will go */
	bool		rs_buffer_valid;	/* T if any tuples in buffer */
	bool		rs_logical_rewrite; /* do we need to do logical rewriting */
	TransactionId rs_oldest_xmin;	/* oldest xmin used by caller to determine
									 * tuple visibility */
	TransactionId rs_freeze_xid;	/* Xid that will be used as freeze cutoff
									 * point */
	TransactionId rs_logical_xmin;	/* Xid that will be used as cutoff point
									 * for logical rewrites */
	MultiXactId rs_cutoff_multi;	/* MultiXactId that will be used as cutoff
									 * point for multixacts */
	MemoryContext rs_cxt;		/* for hash tables and entries and tuples in
								 * them */
	XLogRecPtr	rs_begin_lsn;	/* XLogInsertLsn when starting the rewrite */
	HTAB	   *rs_unresolved_tups; /* unmatched A tuples */
	HTAB	   *rs_old_new_tid_map; /* unmatched B tuples */
	HTAB	   *rs_logical_mappings;	/* logical remapping files */
	uint32		rs_num_rewrite_mappings;	/* # in memory mappings */
} RewriteStateData;

/*
 * The lookup keys for the hash tables are tuple TID and xmin (we must check
 * both to avoid false matches from dead tuples).  Beware that there is
 * probably some padding space in this struct; it must be zeroed out for
 * correct hashtable operation.
 */
typedef struct
{
	TransactionId xmin;			/* tuple xmin */
	ItemPointerData tid;		/* tuple location in old heap */
} TidHashKey;
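
/*
 * A minimal sketch of the keying convention: because of the padding noted
 * above, every lookup and insertion below zeroes the key before setting
 * its fields, e.g.
 *
 *		TidHashKey	hashkey;
 *
 *		memset(&hashkey, 0, sizeof(hashkey));
 *		hashkey.xmin = HeapTupleHeaderGetXmin(tuple->t_data);
 *		hashkey.tid = tuple->t_self;
 */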

/*
 * Entry structures for the hash tables
 */
typedef struct
{
	TidHashKey	key;			/* expected xmin/old location of B tuple */
	ItemPointerData old_tid;	/* A's location in the old heap */
	HeapTuple	tuple;			/* A's tuple contents */
} UnresolvedTupData;

typedef UnresolvedTupData *UnresolvedTup;

typedef struct
{
	TidHashKey	key;			/* actual xmin/old location of B tuple */
	ItemPointerData new_tid;	/* where we put it in the new heap */
} OldToNewMappingData;

typedef OldToNewMappingData *OldToNewMapping;

/*
 * In-Memory data for an xid that might need logical remapping entries
 * to be logged.
 */
typedef struct RewriteMappingFile
{
	TransactionId xid;			/* xid that might need to see the row */
	int			vfd;			/* fd of mappings file */
	off_t		off;			/* how far we have written so far */
	dclist_head mappings;		/* list of in-memory mappings */
	char		path[MAXPGPATH];	/* path, for error messages */
} RewriteMappingFile;

/*
 * A single In-Memory logical rewrite mapping, hanging off
 * RewriteMappingFile->mappings.
 */
typedef struct RewriteMappingDataEntry
{
	LogicalRewriteMappingData map;	/* map between old and new location of the
									 * tuple */
	dlist_node	node;
} RewriteMappingDataEntry;


/* prototypes for internal functions */
static void raw_heap_insert(RewriteState state, HeapTuple tup);

/* internal logical remapping prototypes */
static void logical_begin_heap_rewrite(RewriteState state);
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple);
static void logical_end_heap_rewrite(RewriteState state);


/*
 * Begin a rewrite of a table
 *
 * old_heap		old, locked heap relation tuples will be read from
 * new_heap		new, locked heap relation to insert tuples to
 * oldest_xmin	xid used by the caller to determine which tuples are dead
 * freeze_xid	xid before which tuples will be frozen
 * cutoff_multi	multixact before which multis will be removed
 *
 * Returns an opaque RewriteState, allocated in current memory context,
 * to be used in subsequent calls to the other functions.
 */
RewriteState
begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
				   TransactionId freeze_xid, MultiXactId cutoff_multi)
{
	RewriteState state;
	MemoryContext rw_cxt;
	MemoryContext old_cxt;
	HASHCTL		hash_ctl;

	/*
	 * To ease cleanup, make a separate context that will contain the
	 * RewriteState struct itself plus all subsidiary data.
	 */
	rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
								   "Table rewrite",
								   ALLOCSET_DEFAULT_SIZES);
	old_cxt = MemoryContextSwitchTo(rw_cxt);

	/* Create and fill in the state struct */
	state = palloc0(sizeof(RewriteStateData));

	state->rs_old_rel = old_heap;
	state->rs_new_rel = new_heap;
	state->rs_buffer = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
	/* new_heap needn't be empty, just locked */
	state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
	state->rs_buffer_valid = false;
	state->rs_oldest_xmin = oldest_xmin;
	state->rs_freeze_xid = freeze_xid;
	state->rs_cutoff_multi = cutoff_multi;
	state->rs_cxt = rw_cxt;

	/* Initialize hash tables used to track update chains */
	hash_ctl.keysize = sizeof(TidHashKey);
	hash_ctl.entrysize = sizeof(UnresolvedTupData);
	hash_ctl.hcxt = state->rs_cxt;

	state->rs_unresolved_tups =
		hash_create("Rewrite / Unresolved ctids",
					128,		/* arbitrary initial size */
					&hash_ctl,
					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	hash_ctl.entrysize = sizeof(OldToNewMappingData);

	state->rs_old_new_tid_map =
		hash_create("Rewrite / Old to new tid map",
					128,		/* arbitrary initial size */
					&hash_ctl,
					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	MemoryContextSwitchTo(old_cxt);

	logical_begin_heap_rewrite(state);

	return state;
}

/*
 * End a rewrite.
 *
 * state and any other resources are freed.
 */
void
end_heap_rewrite(RewriteState state)
{
	HASH_SEQ_STATUS seq_status;
	UnresolvedTup unresolved;

	/*
	 * Write any remaining tuples in the UnresolvedTups table. If we have any
	 * left, they should in fact be dead, but let's err on the safe side.
	 */
	hash_seq_init(&seq_status, state->rs_unresolved_tups);

	while ((unresolved = hash_seq_search(&seq_status)) != NULL)
	{
		ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
		raw_heap_insert(state, unresolved->tuple);
	}

	/* Write the last page, if any */
	if (state->rs_buffer_valid)
	{
		if (RelationNeedsWAL(state->rs_new_rel))
			log_newpage(&state->rs_new_rel->rd_locator,
						MAIN_FORKNUM,
						state->rs_blockno,
						state->rs_buffer,
						true);

		PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);

		smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
				   state->rs_blockno, state->rs_buffer, true);
	}

	/*
	 * When we WAL-logged rel pages, we must nonetheless fsync them. The
	 * reason is the same as in storage.c's RelationCopyStorage(): we're
	 * writing data that's not in shared buffers, and so a CHECKPOINT
	 * occurring during the rewriteheap operation won't have fsync'd data we
	 * wrote before the checkpoint.
	 */
	if (RelationNeedsWAL(state->rs_new_rel))
		smgrimmedsync(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);

	logical_end_heap_rewrite(state);

	/* Deleting the context frees everything */
	MemoryContextDelete(state->rs_cxt);
}

/*
 * Add a tuple to the new heap.
 *
 * Visibility information is copied from the original tuple, except that
 * we "freeze" very-old tuples.  Note that since we scribble on new_tuple,
 * it had better be temp storage, not a pointer to the original tuple.
 *
 * state		opaque state as returned by begin_heap_rewrite
 * old_tuple	original tuple in the old heap
 * new_tuple	new, rewritten tuple to be inserted to new heap
 */
void
rewrite_heap_tuple(RewriteState state,
				   HeapTuple old_tuple, HeapTuple new_tuple)
{
	MemoryContext old_cxt;
	ItemPointerData old_tid;
	TidHashKey	hashkey;
	bool		found;
	bool		free_new;

	old_cxt = MemoryContextSwitchTo(state->rs_cxt);

	/*
	 * Copy the original tuple's visibility information into new_tuple.
	 *
	 * XXX we might later need to copy some t_infomask2 bits, too? Right now,
	 * we intentionally clear the HOT status bits.
	 */
	memcpy(&new_tuple->t_data->t_choice.t_heap,
		   &old_tuple->t_data->t_choice.t_heap,
		   sizeof(HeapTupleFields));

	new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
	new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
	new_tuple->t_data->t_infomask |=
		old_tuple->t_data->t_infomask & HEAP_XACT_MASK;

	/*
	 * While we have our hands on the tuple, we may as well freeze any
	 * eligible xmin or xmax, so that future VACUUM effort can be saved.
	 */
	heap_freeze_tuple(new_tuple->t_data,
					  state->rs_old_rel->rd_rel->relfrozenxid,
					  state->rs_old_rel->rd_rel->relminmxid,
					  state->rs_freeze_xid,
					  state->rs_cutoff_multi);

	/*
	 * Invalid ctid means that ctid should point to the tuple itself. We'll
	 * override it later if the tuple is part of an update chain.
	 */
	ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);

	/*
	 * If the tuple has been updated, check the old-to-new mapping hash table.
	 */
	if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
		  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
		!HeapTupleHeaderIndicatesMovedPartitions(old_tuple->t_data) &&
		!(ItemPointerEquals(&(old_tuple->t_self),
							&(old_tuple->t_data->t_ctid))))
	{
		OldToNewMapping mapping;

		memset(&hashkey, 0, sizeof(hashkey));
		hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
		hashkey.tid = old_tuple->t_data->t_ctid;

		mapping = (OldToNewMapping)
			hash_search(state->rs_old_new_tid_map, &hashkey,
						HASH_FIND, NULL);

		if (mapping != NULL)
		{
			/*
			 * We've already copied the tuple that t_ctid points to, so we can
			 * set the ctid of this tuple to point to the new location, and
			 * insert it right away.
			 */
			new_tuple->t_data->t_ctid = mapping->new_tid;

			/* We don't need the mapping entry anymore */
			hash_search(state->rs_old_new_tid_map, &hashkey,
						HASH_REMOVE, &found);
			Assert(found);
		}
		else
		{
			/*
			 * We haven't seen the tuple t_ctid points to yet. Stash this
			 * tuple into unresolved_tups to be written later.
			 */
			UnresolvedTup unresolved;

			unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
									 HASH_ENTER, &found);
			Assert(!found);

			unresolved->old_tid = old_tuple->t_self;
			unresolved->tuple = heap_copytuple(new_tuple);

			/*
			 * We can't do anything more now, since we don't know where the
			 * tuple will be written.
			 */
			MemoryContextSwitchTo(old_cxt);
			return;
		}
	}

	/*
	 * Now we will write the tuple, and then check to see if it is the B tuple
	 * in any new or known pair.  When we resolve a known pair, we will be
	 * able to write that pair's A tuple, and then we have to check if it
	 * resolves some other pair.  Hence, we need a loop here.
	 */
	old_tid = old_tuple->t_self;
	free_new = false;

	for (;;)
	{
		ItemPointerData new_tid;

		/* Insert the tuple and find out where it's put in new_heap */
		raw_heap_insert(state, new_tuple);
		new_tid = new_tuple->t_self;

		logical_rewrite_heap_tuple(state, old_tid, new_tuple);

		/*
		 * If the tuple is the updated version of a row, and the prior version
		 * wouldn't be DEAD yet, then we need to either resolve the prior
		 * version (if it's waiting in rs_unresolved_tups), or make an entry
		 * in rs_old_new_tid_map (so we can resolve it when we do see it). The
		 * previous tuple's xmax would equal this one's xmin, so it's
		 * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
		 */
		if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
			!TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
								   state->rs_oldest_xmin))
		{
			/*
			 * Okay, this is B in an update pair.  See if we've seen A.
			 */
			UnresolvedTup unresolved;

			memset(&hashkey, 0, sizeof(hashkey));
			hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
			hashkey.tid = old_tid;

			unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
									 HASH_FIND, NULL);

			if (unresolved != NULL)
			{
				/*
				 * We have seen and memorized the previous tuple already. Now
				 * that we know where we inserted the tuple its t_ctid points
				 * to, fix its t_ctid and insert it to the new heap.
				 */
				if (free_new)
					heap_freetuple(new_tuple);
				new_tuple = unresolved->tuple;
				free_new = true;
				old_tid = unresolved->old_tid;
				new_tuple->t_data->t_ctid = new_tid;

				/*
				 * We don't need the hash entry anymore, but don't free its
				 * tuple just yet.
				 */
				hash_search(state->rs_unresolved_tups, &hashkey,
							HASH_REMOVE, &found);
				Assert(found);

				/* loop back to insert the previous tuple in the chain */
				continue;
			}
			else
			{
				/*
				 * Remember the new tid of this tuple. We'll use it to set the
				 * ctid when we find the previous tuple in the chain.
				 */
				OldToNewMapping mapping;

				mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
									  HASH_ENTER, &found);
				Assert(!found);

				mapping->new_tid = new_tid;
			}
		}

		/* Done with this (chain of) tuples, for now */
		if (free_new)
			heap_freetuple(new_tuple);
		break;
	}

	MemoryContextSwitchTo(old_cxt);
}

/*
 * Register a dead tuple with an ongoing rewrite. Dead tuples are not
 * copied to the new table, but we still make note of them so that we
 * can release some resources earlier.
 *
 * Returns true if a tuple was removed from the unresolved_tups table.
 * This indicates that that tuple, previously thought to be "recently dead",
 * is now known to be really dead and won't be written to the output.
 */
bool
rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
{
	/*
	 * If we have already seen an earlier tuple in the update chain that
	 * points to this tuple, let's forget about that earlier tuple.  It's in
	 * fact dead as well; our simple xmax < OldestXmin test in
	 * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
	 * when xmin of a tuple is greater than xmax, which sounds
	 * counter-intuitive but is perfectly valid.
	 *
	 * We don't bother to try to detect the situation the other way round,
	 * when we encounter the dead tuple first and then the recently dead one
	 * that points to it. If that happens, we'll have some unmatched entries
	 * in the UnresolvedTups hash table at the end. That can happen anyway,
	 * because a vacuum might have removed the dead tuple in the chain before
	 * us.
	 */
	UnresolvedTup unresolved;
	TidHashKey	hashkey;
	bool		found;

	memset(&hashkey, 0, sizeof(hashkey));
	hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
	hashkey.tid = old_tuple->t_self;

	unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
							 HASH_FIND, NULL);

	if (unresolved != NULL)
	{
		/* Need to free the contained tuple as well as the hashtable entry */
		heap_freetuple(unresolved->tuple);
		hash_search(state->rs_unresolved_tups, &hashkey,
					HASH_REMOVE, &found);
		Assert(found);
		return true;
	}

	return false;
}

/*
 * Insert a tuple to the new relation. This has to track heap_insert
 * and its subsidiary functions!
 *
 * t_self of the tuple is set to the new TID of the tuple. If t_ctid of the
 * tuple is invalid on entry, it's replaced with the new TID as well (in
 * the inserted data only, not in the caller's copy).
 */
static void
raw_heap_insert(RewriteState state, HeapTuple tup)
{
	Page		page = state->rs_buffer;
	Size		pageFreeSpace,
				saveFreeSpace;
	Size		len;
	OffsetNumber newoff;
	HeapTuple	heaptup;

	/*
	 * If the new tuple is too big for storage or contains already toasted
	 * out-of-line attributes from some other relation, invoke the toaster.
	 *
	 * Note: below this point, heaptup is the data we actually intend to store
	 * into the relation; tup is the caller's original untoasted data.
	 */
	if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
	{
		/* toast table entries should never be recursively toasted */
		Assert(!HeapTupleHasExternal(tup));
		heaptup = tup;
	}
	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
	{
		int			options = HEAP_INSERT_SKIP_FSM;

		/*
		 * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
		 * for the TOAST table are not logically decoded. The main heap is
		 * WAL-logged as XLOG FPI records, which are not logically decoded.
		 */
		options |= HEAP_INSERT_NO_LOGICAL;

		heaptup = heap_toast_insert_or_update(state->rs_new_rel, tup, NULL,
											  options);
	}
	else
		heaptup = tup;

	len = MAXALIGN(heaptup->t_len); /* be conservative */

	/*
	 * If we're gonna fail for oversize tuple, do it right away
	 */
	if (len > MaxHeapTupleSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("row is too big: size %zu, maximum size %zu",
						len, MaxHeapTupleSize)));

	/* Compute desired extra freespace due to fillfactor option */
	saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
												   HEAP_DEFAULT_FILLFACTOR);

	/* Now we can check to see if there's enough free space already. */
	if (state->rs_buffer_valid)
	{
		pageFreeSpace = PageGetHeapFreeSpace(page);

		if (len + saveFreeSpace > pageFreeSpace)
		{
			/*
			 * Doesn't fit, so write out the existing page. It always
			 * contains a tuple. Hence, unlike RelationGetBufferForTuple(),
			 * enforce saveFreeSpace unconditionally.
			 */

			/* XLOG stuff */
			if (RelationNeedsWAL(state->rs_new_rel))
				log_newpage(&state->rs_new_rel->rd_locator,
							MAIN_FORKNUM,
							state->rs_blockno,
							page,
							true);

			/*
			 * Now write the page. We say skipFsync = true because there's no
			 * need for smgr to schedule an fsync for this write; we'll do it
			 * ourselves in end_heap_rewrite.
			 */
			PageSetChecksumInplace(page, state->rs_blockno);

			smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
					   state->rs_blockno, page, true);

			state->rs_blockno++;
			state->rs_buffer_valid = false;
		}
	}

	if (!state->rs_buffer_valid)
	{
		/* Initialize a new empty page */
		PageInit(page, BLCKSZ, 0);
		state->rs_buffer_valid = true;
	}

	/* And now we can insert the tuple into the page */
	newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
						 InvalidOffsetNumber, false, true);
	if (newoff == InvalidOffsetNumber)
		elog(ERROR, "failed to add tuple");

	/* Update caller's t_self to the actual position where it was stored */
	ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);

	/*
	 * Insert the correct position into CTID of the stored tuple, too, if the
	 * caller didn't supply a valid CTID.
	 */
	if (!ItemPointerIsValid(&tup->t_data->t_ctid))
	{
		ItemId		newitemid;
		HeapTupleHeader onpage_tup;

		newitemid = PageGetItemId(page, newoff);
		onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);

		onpage_tup->t_ctid = tup->t_self;
	}

	/* If heaptup is a private copy, release it. */
	if (heaptup != tup)
		heap_freetuple(heaptup);
}

/* ------------------------------------------------------------------------
 * Logical rewrite support
 *
 * When doing logical decoding - which relies on using cmin/cmax of catalog
 * tuples, via xl_heap_new_cid records - heap rewrites have to log enough
 * information to allow the decoding backend to update its internal mapping
 * of (relfilelocator,ctid) => (cmin, cmax) to be correct for the rewritten heap.
 *
 * For that, every time we find a tuple that's been modified in a catalog
 * relation within the xmin horizon of any decoding slot, we log a mapping
 * from the old to the new location.
 *
 * To deal with rewrites that abort, the filename of a mapping file contains
 * the xid of the transaction performing the rewrite, which can then be
 * checked before being read in.
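 *
 * (As a hypothetical example of that naming scheme: LOGICAL_REWRITE_FORMAT
 * is "map-%x-%x-%X_%X-%x-%x", so a file might be called
 * "map-4000-4eb2-0_12345678-5ba5-5ba6": database oid, relation oid, the
 * rewrite's start LSN, the mapped xid, and the xid of the transaction
 * performing the rewrite; see logical_rewrite_log_mapping() below.)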
 *
 * For efficiency we don't immediately spill every single mapping for a row
 * to disk but only do so in batches when we've collected several of them in
 * memory or when end_heap_rewrite() has been called.
 *
 * Crash-Safety: This module diverts from the usual patterns of doing WAL
 * since it cannot rely on checkpoint flushing out all buffers and thus
 * waiting for exclusive locks on buffers. Usually the XLogInsert() covering
 * buffer modifications is performed while the buffer(s) that are being
 * modified are exclusively locked guaranteeing that both the WAL record and
 * the modified heap are on either side of the checkpoint. But since the
 * mapping files we log aren't in shared_buffers that interlock doesn't work.
 *
 * Instead we simply write the mapping files out to disk, *before* the
 * XLogInsert() is performed. That guarantees that either the XLogInsert() is
 * inserted after the checkpoint's redo pointer or that the checkpoint (via
 * CheckPointLogicalRewriteHeap()) has flushed the (partial) mapping file to
 * disk. That leaves the tail end that has not yet been flushed open to
 * corruption, which is solved by including the current offset in the
 * xl_heap_rewrite_mapping records and truncating the mapping file to it
 * during replay. Every time a rewrite is finished all generated mapping files
 * are synced to disk.
 *
 * Note that if we were only concerned about crash safety we wouldn't have to
 * deal with WAL logging at all - an fsync() at the end of a rewrite would be
 * sufficient for crash safety. Any mapping that hasn't been safely flushed to
 * disk must belong to an aborted (explicitly or via a crash) transaction and is
 * ignored by virtue of the xid in its name being subject to a
 * TransactionIdDidCommit() check. But we want to support having standbys via
 * physical replication, both for availability and to do logical decoding
 * there.
 * ------------------------------------------------------------------------
 */

/*
 * Do preparations for logging logical mappings during a rewrite if
 * necessary. If we detect that we don't need to log anything we'll prevent
 * any further action by the various logical rewrite functions.
 */
static void
logical_begin_heap_rewrite(RewriteState state)
{
	HASHCTL		hash_ctl;
	TransactionId logical_xmin;

	/*
	 * We only need to persist these mappings if the rewritten table can be
	 * accessed during logical decoding; if not, we can skip doing any
	 * additional work.
	 */
	state->rs_logical_rewrite =
		RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);

	if (!state->rs_logical_rewrite)
		return;

	ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);

	/*
	 * If there are no logical slots in progress, we don't need to do anything;
	 * there cannot be any remappings for relevant rows yet.  The relation's
	 * lock protects us against races.
	 */
	if (logical_xmin == InvalidTransactionId)
	{
		state->rs_logical_rewrite = false;
		return;
	}

	state->rs_logical_xmin = logical_xmin;
	state->rs_begin_lsn = GetXLogInsertRecPtr();
	state->rs_num_rewrite_mappings = 0;

	hash_ctl.keysize = sizeof(TransactionId);
	hash_ctl.entrysize = sizeof(RewriteMappingFile);
	hash_ctl.hcxt = state->rs_cxt;

	state->rs_logical_mappings =
		hash_create("Logical rewrite mapping",
					128,		/* arbitrary initial size */
					&hash_ctl,
					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}

/*
 * Flush all logical in-memory mappings to disk, but don't fsync them yet.
 */
static void
logical_heap_rewrite_flush_mappings(RewriteState state)
{
	HASH_SEQ_STATUS seq_status;
	RewriteMappingFile *src;
	dlist_mutable_iter iter;

	Assert(state->rs_logical_rewrite);

	/* no logical rewrite in progress, no need to iterate over mappings */
	if (state->rs_num_rewrite_mappings == 0)
		return;

	elog(DEBUG1, "flushing %u logical rewrite mapping entries",
		 state->rs_num_rewrite_mappings);

	hash_seq_init(&seq_status, state->rs_logical_mappings);
	while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
	{
		char	   *waldata;
		char	   *waldata_start;
		xl_heap_rewrite_mapping xlrec;
		Oid			dboid;
		uint32		len;
		int			written;
		uint32		num_mappings = dclist_count(&src->mappings);

		/* this file hasn't got any new mappings */
		if (num_mappings == 0)
			continue;

		if (state->rs_old_rel->rd_rel->relisshared)
			dboid = InvalidOid;
		else
			dboid = MyDatabaseId;

		xlrec.num_mappings = num_mappings;
		xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
		xlrec.mapped_xid = src->xid;
		xlrec.mapped_db = dboid;
		xlrec.offset = src->off;
		xlrec.start_lsn = state->rs_begin_lsn;

		/* write all mappings consecutively */
		len = num_mappings * sizeof(LogicalRewriteMappingData);
		waldata_start = waldata = palloc(len);

		/*
		 * collect data we need to write out, but don't modify ondisk data yet
		 */
		dclist_foreach_modify(iter, &src->mappings)
		{
			RewriteMappingDataEntry *pmap;

			pmap = dclist_container(RewriteMappingDataEntry, node, iter.cur);

			memcpy(waldata, &pmap->map, sizeof(pmap->map));
			waldata += sizeof(pmap->map);

			/* remove from the list and free */
			dclist_delete_from(&src->mappings, &pmap->node);
			pfree(pmap);

			/* update bookkeeping */
			state->rs_num_rewrite_mappings--;
		}

		Assert(dclist_count(&src->mappings) == 0);
		Assert(waldata == waldata_start + len);

		/*
		 * Note that we deviate from the usual WAL coding practices here,
		 * check the above "Logical rewrite support" comment for reasoning.
		 */
		written = FileWrite(src->vfd, waldata_start, len, src->off,
							WAIT_EVENT_LOGICAL_REWRITE_WRITE);
		if (written != len)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
							written, len)));
		src->off += len;

		XLogBeginInsert();
		XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
		XLogRegisterData(waldata_start, len);

		/* write xlog record */
		XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);

		pfree(waldata_start);
	}
	Assert(state->rs_num_rewrite_mappings == 0);
}

/*
 * Logical remapping part of end_heap_rewrite().
 */
static void
logical_end_heap_rewrite(RewriteState state)
{
	HASH_SEQ_STATUS seq_status;
	RewriteMappingFile *src;

	/* done, no logical rewrite in progress */
	if (!state->rs_logical_rewrite)
		return;

	/* writeout remaining in-memory entries */
	if (state->rs_num_rewrite_mappings > 0)
		logical_heap_rewrite_flush_mappings(state);

	/* Iterate over all mappings we have written and fsync the files. */
	hash_seq_init(&seq_status, state->rs_logical_mappings);
	while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
	{
		if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
			ereport(data_sync_elevel(ERROR),
					(errcode_for_file_access(),
					 errmsg("could not fsync file \"%s\": %m", src->path)));
		FileClose(src->vfd);
	}
	/* memory context cleanup will deal with the rest */
}

/*
 * Log a single (old->new) mapping for 'xid'.
 */
static void
logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
							LogicalRewriteMappingData *map)
{
	RewriteMappingFile *src;
	RewriteMappingDataEntry *pmap;
	Oid			relid;
	bool		found;

	relid = RelationGetRelid(state->rs_old_rel);

	/* look for existing mappings for this 'mapped' xid */
	src = hash_search(state->rs_logical_mappings, &xid,
					  HASH_ENTER, &found);

	/*
	 * We haven't yet had the need to map anything for this xid; create
	 * per-xid data structures.
	 */
	if (!found)
	{
		char		path[MAXPGPATH];
		Oid			dboid;

		if (state->rs_old_rel->rd_rel->relisshared)
			dboid = InvalidOid;
		else
			dboid = MyDatabaseId;

		snprintf(path, MAXPGPATH,
				 "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
				 dboid, relid,
				 LSN_FORMAT_ARGS(state->rs_begin_lsn),
				 xid, GetCurrentTransactionId());

		dclist_init(&src->mappings);
		src->off = 0;
		memcpy(src->path, path, sizeof(path));
		src->vfd = PathNameOpenFile(path,
									O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
		if (src->vfd < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not create file \"%s\": %m", path)));
	}

	pmap = MemoryContextAlloc(state->rs_cxt,
							  sizeof(RewriteMappingDataEntry));
	memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
	dclist_push_tail(&src->mappings, &pmap->node);
	state->rs_num_rewrite_mappings++;

	/*
	 * Write out the buffer whenever we have too many in-memory entries
	 * across all mapping files.
	 */
	if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
		logical_heap_rewrite_flush_mappings(state);
}

/*
 * Perform logical remapping for a tuple that's mapped from old_tid to
 * new_tuple->t_self by rewrite_heap_tuple() if necessary for the tuple.
 */
static void
logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid,
						   HeapTuple new_tuple)
{
	ItemPointerData new_tid = new_tuple->t_self;
	TransactionId cutoff = state->rs_logical_xmin;
	TransactionId xmin;
	TransactionId xmax;
	bool		do_log_xmin = false;
	bool		do_log_xmax = false;
	LogicalRewriteMappingData map;

	/* no logical rewrite in progress, we don't need to log anything */
	if (!state->rs_logical_rewrite)
		return;

	xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
	/* use *GetUpdateXid to correctly deal with multixacts */
	xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);

	/*
	 * Log the mapping iff the tuple has been created recently.
	 */
	if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
		do_log_xmin = true;

	if (!TransactionIdIsNormal(xmax))
	{
		/*
		 * no xmax is set, can't have any permanent ones, so this check is
		 * sufficient
		 */
	}
	else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
	{
		/* only locked, we don't care */
	}
	else if (!TransactionIdPrecedes(xmax, cutoff))
	{
		/* tuple has been deleted recently, log */
		do_log_xmax = true;
	}

	/* if neither needs to be logged, we're done */
	if (!do_log_xmin && !do_log_xmax)
		return;

	/* fill out mapping information */
	map.old_locator = state->rs_old_rel->rd_locator;
	map.old_tid = old_tid;
	map.new_locator = state->rs_new_rel->rd_locator;
	map.new_tid = new_tid;

	/* ---
	 * Now persist the mapping for the individual xids that are affected. We
	 * need to log for both xmin and xmax if they aren't the same transaction
	 * since the mapping files are per "affected" xid.
	 * We don't muster all that much effort detecting whether xmin and xmax
	 * are actually the same transaction, we just check whether the xid is the
	 * same disregarding subtransactions.  Logging too much is relatively
	 * harmless and we could never do the check fully since subtransaction
	 * data is thrown away during restarts.
	 * ---
	 */
	if (do_log_xmin)
		logical_rewrite_log_mapping(state, xmin, &map);
	/* separately log mapping for xmax unless it'd be redundant */
	if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
		logical_rewrite_log_mapping(state, xmax, &map);
}

/*
 * Replay XLOG_HEAP2_REWRITE records
 */
void
heap_xlog_logical_rewrite(XLogReaderState *r)
{
	char		path[MAXPGPATH];
	int			fd;
	xl_heap_rewrite_mapping *xlrec;
	uint32		len;
	char	   *data;

	xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);

	snprintf(path, MAXPGPATH,
			 "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
			 xlrec->mapped_db, xlrec->mapped_rel,
			 LSN_FORMAT_ARGS(xlrec->start_lsn),
			 xlrec->mapped_xid, XLogRecGetXid(r));

	fd = OpenTransientFile(path,
						   O_CREAT | O_WRONLY | PG_BINARY);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m", path)));

	/*
	 * Truncate all data that's not guaranteed to have been safely fsynced (by
	 * previous record or by the last checkpoint).
	 */
	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
	if (ftruncate(fd, xlrec->offset) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not truncate file \"%s\" to %u: %m",
						path, (uint32) xlrec->offset)));
	pgstat_report_wait_end();

	data = XLogRecGetData(r) + sizeof(*xlrec);

	len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);

	/* write out tail end of mapping file (again) */
	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
	if (pg_pwrite(fd, data, len, xlrec->offset) != len)
	{
		/* if write didn't set errno, assume problem is no disk space */
		if (errno == 0)
			errno = ENOSPC;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", path)));
	}
	pgstat_report_wait_end();

	/*
	 * Now fsync all previously written data. We could improve things and only
	 * do this for the last write to a file, but the required bookkeeping
	 * doesn't seem worth the trouble.
	 */
	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
	if (pg_fsync(fd) != 0)
		ereport(data_sync_elevel(ERROR),
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m", path)));
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));
}

/* ---
 * Perform a checkpoint for logical rewrite mappings
 *
 * This serves two tasks:
 * 1) Remove all mappings not needed anymore based on the logical restart LSN
 * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
 *	  only has to deal with the parts of a mapping that have been written out
 *	  after the checkpoint started.
 * ---
 */
void
CheckPointLogicalRewriteHeap(void)
{
	XLogRecPtr	cutoff;
	XLogRecPtr	redo;
	DIR		   *mappings_dir;
	struct dirent *mapping_de;
	char		path[MAXPGPATH + 20];

	/*
	 * We start off with the last redo pointer as a minimum. No new decoding
	 * slot will start before that, so that's a safe upper bound for removal.
	 */
	redo = GetRedoRecPtr();

	/* now check for the restart ptrs from existing slots */
	cutoff = ReplicationSlotsComputeLogicalRestartLSN();

	/* don't start earlier than the restart lsn */
	if (cutoff != InvalidXLogRecPtr && redo < cutoff)
		cutoff = redo;

	mappings_dir = AllocateDir("pg_logical/mappings");
	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
	{
		Oid			dboid;
		Oid			relid;
		XLogRecPtr	lsn;
		TransactionId rewrite_xid;
		TransactionId create_xid;
		uint32		hi,
					lo;
		PGFileType	de_type;

		if (strcmp(mapping_de->d_name, ".") == 0 ||
			strcmp(mapping_de->d_name, "..") == 0)
			continue;

		snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
		de_type = get_dirent_type(path, mapping_de, false, DEBUG1);

		if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
			continue;

		/* Skip over files that cannot be ours. */
		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
			continue;

		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
				   &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);

		lsn = ((uint64) hi) << 32 | lo;

		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
		{
			elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
			if (unlink(path) < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not remove file \"%s\": %m", path)));
		}
		else
		{
			/* on some operating systems fsyncing a file requires O_RDWR */
			int			fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

			/*
			 * The file cannot vanish due to concurrency since this function
			 * is the only one removing logical mappings and only one
			 * checkpoint can be in progress at a time.
			 */
			if (fd < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\": %m", path)));

			/*
			 * We could try to avoid fsyncing files that either haven't
			 * changed or have only been created since the checkpoint's start,
			 * but it's currently not deemed worth the effort.
			 */
			pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
			if (pg_fsync(fd) != 0)
				ereport(data_sync_elevel(ERROR),
						(errcode_for_file_access(),
						 errmsg("could not fsync file \"%s\": %m", path)));
			pgstat_report_wait_end();

			if (CloseTransientFile(fd) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not close file \"%s\": %m", path)));
		}
	}
	FreeDir(mappings_dir);

	/* persist directory entries to disk */
	fsync_fname("pg_logical/mappings", true);
}