LCOV - differential code coverage report

Current view:  top level - src/backend/access/heap - rewriteheap.c (source / functions)
Current:       Differential Code Coverage HEAD vs 15
Current Date:  2023-04-08 15:15:32
Baseline:      15
Baseline Date: 2023-04-08 15:09:40

             Coverage   Total   Hit   LBC   UIC   UBC   GBC   GIC   GNC   CBC   EUB   ECB   DUB   DCB
Lines:       83.4 %     326     272   1     39    14    11    145   18    98    28    163   1     10
Functions:   91.7 %     12      11          1                 11                1     11

Legend: Lines: hit / not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * rewriteheap.c
       4                 :  *    Support functions to rewrite tables.
       5                 :  *
       6                 :  * These functions provide a facility to completely rewrite a heap, while
       7                 :  * preserving visibility information and update chains.
       8                 :  *
       9                 :  * INTERFACE
      10                 :  *
      11                 :  * The caller is responsible for creating the new heap, all catalog
      12                 :  * changes, supplying the tuples to be written to the new heap, and
      13                 :  * rebuilding indexes.  The caller must hold AccessExclusiveLock on the
      14                 :  * target table, because we assume no one else is writing into it.
      15                 :  *
      16                 :  * To use the facility:
      17                 :  *
      18                 :  * begin_heap_rewrite
      19                 :  * while (fetch next tuple)
      20                 :  * {
      21                 :  *     if (tuple is dead)
      22                 :  *         rewrite_heap_dead_tuple
      23                 :  *     else
      24                 :  *     {
      25                 :  *         // do any transformations here if required
      26                 :  *         rewrite_heap_tuple
      27                 :  *     }
      28                 :  * }
      29                 :  * end_heap_rewrite
      30                 :  *
      31                 :  * The contents of the new relation shouldn't be relied on until after
      32                 :  * end_heap_rewrite is called.
      33                 :  *
      34                 :  *
      35                 :  * IMPLEMENTATION
      36                 :  *
      37                 :  * This would be a fairly trivial affair, except that we need to maintain
      38                 :  * the ctid chains that link versions of an updated tuple together.
      39                 :  * Since the newly stored tuples will have tids different from the original
      40                 :  * ones, if we just copied t_ctid fields to the new table the links would
      41                 :  * be wrong.  When we are required to copy a (presumably recently-dead or
      42                 :  * delete-in-progress) tuple whose ctid doesn't point to itself, we have
      43                 :  * to substitute the correct ctid instead.
      44                 :  *
      45                 :  * For each ctid reference from A -> B, we might encounter either A first
      46                 :  * or B first.  (Note that a tuple in the middle of a chain is both A and B
      47                 :  * of different pairs.)
      48                 :  *
      49                 :  * If we encounter A first, we'll store the tuple in the unresolved_tups
      50                 :  * hash table. When we later encounter B, we remove A from the hash table,
      51                 :  * fix the ctid to point to the new location of B, and insert both A and B
      52                 :  * to the new heap.
      53                 :  *
      54                 :  * If we encounter B first, we can insert B to the new heap right away.
      55                 :  * We then add an entry to the old_new_tid_map hash table showing B's
      56                 :  * original tid (in the old heap) and new tid (in the new heap).
      57                 :  * When we later encounter A, we get the new location of B from the table,
      58                 :  * and can write A immediately with the correct ctid.
      59                 :  *
      60                 :  * Entries in the hash tables can be removed as soon as the later tuple
      61                 :  * is encountered.  That helps to keep the memory usage down.  At the end,
      62                 :  * both tables are usually empty; we should have encountered both A and B
      63                 :  * of each pair.  However, it's possible for A to be RECENTLY_DEAD and B
      64                 :  * entirely DEAD according to HeapTupleSatisfiesVacuum, because the test
      65                 :  * for deadness using OldestXmin is not exact.  In such a case we might
      66                 :  * encounter B first, and skip it, and find A later.  Then A would be added
      67                 :  * to unresolved_tups, and stay there until end of the rewrite.  Since
      68                 :  * this case is very unusual, we don't worry about the memory usage.
      69                 :  *
      70                 :  * Using in-memory hash tables means that we use some memory for each live
      71                 :  * update chain in the table, from the time we find one end of the
      72                 :  * reference until we find the other end.  That shouldn't be a problem in
      73                 :  * practice, but if you do something like an UPDATE without a where-clause
      74                 :  * on a large table, and then run CLUSTER in the same transaction, you
      75                 :  * could run out of memory.  It doesn't seem worthwhile to add support for
      76                 :  * spill-to-disk, as there shouldn't be that many RECENTLY_DEAD tuples in a
      77                 :  * table under normal circumstances.  Furthermore, in the typical scenario
      78                 :  * of CLUSTERing on an unchanging key column, we'll see all the versions
      79                 :  * of a given tuple together anyway, and so the peak memory usage is only
      80                 :  * proportional to the number of RECENTLY_DEAD versions of a single row, not
      81                 :  * in the whole table.  Note that if we do fail halfway through a CLUSTER,
      82                 :  * the old table is still valid, so failure is not catastrophic.
      83                 :  *
      84                 :  * We can't use the normal heap_insert function to insert into the new
      85                 :  * heap, because heap_insert overwrites the visibility information.
      86                 :  * We use a special-purpose raw_heap_insert function instead, which
      87                 :  * is optimized for bulk inserting a lot of tuples, knowing that we have
      88                 :  * exclusive access to the heap.  raw_heap_insert builds new pages in
      89                 :  * local storage.  When a page is full, or at the end of the process,
      90                 :  * we insert it to WAL as a single record and then write it to disk
      91                 :  * directly through smgr.  Note, however, that any data sent to the new
      92                 :  * heap's TOAST table will go through the normal bufmgr.
      93                 :  *
      94                 :  *
      95                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      96                 :  * Portions Copyright (c) 1994-5, Regents of the University of California
      97                 :  *
      98                 :  * IDENTIFICATION
      99                 :  *    src/backend/access/heap/rewriteheap.c
     100                 :  *
     101                 :  *-------------------------------------------------------------------------
     102                 :  */
     103                 : #include "postgres.h"
     104                 : 
     105                 : #include <unistd.h>
     106                 : 
     107                 : #include "access/heapam.h"
     108                 : #include "access/heapam_xlog.h"
     109                 : #include "access/heaptoast.h"
     110                 : #include "access/rewriteheap.h"
     111                 : #include "access/transam.h"
     112                 : #include "access/xact.h"
     113                 : #include "access/xloginsert.h"
     114                 : #include "catalog/catalog.h"
     115                 : #include "common/file_utils.h"
     116                 : #include "lib/ilist.h"
     117                 : #include "miscadmin.h"
     118                 : #include "pgstat.h"
     119                 : #include "replication/logical.h"
     120                 : #include "replication/slot.h"
     121                 : #include "storage/bufmgr.h"
     122                 : #include "storage/fd.h"
     123                 : #include "storage/procarray.h"
     124                 : #include "storage/smgr.h"
     125                 : #include "utils/memutils.h"
     126                 : #include "utils/rel.h"
     127                 : 
     128                 : /*
     129                 :  * State associated with a rewrite operation. This is opaque to the user
     130                 :  * of the rewrite facility.
     131                 :  */
     132                 : typedef struct RewriteStateData
     133                 : {
     134                 :     Relation    rs_old_rel;     /* source heap */
     135                 :     Relation    rs_new_rel;     /* destination heap */
     136                 :     Page        rs_buffer;      /* page currently being built */
     137                 :     BlockNumber rs_blockno;     /* block where page will go */
     138                 :     bool        rs_buffer_valid;    /* T if any tuples in buffer */
     139                 :     bool        rs_logical_rewrite; /* do we need to do logical rewriting */
     140                 :     TransactionId rs_oldest_xmin;   /* oldest xmin used by caller to determine
     141                 :                                      * tuple visibility */
     142                 :     TransactionId rs_freeze_xid;    /* Xid that will be used as freeze cutoff
     143                 :                                      * point */
     144                 :     TransactionId rs_logical_xmin;  /* Xid that will be used as cutoff point
     145                 :                                      * for logical rewrites */
     146                 :     MultiXactId rs_cutoff_multi;    /* MultiXactId that will be used as cutoff
     147                 :                                      * point for multixacts */
     148                 :     MemoryContext rs_cxt;       /* for hash tables and entries and tuples in
     149                 :                                  * them */
     150                 :     XLogRecPtr  rs_begin_lsn;   /* XLogInsertLsn when starting the rewrite */
     151                 :     HTAB       *rs_unresolved_tups; /* unmatched A tuples */
     152                 :     HTAB       *rs_old_new_tid_map; /* unmatched B tuples */
     153                 :     HTAB       *rs_logical_mappings;    /* logical remapping files */
     154                 :     uint32      rs_num_rewrite_mappings;    /* # in memory mappings */
     155                 : }           RewriteStateData;
     156                 : 
     157                 : /*
     158                 :  * The lookup keys for the hash tables are tuple TID and xmin (we must check
     159                 :  * both to avoid false matches from dead tuples).  Beware that there is
     160                 :  * probably some padding space in this struct; it must be zeroed out for
     161                 :  * correct hashtable operation.
     162                 :  */
     163                 : typedef struct
     164                 : {
     165                 :     TransactionId xmin;         /* tuple xmin */
     166                 :     ItemPointerData tid;        /* tuple location in old heap */
     167                 : } TidHashKey;
     168                 : 
     169                 : /*
     170                 :  * Entry structures for the hash tables
     171                 :  */
     172                 : typedef struct
     173                 : {
     174                 :     TidHashKey  key;            /* expected xmin/old location of B tuple */
     175                 :     ItemPointerData old_tid;    /* A's location in the old heap */
     176                 :     HeapTuple   tuple;          /* A's tuple contents */
     177                 : } UnresolvedTupData;
     178                 : 
     179                 : typedef UnresolvedTupData *UnresolvedTup;
     180                 : 
     181                 : typedef struct
     182                 : {
     183                 :     TidHashKey  key;            /* actual xmin/old location of B tuple */
     184                 :     ItemPointerData new_tid;    /* where we put it in the new heap */
     185                 : } OldToNewMappingData;
     186                 : 
     187                 : typedef OldToNewMappingData *OldToNewMapping;
     188                 : 
     189                 : /*
     190                 :  * In-Memory data for an xid that might need logical remapping entries
     191                 :  * to be logged.
     192                 :  */
     193                 : typedef struct RewriteMappingFile
     194                 : {
     195                 :     TransactionId xid;          /* xid that might need to see the row */
     196                 :     int         vfd;            /* fd of mappings file */
      197                 :     off_t       off;            /* how far we have written so far */
     198                 :     dclist_head mappings;       /* list of in-memory mappings */
     199                 :     char        path[MAXPGPATH];    /* path, for error messages */
     200                 : } RewriteMappingFile;
     201                 : 
     202                 : /*
     203                 :  * A single In-Memory logical rewrite mapping, hanging off
     204                 :  * RewriteMappingFile->mappings.
     205                 :  */
     206                 : typedef struct RewriteMappingDataEntry
     207                 : {
     208                 :     LogicalRewriteMappingData map;  /* map between old and new location of the
     209                 :                                      * tuple */
     210                 :     dlist_node  node;
     211                 : } RewriteMappingDataEntry;
     212                 : 
     213                 : 
     214                 : /* prototypes for internal functions */
     215                 : static void raw_heap_insert(RewriteState state, HeapTuple tup);
     216                 : 
     217                 : /* internal logical remapping prototypes */
     218                 : static void logical_begin_heap_rewrite(RewriteState state);
     219                 : static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple);
     220                 : static void logical_end_heap_rewrite(RewriteState state);
     221                 : 
     222                 : 
     223                 : /*
     224                 :  * Begin a rewrite of a table
     225                 :  *
     226                 :  * old_heap     old, locked heap relation tuples will be read from
     227                 :  * new_heap     new, locked heap relation to insert tuples to
     228                 :  * oldest_xmin  xid used by the caller to determine which tuples are dead
     229                 :  * freeze_xid   xid before which tuples will be frozen
     230                 :  * cutoff_multi multixact before which multis will be removed
     231                 :  *
     232                 :  * Returns an opaque RewriteState, allocated in current memory context,
     233                 :  * to be used in subsequent calls to the other functions.
     234                 :  */
     235 ECB             : RewriteState
     236 GIC         262 : begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
     237                 :                    TransactionId freeze_xid, MultiXactId cutoff_multi)
     238                 : {
     239                 :     RewriteState state;
     240                 :     MemoryContext rw_cxt;
     241                 :     MemoryContext old_cxt;
     242                 :     HASHCTL     hash_ctl;
     243                 : 
     244                 :     /*
     245                 :      * To ease cleanup, make a separate context that will contain the
     246                 :      * RewriteState struct itself plus all subsidiary data.
     247 ECB             :      */
     248 GIC         262 :     rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
     249                 :                                    "Table rewrite",
     250 ECB             :                                    ALLOCSET_DEFAULT_SIZES);
     251 GIC         262 :     old_cxt = MemoryContextSwitchTo(rw_cxt);
     252                 : 
     253 ECB             :     /* Create and fill in the state struct */
     254 GIC         262 :     state = palloc0(sizeof(RewriteStateData));
     255 ECB             : 
     256 CBC         262 :     state->rs_old_rel = old_heap;
     257             262 :     state->rs_new_rel = new_heap;
     258 GNC         262 :     state->rs_buffer = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
     259 ECB             :     /* new_heap needn't be empty, just locked */
     260 CBC         262 :     state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
     261             262 :     state->rs_buffer_valid = false;
     262             262 :     state->rs_oldest_xmin = oldest_xmin;
     263             262 :     state->rs_freeze_xid = freeze_xid;
     264             262 :     state->rs_cutoff_multi = cutoff_multi;
     265 GIC         262 :     state->rs_cxt = rw_cxt;
     266                 : 
     267 ECB             :     /* Initialize hash tables used to track update chains */
     268 CBC         262 :     hash_ctl.keysize = sizeof(TidHashKey);
     269             262 :     hash_ctl.entrysize = sizeof(UnresolvedTupData);
     270 GIC         262 :     hash_ctl.hcxt = state->rs_cxt;
     271 ECB             : 
     272 CBC         262 :     state->rs_unresolved_tups =
     273 GIC         262 :         hash_create("Rewrite / Unresolved ctids",
     274                 :                     128,        /* arbitrary initial size */
     275                 :                     &hash_ctl,
     276                 :                     HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     277 ECB             : 
     278 GIC         262 :     hash_ctl.entrysize = sizeof(OldToNewMappingData);
     279 ECB             : 
     280 CBC         262 :     state->rs_old_new_tid_map =
     281 GIC         262 :         hash_create("Rewrite / Old to new tid map",
     282                 :                     128,        /* arbitrary initial size */
     283                 :                     &hash_ctl,
     284                 :                     HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     285 ECB             : 
     286 GIC         262 :     MemoryContextSwitchTo(old_cxt);
     287 ECB             : 
     288 GIC         262 :     logical_begin_heap_rewrite(state);
     289 ECB             : 
     290 GIC         262 :     return state;
     291                 : }
     292                 : 
     293                 : /*
     294                 :  * End a rewrite.
     295                 :  *
     296                 :  * state and any other resources are freed.
     297                 :  */
     298 ECB             : void
     299 GIC         262 : end_heap_rewrite(RewriteState state)
     300                 : {
     301                 :     HASH_SEQ_STATUS seq_status;
     302                 :     UnresolvedTup unresolved;
     303                 : 
     304                 :     /*
     305                 :      * Write any remaining tuples in the UnresolvedTups table. If we have any
     306                 :      * left, they should in fact be dead, but let's err on the safe side.
     307 ECB             :      */
     308 GIC         262 :     hash_seq_init(&seq_status, state->rs_unresolved_tups);
     309 ECB             : 
     310 GIC         262 :     while ((unresolved = hash_seq_search(&seq_status)) != NULL)
     311 EUB             :     {
     312 UBC           0 :         ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
     313 UIC           0 :         raw_heap_insert(state, unresolved->tuple);
     314                 :     }
     315                 : 
     316 ECB             :     /* Write the last page, if any */
     317 GIC         262 :     if (state->rs_buffer_valid)
     318 ECB             :     {
     319 CBC         173 :         if (RelationNeedsWAL(state->rs_new_rel))
     320 GNC          88 :             log_newpage(&state->rs_new_rel->rd_locator,
     321                 :                         MAIN_FORKNUM,
     322                 :                         state->rs_blockno,
     323                 :                         state->rs_buffer,
     324                 :                         true);
     325 ECB             : 
     326 GIC         173 :         PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
     327 ECB             : 
     328 CBC         173 :         smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
     329 GNC         173 :                    state->rs_blockno, state->rs_buffer, true);
     330                 :     }
     331                 : 
     332                 :     /*
     333                 :      * When we WAL-logged rel pages, we must nonetheless fsync them.  The
     334                 :      * reason is the same as in storage.c's RelationCopyStorage(): we're
     335                 :      * writing data that's not in shared buffers, and so a CHECKPOINT
     336                 :      * occurring during the rewriteheap operation won't have fsync'd data we
     337                 :      * wrote before the checkpoint.
     338 ECB             :      */
     339 CBC         262 :     if (RelationNeedsWAL(state->rs_new_rel))
     340 GIC         128 :         smgrimmedsync(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);
     341 ECB             : 
     342 GIC         262 :     logical_end_heap_rewrite(state);
     343                 : 
     344 ECB             :     /* Deleting the context frees everything */
     345 CBC         262 :     MemoryContextDelete(state->rs_cxt);
     346 GIC         262 : }
     347                 : 
     348                 : /*
     349                 :  * Add a tuple to the new heap.
     350                 :  *
     351                 :  * Visibility information is copied from the original tuple, except that
     352                 :  * we "freeze" very-old tuples.  Note that since we scribble on new_tuple,
     353                 :  * it had better be temp storage not a pointer to the original tuple.
     354                 :  *
     355                 :  * state        opaque state as returned by begin_heap_rewrite
     356                 :  * old_tuple    original tuple in the old heap
     357                 :  * new_tuple    new, rewritten tuple to be inserted to new heap
     358                 :  */
     359 ECB             : void
     360 GIC      384627 : rewrite_heap_tuple(RewriteState state,
     361                 :                    HeapTuple old_tuple, HeapTuple new_tuple)
     362                 : {
     363                 :     MemoryContext old_cxt;
     364                 :     ItemPointerData old_tid;
     365                 :     TidHashKey  hashkey;
     366                 :     bool        found;
     367                 :     bool        free_new;
     368 ECB             : 
     369 GIC      384627 :     old_cxt = MemoryContextSwitchTo(state->rs_cxt);
     370                 : 
     371                 :     /*
     372                 :      * Copy the original tuple's visibility information into new_tuple.
     373                 :      *
     374                 :      * XXX we might later need to copy some t_infomask2 bits, too? Right now,
     375                 :      * we intentionally clear the HOT status bits.
     376 ECB             :      */
     377 CBC      384627 :     memcpy(&new_tuple->t_data->t_choice.t_heap,
     378 GIC      384627 :            &old_tuple->t_data->t_choice.t_heap,
     379                 :            sizeof(HeapTupleFields));
     380 ECB             : 
     381 CBC      384627 :     new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
     382          384627 :     new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
     383          384627 :     new_tuple->t_data->t_infomask |=
     384 GIC      384627 :         old_tuple->t_data->t_infomask & HEAP_XACT_MASK;
     385                 : 
     386                 :     /*
     387                 :      * While we have our hands on the tuple, we may as well freeze any
     388                 :      * eligible xmin or xmax, so that future VACUUM effort can be saved.
     389 ECB             :      */
     390 CBC      384627 :     heap_freeze_tuple(new_tuple->t_data,
     391          384627 :                       state->rs_old_rel->rd_rel->relfrozenxid,
     392 GIC      384627 :                       state->rs_old_rel->rd_rel->relminmxid,
     393                 :                       state->rs_freeze_xid,
     394                 :                       state->rs_cutoff_multi);
     395                 : 
     396                 :     /*
     397                 :      * Invalid ctid means that ctid should point to the tuple itself. We'll
     398                 :      * override it later if the tuple is part of an update chain.
     399 ECB             :      */
     400 GIC      384627 :     ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
     401                 : 
     402                 :     /*
     403                 :      * If the tuple has been updated, check the old-to-new mapping hash table.
     404 ECB             :      */
     405 CBC      417155 :     if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
     406           32528 :           HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
     407           32528 :         !HeapTupleHeaderIndicatesMovedPartitions(old_tuple->t_data) &&
     408           32528 :         !(ItemPointerEquals(&(old_tuple->t_self),
     409 GIC       32528 :                             &(old_tuple->t_data->t_ctid))))
     410                 :     {
     411                 :         OldToNewMapping mapping;
     412 ECB             : 
     413 CBC         513 :         memset(&hashkey, 0, sizeof(hashkey));
     414             513 :         hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
     415 GIC         513 :         hashkey.tid = old_tuple->t_data->t_ctid;
     416                 : 
     417 ECB             :         mapping = (OldToNewMapping)
     418 GIC         513 :             hash_search(state->rs_old_new_tid_map, &hashkey,
     419                 :                         HASH_FIND, NULL);
     420 ECB             : 
     421 GIC         513 :         if (mapping != NULL)
     422                 :         {
     423                 :             /*
     424                 :              * We've already copied the tuple that t_ctid points to, so we can
     425                 :              * set the ctid of this tuple to point to the new location, and
     426                 :              * insert it right away.
     427 ECB             :              */
     428 GIC         181 :             new_tuple->t_data->t_ctid = mapping->new_tid;
     429                 : 
     430 ECB             :             /* We don't need the mapping entry anymore */
     431 GIC         181 :             hash_search(state->rs_old_new_tid_map, &hashkey,
     432 ECB             :                         HASH_REMOVE, &found);
     433 GIC         181 :             Assert(found);
     434                 :         }
     435                 :         else
     436                 :         {
     437                 :             /*
     438                 :              * We haven't seen the tuple t_ctid points to yet. Stash this
     439                 :              * tuple into unresolved_tups to be written later.
     440                 :              */
     441                 :             UnresolvedTup unresolved;
     442 ECB             : 
     443 GIC         332 :             unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
     444 ECB             :                                      HASH_ENTER, &found);
     445 GIC         332 :             Assert(!found);
     446 ECB             : 
     447 CBC         332 :             unresolved->old_tid = old_tuple->t_self;
     448 GIC         332 :             unresolved->tuple = heap_copytuple(new_tuple);
     449                 : 
     450                 :             /*
     451                 :              * We can't do anything more now, since we don't know where the
     452                 :              * tuple will be written.
     453 ECB             :              */
     454 CBC         332 :             MemoryContextSwitchTo(old_cxt);
     455 GIC         332 :             return;
     456                 :         }
     457                 :     }
     458                 : 
     459                 :     /*
     460                 :      * Now we will write the tuple, and then check to see if it is the B tuple
     461                 :      * in any new or known pair.  When we resolve a known pair, we will be
     462                 :      * able to write that pair's A tuple, and then we have to check if it
     463                 :      * resolves some other pair.  Hence, we need a loop here.
     464 ECB             :      */
     465 CBC      384295 :     old_tid = old_tuple->t_self;
     466 GIC      384295 :     free_new = false;
     467                 : 
     468 ECB             :     for (;;)
     469 GIC         332 :     {
     470                 :         ItemPointerData new_tid;
     471                 : 
     472 ECB             :         /* Insert the tuple and find out where it's put in new_heap */
     473 CBC      384627 :         raw_heap_insert(state, new_tuple);
     474 GIC      384627 :         new_tid = new_tuple->t_self;
     475 ECB             : 
     476 GIC      384627 :         logical_rewrite_heap_tuple(state, old_tid, new_tuple);
     477                 : 
     478                 :         /*
     479                 :          * If the tuple is the updated version of a row, and the prior version
     480                 :          * wouldn't be DEAD yet, then we need to either resolve the prior
     481                 :          * version (if it's waiting in rs_unresolved_tups), or make an entry
     482                 :          * in rs_old_new_tid_map (so we can resolve it when we do see it). The
     483                 :          * previous tuple's xmax would equal this one's xmin, so it's
     484                 :          * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
     485 ECB             :          */
     486 CBC      384627 :         if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
     487 GIC        8150 :             !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
     488                 :                                    state->rs_oldest_xmin))
     489                 :         {
     490                 :             /*
     491                 :              * Okay, this is B in an update pair.  See if we've seen A.
     492                 :              */
     493                 :             UnresolvedTup unresolved;
     494 ECB             : 
     495 CBC         513 :             memset(&hashkey, 0, sizeof(hashkey));
     496             513 :             hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
     497 GIC         513 :             hashkey.tid = old_tid;
     498 ECB             : 
     499 GIC         513 :             unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
     500                 :                                      HASH_FIND, NULL);
     501 ECB             : 
     502 GIC         513 :             if (unresolved != NULL)
     503                 :             {
     504                 :                 /*
      505                 :                  * We have seen and memorized the previous tuple already. Now
      506                 :                  * that we know where the tuple its t_ctid points to was
      507                 :                  * inserted, fix its t_ctid and insert it into the new heap.
     508 ECB             :                  */
     509 CBC         332 :                 if (free_new)
     510              88 :                     heap_freetuple(new_tuple);
     511             332 :                 new_tuple = unresolved->tuple;
     512             332 :                 free_new = true;
     513             332 :                 old_tid = unresolved->old_tid;
     514 GIC         332 :                 new_tuple->t_data->t_ctid = new_tid;
     515                 : 
     516                 :                 /*
     517                 :                  * We don't need the hash entry anymore, but don't free its
     518                 :                  * tuple just yet.
     519 ECB             :                  */
     520 GIC         332 :                 hash_search(state->rs_unresolved_tups, &hashkey,
     521 ECB             :                             HASH_REMOVE, &found);
     522 GIC         332 :                 Assert(found);
     523                 : 
     524 ECB             :                 /* loop back to insert the previous tuple in the chain */
     525 GIC         332 :                 continue;
     526                 :             }
     527                 :             else
     528                 :             {
     529                 :                 /*
     530                 :                  * Remember the new tid of this tuple. We'll use it to set the
     531                 :                  * ctid when we find the previous tuple in the chain.
     532                 :                  */
     533                 :                 OldToNewMapping mapping;
     534 ECB             : 
     535 GIC         181 :                 mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
     536 ECB             :                                       HASH_ENTER, &found);
     537 GIC         181 :                 Assert(!found);
     538 ECB             : 
     539 GIC         181 :                 mapping->new_tid = new_tid;
     540                 :             }
     541                 :         }
     542                 : 
     543 ECB             :         /* Done with this (chain of) tuples, for now */
     544 CBC      384295 :         if (free_new)
     545             244 :             heap_freetuple(new_tuple);
     546 GIC      384295 :         break;
     547                 :     }
     548 ECB             : 
     549 GIC      384295 :     MemoryContextSwitchTo(old_cxt);
     550                 : }
     551                 : 
     552                 : /*
     553                 :  * Register a dead tuple with an ongoing rewrite. Dead tuples are not
     554                 :  * copied to the new table, but we still make note of them so that we
     555                 :  * can release some resources earlier.
     556                 :  *
     557                 :  * Returns true if a tuple was removed from the unresolved_tups table.
     558                 :  * This indicates that that tuple, previously thought to be "recently dead",
     559                 :  * is now known really dead and won't be written to the output.
     560                 :  */
     561 ECB             : bool
     562 GIC       11411 : rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
     563                 : {
     564                 :     /*
     565                 :      * If we have already seen an earlier tuple in the update chain that
     566                 :      * points to this tuple, let's forget about that earlier tuple. It's in
     567                 :      * fact dead as well, our simple xmax < OldestXmin test in
     568                 :      * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It happens
     569                 :      * when xmin of a tuple is greater than xmax, which sounds
     570                 :      * counter-intuitive but is perfectly valid.
     571                 :      *
     572                 :      * We don't bother to try to detect the situation the other way round,
     573                 :      * when we encounter the dead tuple first and then the recently dead one
     574                 :      * that points to it. If that happens, we'll have some unmatched entries
     575                 :      * in the UnresolvedTups hash table at the end. That can happen anyway,
     576                 :      * because a vacuum might have removed the dead tuple in the chain before
     577                 :      * us.
     578                 :      */
     579                 :     UnresolvedTup unresolved;
     580                 :     TidHashKey  hashkey;
     581                 :     bool        found;
     582 ECB             : 
     583 CBC       11411 :     memset(&hashkey, 0, sizeof(hashkey));
     584           11411 :     hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
     585 GIC       11411 :     hashkey.tid = old_tuple->t_self;
     586 ECB             : 
     587 GIC       11411 :     unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
     588                 :                              HASH_FIND, NULL);
     589 ECB             : 
     590 GIC       11411 :     if (unresolved != NULL)
     591                 :     {
     592 EUB             :         /* Need to free the contained tuple as well as the hashtable entry */
     593 UBC           0 :         heap_freetuple(unresolved->tuple);
     594 UIC           0 :         hash_search(state->rs_unresolved_tups, &hashkey,
     595 EUB             :                     HASH_REMOVE, &found);
     596 UBC           0 :         Assert(found);
     597 UIC           0 :         return true;
     598                 :     }
     599 ECB             : 
     600 GIC       11411 :     return false;
     601                 : }
     602                 : 
     603                 : /*
     604                 :  * Insert a tuple to the new relation.  This has to track heap_insert
     605                 :  * and its subsidiary functions!
     606                 :  *
     607                 :  * t_self of the tuple is set to the new TID of the tuple. If t_ctid of the
     608                 :  * tuple is invalid on entry, it's replaced with the new TID as well (in
     609                 :  * the inserted data only, not in the caller's copy).
     610                 :  */
     611 ECB             : static void
     612 GIC      384627 : raw_heap_insert(RewriteState state, HeapTuple tup)
     613 ECB             : {
     614 GIC      384627 :     Page        page = state->rs_buffer;
     615                 :     Size        pageFreeSpace,
     616                 :                 saveFreeSpace;
     617                 :     Size        len;
     618                 :     OffsetNumber newoff;
     619                 :     HeapTuple   heaptup;
     620                 : 
     621                 :     /*
     622                 :      * If the new tuple is too big for storage or contains already toasted
     623                 :      * out-of-line attributes from some other relation, invoke the toaster.
     624                 :      *
     625                 :      * Note: below this point, heaptup is the data we actually intend to store
     626                 :      * into the relation; tup is the caller's original untoasted data.
     627 ECB             :      */
     628 GIC      384627 :     if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
     629                 :     {
     630 EUB             :         /* toast table entries should never be recursively toasted */
     631 UBC           0 :         Assert(!HeapTupleHasExternal(tup));
     632 UIC           0 :         heaptup = tup;
     633 ECB             :     }
     634 CBC      384627 :     else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
     635             292 :     {
     636 GIC         292 :         int         options = HEAP_INSERT_SKIP_FSM;
     637                 : 
     638                 :         /*
     639                 :          * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
     640                 :          * for the TOAST table are not logically decoded.  The main heap is
     641                 :          * WAL-logged as XLOG FPI records, which are not logically decoded.
     642 ECB             :          */
     643 GIC         292 :         options |= HEAP_INSERT_NO_LOGICAL;
     644 ECB             : 
     645 GIC         292 :         heaptup = heap_toast_insert_or_update(state->rs_new_rel, tup, NULL,
     646                 :                                               options);
     647                 :     }
     648 ECB             :     else
     649 GIC      384335 :         heaptup = tup;
     650 ECB             : 
     651 GIC      384627 :     len = MAXALIGN(heaptup->t_len); /* be conservative */
     652                 : 
     653                 :     /*
     654                 :      * If we're gonna fail for oversize tuple, do it right away
     655 ECB             :      */
     656 GBC      384627 :     if (len > MaxHeapTupleSize)
     657 UIC           0 :         ereport(ERROR,
     658                 :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     659                 :                  errmsg("row is too big: size %zu, maximum size %zu",
     660                 :                         len, MaxHeapTupleSize)));
     661                 : 
     662 ECB             :     /* Compute desired extra freespace due to fillfactor option */
     663 GIC      384627 :     saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
     664                 :                                                    HEAP_DEFAULT_FILLFACTOR);
     665                 : 
     666 ECB             :     /* Now we can check to see if there's enough free space already. */
     667 GIC      384627 :     if (state->rs_buffer_valid)
     668 ECB             :     {
     669 GIC      384454 :         pageFreeSpace = PageGetHeapFreeSpace(page);
     670 ECB             : 
     671 GIC      384454 :         if (len + saveFreeSpace > pageFreeSpace)
     672                 :         {
     673                 :             /*
     674                 :              * Doesn't fit, so write out the existing page.  It always
     675                 :              * contains a tuple.  Hence, unlike RelationGetBufferForTuple(),
     676                 :              * enforce saveFreeSpace unconditionally.
     677                 :              */
     678                 : 
     679 ECB             :             /* XLOG stuff */
     680 CBC        5160 :             if (RelationNeedsWAL(state->rs_new_rel))
     681 GNC        1401 :                 log_newpage(&state->rs_new_rel->rd_locator,
     682                 :                             MAIN_FORKNUM,
     683                 :                             state->rs_blockno,
     684                 :                             page,
     685                 :                             true);
     686                 : 
     687                 :             /*
     688                 :              * Now write the page. We say skipFsync = true because there's no
     689                 :              * need for smgr to schedule an fsync for this write; we'll do it
     690                 :              * ourselves in end_heap_rewrite.
     691 ECB             :              */
     692 GIC        5160 :             PageSetChecksumInplace(page, state->rs_blockno);
     693 ECB             : 
     694 GIC        5160 :             smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
     695                 :                        state->rs_blockno, page, true);
     696 ECB             : 
     697 CBC        5160 :             state->rs_blockno++;
     698 GIC        5160 :             state->rs_buffer_valid = false;
     699                 :         }
     700                 :     }
     701 ECB             : 
     702 GIC      384627 :     if (!state->rs_buffer_valid)
     703                 :     {
     704 ECB             :         /* Initialize a new empty page */
     705 CBC        5333 :         PageInit(page, BLCKSZ, 0);
     706 GIC        5333 :         state->rs_buffer_valid = true;
     707                 :     }
     708                 : 
     709 ECB             :     /* And now we can insert the tuple into the page */
     710 GIC      384627 :     newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
     711 ECB             :                          InvalidOffsetNumber, false, true);
     712 GBC      384627 :     if (newoff == InvalidOffsetNumber)
     713 UIC           0 :         elog(ERROR, "failed to add tuple");
     714                 : 
     715 ECB             :     /* Update caller's t_self to the actual position where it was stored */
     716 GIC      384627 :     ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);
     717                 : 
     718                 :     /*
     719                 :      * Insert the correct position into CTID of the stored tuple, too, if the
     720                 :      * caller didn't supply a valid CTID.
     721 ECB             :      */
     722 GIC      384627 :     if (!ItemPointerIsValid(&tup->t_data->t_ctid))
     723                 :     {
     724                 :         ItemId      newitemid;
     725                 :         HeapTupleHeader onpage_tup;
     726 ECB             : 
     727 CBC      384114 :         newitemid = PageGetItemId(page, newoff);
     728 GIC      384114 :         onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
     729 ECB             : 
     730 GIC      384114 :         onpage_tup->t_ctid = tup->t_self;
     731                 :     }
     732                 : 
     733 ECB             :     /* If heaptup is a private copy, release it. */
     734 CBC      384627 :     if (heaptup != tup)
     735             292 :         heap_freetuple(heaptup);
     736 GIC      384627 : }
     737                 : 
     738                 : /* ------------------------------------------------------------------------
     739                 :  * Logical rewrite support
     740                 :  *
     741                 :  * When doing logical decoding - which relies on using cmin/cmax of catalog
     742                 :  * tuples, via xl_heap_new_cid records - heap rewrites have to log enough
     743                 :  * information to allow the decoding backend to update its internal mapping
     744                 :  * of (relfilelocator,ctid) => (cmin, cmax) to be correct for the rewritten heap.
     745                 :  *
     746                 :  * For that, every time we find a tuple that's been modified in a catalog
     747                 :  * relation within the xmin horizon of any decoding slot, we log a mapping
     748                 :  * from the old to the new location.
     749                 :  *
      750                 :  * To deal with rewrites that abort, the filename of a mapping file contains
      751                 :  * the xid of the transaction performing the rewrite, which can then be
      752                 :  * checked before the file is read in.
     753                 :  *
      754                 :  * For efficiency we don't immediately spill every single mapping for a
      755                 :  * row to disk but only do so in batches, when we've collected several of them
      756                 :  * in memory or when end_heap_rewrite() has been called.
     757                 :  *
     758                 :  * Crash-Safety: This module diverts from the usual patterns of doing WAL
     759                 :  * since it cannot rely on checkpoint flushing out all buffers and thus
     760                 :  * waiting for exclusive locks on buffers. Usually the XLogInsert() covering
     761                 :  * buffer modifications is performed while the buffer(s) that are being
     762                 :  * modified are exclusively locked guaranteeing that both the WAL record and
     763                 :  * the modified heap are on either side of the checkpoint. But since the
     764                 :  * mapping files we log aren't in shared_buffers that interlock doesn't work.
     765                 :  *
     766                 :  * Instead we simply write the mapping files out to disk, *before* the
     767                 :  * XLogInsert() is performed. That guarantees that either the XLogInsert() is
     768                 :  * inserted after the checkpoint's redo pointer or that the checkpoint (via
     769                 :  * CheckPointLogicalRewriteHeap()) has flushed the (partial) mapping file to
     770                 :  * disk. That leaves the tail end that has not yet been flushed open to
     771                 :  * corruption, which is solved by including the current offset in the
     772                 :  * xl_heap_rewrite_mapping records and truncating the mapping file to it
     773                 :  * during replay. Every time a rewrite is finished all generated mapping files
     774                 :  * are synced to disk.
     775                 :  *
     776                 :  * Note that if we were only concerned about crash safety we wouldn't have to
     777                 :  * deal with WAL logging at all - an fsync() at the end of a rewrite would be
      778                 :  * sufficient for crash safety. Any mapping that hasn't been safely flushed to
      779                 :  * disk must belong to an aborted (explicitly or via a crash) transaction and is
     780                 :  * ignored by virtue of the xid in its name being subject to a
     781                 :  * TransactionDidCommit() check. But we want to support having standbys via
     782                 :  * physical replication, both for availability and to do logical decoding
     783                 :  * there.
     784                 :  * ------------------------------------------------------------------------
     785                 :  */
     786                 : 
     787                 : /*
     788                 :  * Do preparations for logging logical mappings during a rewrite if
     789                 :  * necessary. If we detect that we don't need to log anything we'll prevent
     790                 :  * any further action by the various logical rewrite functions.
     791                 :  */
     792 ECB             : static void
     793 GIC         262 : logical_begin_heap_rewrite(RewriteState state)
     794                 : {
     795                 :     HASHCTL     hash_ctl;
     796                 :     TransactionId logical_xmin;
     797                 : 
     798                 :     /*
     799                 :      * We only need to persist these mappings if the rewritten table can be
      800                 :      * accessed during logical decoding; if not, we can skip doing any
     801                 :      * additional work.
     802 ECB             :      */
     803 CBC         262 :     state->rs_logical_rewrite =
     804 GIC         262 :         RelationIsAccessibleInLogicalDecoding(state->rs_old_rel);
     805 ECB             : 
     806 CBC         262 :     if (!state->rs_logical_rewrite)
     807 GIC         242 :         return;
     808 ECB             : 
     809 GIC          21 :     ProcArrayGetReplicationSlotXmin(NULL, &logical_xmin);
     810                 : 
     811                 :     /*
      812                 :      * If there are no logical slots in progress we don't need to do anything;
      813                 :      * there cannot be any remappings for relevant rows yet. The relation's
     814                 :      * lock protects us against races.
     815 ECB             :      */
     816 GIC          21 :     if (logical_xmin == InvalidTransactionId)
     817 ECB             :     {
     818 CBC           1 :         state->rs_logical_rewrite = false;
     819 GIC           1 :         return;
     820                 :     }
     821 ECB             : 
     822 CBC          20 :     state->rs_logical_xmin = logical_xmin;
     823              20 :     state->rs_begin_lsn = GetXLogInsertRecPtr();
     824 GIC          20 :     state->rs_num_rewrite_mappings = 0;
     825 ECB             : 
     826 CBC          20 :     hash_ctl.keysize = sizeof(TransactionId);
     827              20 :     hash_ctl.entrysize = sizeof(RewriteMappingFile);
     828 GIC          20 :     hash_ctl.hcxt = state->rs_cxt;
     829 ECB             : 
     830 CBC          20 :     state->rs_logical_mappings =
     831 GIC          20 :         hash_create("Logical rewrite mapping",
     832                 :                     128,        /* arbitrary initial size */
     833                 :                     &hash_ctl,
     834                 :                     HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     835                 : }
     836                 : 
     837                 : /*
     838                 :  * Flush all logical in-memory mappings to disk, but don't fsync them yet.
     839                 :  */
     840 ECB             : static void
     841 GIC           9 : logical_heap_rewrite_flush_mappings(RewriteState state)
     842                 : {
     843                 :     HASH_SEQ_STATUS seq_status;
     844                 :     RewriteMappingFile *src;
     845                 :     dlist_mutable_iter iter;
     846 ECB             : 
     847 GIC           9 :     Assert(state->rs_logical_rewrite);
     848                 : 
      849 ECB             :     /* no new mappings collected, no need to iterate over the files */
     850 GBC           9 :     if (state->rs_num_rewrite_mappings == 0)
     851 UIC           0 :         return;
     852 ECB             : 
     853 GIC           9 :     elog(DEBUG1, "flushing %u logical rewrite mapping entries",
     854                 :          state->rs_num_rewrite_mappings);
     855 ECB             : 
     856 CBC           9 :     hash_seq_init(&seq_status, state->rs_logical_mappings);
     857 GIC          98 :     while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
     858                 :     {
     859                 :         char       *waldata;
     860                 :         char       *waldata_start;
     861                 :         xl_heap_rewrite_mapping xlrec;
     862                 :         Oid         dboid;
     863                 :         uint32      len;
     864 ECB             :         int         written;
     865 GNC          89 :         uint32      num_mappings = dclist_count(&src->mappings);
     866                 : 
     867                 :         /* this file hasn't got any new mappings */
     868              89 :         if (num_mappings == 0)
     869 UBC           0 :             continue;
     870                 : 
     871 CBC          89 :         if (state->rs_old_rel->rd_rel->relisshared)
     872 UBC           0 :             dboid = InvalidOid;
     873                 :         else
     874 CBC          89 :             dboid = MyDatabaseId;
     875                 : 
     876 GNC          89 :         xlrec.num_mappings = num_mappings;
     877 CBC          89 :         xlrec.mapped_rel = RelationGetRelid(state->rs_old_rel);
     878              89 :         xlrec.mapped_xid = src->xid;
     879              89 :         xlrec.mapped_db = dboid;
     880              89 :         xlrec.offset = src->off;
     881              89 :         xlrec.start_lsn = state->rs_begin_lsn;
     882                 : 
     883                 :         /* write all mappings consecutively */
     884 GNC          89 :         len = num_mappings * sizeof(LogicalRewriteMappingData);
     885 CBC          89 :         waldata_start = waldata = palloc(len);
     886                 : 
     887                 :         /*
     888                 :          * collect data we need to write out, but don't modify ondisk data yet
     889                 :          */
     890 GNC         748 :         dclist_foreach_modify(iter, &src->mappings)
     891                 :         {
     892                 :             RewriteMappingDataEntry *pmap;
     893                 : 
     894             659 :             pmap = dclist_container(RewriteMappingDataEntry, node, iter.cur);
     895                 : 
     896 CBC         659 :             memcpy(waldata, &pmap->map, sizeof(pmap->map));
     897             659 :             waldata += sizeof(pmap->map);
     898                 : 
     899                 :             /* remove from the list and free */
     900 GNC         659 :             dclist_delete_from(&src->mappings, &pmap->node);
     901 CBC         659 :             pfree(pmap);
     902                 : 
     903                 :             /* update bookkeeping */
     904             659 :             state->rs_num_rewrite_mappings--;
     905                 :         }
     906 ECB             : 
     907 GNC          89 :         Assert(dclist_count(&src->mappings) == 0);
     908 GIC          89 :         Assert(waldata == waldata_start + len);
     909                 : 
     910                 :         /*
     911                 :          * Note that we deviate from the usual WAL coding practices here,
     912                 :          * check the above "Logical rewrite support" comment for reasoning.
     913 ECB             :          */
     914 GIC          89 :         written = FileWrite(src->vfd, waldata_start, len, src->off,
     915 ECB             :                             WAIT_EVENT_LOGICAL_REWRITE_WRITE);
     916 GBC          89 :         if (written != len)
     917 UIC           0 :             ereport(ERROR,
     918                 :                     (errcode_for_file_access(),
     919                 :                      errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
     920 ECB             :                             written, len)));
     921 GIC          89 :         src->off += len;
     922 ECB             : 
     923 CBC          89 :         XLogBeginInsert();
     924              89 :         XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     925 GIC          89 :         XLogRegisterData(waldata_start, len);
     926                 : 
     927 ECB             :         /* write xlog record */
     928 GIC          89 :         XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE);
     929 ECB             : 
     930 GIC          89 :         pfree(waldata_start);
     931 ECB             :     }
     932 GIC           9 :     Assert(state->rs_num_rewrite_mappings == 0);
     933                 : }
     934                 : 
     935                 : /*
     936                 :  * Logical remapping part of end_heap_rewrite().
     937                 :  */
     938 ECB             : static void
     939 GIC         262 : logical_end_heap_rewrite(RewriteState state)
     940                 : {
     941                 :     HASH_SEQ_STATUS seq_status;
     942                 :     RewriteMappingFile *src;
     943                 : 
     944 ECB             :     /* done, no logical rewrite in progress */
     945 CBC         262 :     if (!state->rs_logical_rewrite)
     946 GIC         242 :         return;
     947                 : 
     948 ECB             :     /* writeout remaining in-memory entries */
     949 CBC          20 :     if (state->rs_num_rewrite_mappings > 0)
     950 GIC           9 :         logical_heap_rewrite_flush_mappings(state);
     951                 : 
     952 ECB             :     /* Iterate over all mappings we have written and fsync the files. */
     953 CBC          20 :     hash_seq_init(&seq_status, state->rs_logical_mappings);
     954 GIC         109 :     while ((src = (RewriteMappingFile *) hash_seq_search(&seq_status)) != NULL)
     955 ECB             :     {
     956 GBC          89 :         if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
     957 UIC           0 :             ereport(data_sync_elevel(ERROR),
     958                 :                     (errcode_for_file_access(),
     959 ECB             :                      errmsg("could not fsync file \"%s\": %m", src->path)));
     960 GIC          89 :         FileClose(src->vfd);
     961                 :     }
     962                 :     /* memory context cleanup will deal with the rest */
     963                 : }
     964                 : 
     965                 : /*
     966                 :  * Log a single (old->new) mapping for 'xid'.
     967                 :  */
     968 ECB             : static void
     969 GIC         659 : logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
     970                 :                             LogicalRewriteMappingData *map)
     971                 : {
     972                 :     RewriteMappingFile *src;
     973                 :     RewriteMappingDataEntry *pmap;
     974                 :     Oid         relid;
     975                 :     bool        found;
     976 ECB             : 
     977 GIC         659 :     relid = RelationGetRelid(state->rs_old_rel);
     978                 : 
     979 ECB             :     /* look for existing mappings for this 'mapped' xid */
     980 GIC         659 :     src = hash_search(state->rs_logical_mappings, &xid,
     981                 :                       HASH_ENTER, &found);
     982                 : 
     983                 :     /*
     984                 :      * We haven't yet had the need to map anything for this xid, create
     985                 :      * per-xid data structures.
     986 ECB             :      */
     987 GIC         659 :     if (!found)
     988                 :     {
     989                 :         char        path[MAXPGPATH];
     990                 :         Oid         dboid;
     991 ECB             : 
     992 GBC          89 :         if (state->rs_old_rel->rd_rel->relisshared)
     993 UIC           0 :             dboid = InvalidOid;
     994 ECB             :         else
     995 GIC          89 :             dboid = MyDatabaseId;
     996 ECB             : 
     997 GIC          89 :         snprintf(path, MAXPGPATH,
     998                 :                  "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
     999 ECB             :                  dboid, relid,
    1000 GIC          89 :                  LSN_FORMAT_ARGS(state->rs_begin_lsn),
    1001                 :                  xid, GetCurrentTransactionId());
    1002 ECB             : 
    1003 GNC          89 :         dclist_init(&src->mappings);
    1004 CBC          89 :         src->off = 0;
    1005 GIC          89 :         memcpy(src->path, path, sizeof(path));
    1006 CBC          89 :         src->vfd = PathNameOpenFile(path,
    1007 EUB             :                                     O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1008 GIC          89 :         if (src->vfd < 0)
    1009 UIC           0 :             ereport(ERROR,
    1010                 :                     (errcode_for_file_access(),
    1011                 :                      errmsg("could not create file \"%s\": %m", path)));
    1012 ECB             :     }
    1013                 : 
    1014 CBC         659 :     pmap = MemoryContextAlloc(state->rs_cxt,
    1015 ECB             :                               sizeof(RewriteMappingDataEntry));
    1016 CBC         659 :     memcpy(&pmap->map, map, sizeof(LogicalRewriteMappingData));
    1017 GNC         659 :     dclist_push_tail(&src->mappings, &pmap->node);
    1018 GIC         659 :     state->rs_num_rewrite_mappings++;
    1019                 : 
    1020                 :     /*
     1021 ECB             :      * Write out the buffer whenever we have too many in-memory entries across all
    1022 EUB             :      * mapping files.
    1023 ECB             :      */
    1024 GIC         659 :     if (state->rs_num_rewrite_mappings >= 1000 /* arbitrary number */ )
    1025 UIC           0 :         logical_heap_rewrite_flush_mappings(state);
    1026 GIC         659 : }
    1027                 : 
    1028                 : /*
    1029                 :  * Perform logical remapping for a tuple that's mapped from old_tid to
     1030 ECB             :  * new_tuple->t_self by rewrite_heap_tuple(), if the tuple requires it.
    1031                 :  */
    1032                 : static void
    1033 CBC      384627 : logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid,
    1034 ECB             :                            HeapTuple new_tuple)
    1035                 : {
    1036 GIC      384627 :     ItemPointerData new_tid = new_tuple->t_self;
    1037 CBC      384627 :     TransactionId cutoff = state->rs_logical_xmin;
    1038 ECB             :     TransactionId xmin;
    1039                 :     TransactionId xmax;
    1040 GIC      384627 :     bool        do_log_xmin = false;
    1041          384627 :     bool        do_log_xmax = false;
    1042 ECB             :     LogicalRewriteMappingData map;
    1043                 : 
    1044                 :     /* no logical rewrite in progress, we don't need to log anything */
    1045 CBC      384627 :     if (!state->rs_logical_rewrite)
    1046 GIC      383983 :         return;
    1047 ECB             : 
    1048 GIC       25907 :     xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
    1049                 :     /* use *GetUpdateXid to correctly deal with multixacts */
    1050           25907 :     xmax = HeapTupleHeaderGetUpdateXid(new_tuple->t_data);
    1051                 : 
    1052 ECB             :     /*
    1053                 :      * Log the mapping iff the tuple has been created recently.
    1054                 :      */
    1055 CBC       25907 :     if (TransactionIdIsNormal(xmin) && !TransactionIdPrecedes(xmin, cutoff))
    1056 GIC         485 :         do_log_xmin = true;
    1057                 : 
    1058           25907 :     if (!TransactionIdIsNormal(xmax))
    1059                 :     {
    1060                 :         /*
    1061                 :          * no xmax is set, can't have any permanent ones, so this check is
    1062 ECB             :          * sufficient
    1063                 :          */
    1064                 :     }
    1065 GIC         452 :     else if (HEAP_XMAX_IS_LOCKED_ONLY(new_tuple->t_data->t_infomask))
    1066 ECB             :     {
    1067                 :         /* only locked, we don't care */
    1068                 :     }
    1069 CBC         452 :     else if (!TransactionIdPrecedes(xmax, cutoff))
    1070                 :     {
    1071                 :         /* tuple has been deleted recently, log */
    1072 GIC         452 :         do_log_xmax = true;
    1073 ECB             :     }
    1074                 : 
    1075                 :     /* if neither needs to be logged, we're done */
    1076 GIC       25907 :     if (!do_log_xmin && !do_log_xmax)
    1077 CBC       25263 :         return;
    1078 ECB             : 
    1079                 :     /* fill out mapping information */
    1080 GNC         644 :     map.old_locator = state->rs_old_rel->rd_locator;
    1081 GIC         644 :     map.old_tid = old_tid;
    1082 GNC         644 :     map.new_locator = state->rs_new_rel->rd_locator;
    1083 GIC         644 :     map.new_tid = new_tid;
    1084                 : 
    1085                 :     /* ---
    1086                 :      * Now persist the mapping for the individual xids that are affected. We
    1087                 :      * need to log for both xmin and xmax if they aren't the same transaction
    1088                 :      * since the mapping files are per "affected" xid.
    1089                 :      * We don't muster all that much effort detecting whether xmin and xmax
    1090                 :      * are actually the same transaction, we just check whether the xid is the
    1091                 :      * same disregarding subtransactions. Logging too much is relatively
    1092                 :      * harmless and we could never do the check fully since subtransaction
    1093 ECB             :      * data is thrown away during restarts.
    1094                 :      * ---
    1095                 :      */
    1096 CBC         644 :     if (do_log_xmin)
    1097             485 :         logical_rewrite_log_mapping(state, xmin, &map);
    1098                 :     /* separately log mapping for xmax unless it'd be redundant */
    1099 GIC         644 :     if (do_log_xmax && !TransactionIdEquals(xmin, xmax))
    1100             174 :         logical_rewrite_log_mapping(state, xmax, &map);
    1101                 : }
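The decision logic above can be condensed into a small predicate. The sketch below is a simplification under stated assumptions: it uses plain integer comparison instead of the wraparound-aware TransactionIdPrecedes(), so it is only valid for non-wrapping xids, and `locked_only` stands in for the HEAP_XMAX_IS_LOCKED_ONLY() infomask test.

```c
/* Simplified sketch of the do_log_xmin/do_log_xmax decision in
 * logical_rewrite_heap_tuple(); not wraparound-safe. */
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define FirstNormalXid 3

static bool
xid_is_normal(uint32_t xid)
{
    return xid >= FirstNormalXid;
}

/* Decide which of xmin/xmax need a mapping entry, given the logical
 * decoding cutoff (rs_logical_xmin in the source). */
static void
decide_logging(uint32_t xmin, uint32_t xmax, bool locked_only,
               uint32_t cutoff, bool *log_xmin, bool *log_xmax)
{
    /* tuple created at or after the cutoff: its xmin must be mapped */
    *log_xmin = xid_is_normal(xmin) && xmin >= cutoff;

    if (!xid_is_normal(xmax))
        *log_xmax = false;       /* no xmax set */
    else if (locked_only)
        *log_xmax = false;       /* tuple only locked, not deleted */
    else
        *log_xmax = (xmax >= cutoff);   /* deleted recently: log */
}
```

As in the source, both flags can be true for the same tuple, in which case the mapping is logged once per distinct xid.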
    1102                 : 
    1103                 : /*
    1104 EUB             :  * Replay XLOG_HEAP2_REWRITE records
    1105                 :  */
    1106                 : void
    1107 UIC           0 : heap_xlog_logical_rewrite(XLogReaderState *r)
    1108                 : {
    1109                 :     char        path[MAXPGPATH];
    1110                 :     int         fd;
    1111                 :     xl_heap_rewrite_mapping *xlrec;
    1112 EUB             :     uint32      len;
    1113                 :     char       *data;
    1114                 : 
    1115 UIC           0 :     xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
    1116                 : 
    1117 UBC           0 :     snprintf(path, MAXPGPATH,
    1118 EUB             :              "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
    1119                 :              xlrec->mapped_db, xlrec->mapped_rel,
    1120 UBC           0 :              LSN_FORMAT_ARGS(xlrec->start_lsn),
    1121 UIC           0 :              xlrec->mapped_xid, XLogRecGetXid(r));
    1122 EUB             : 
    1123 UBC           0 :     fd = OpenTransientFile(path,
    1124                 :                            O_CREAT | O_WRONLY | PG_BINARY);
    1125 UIC           0 :     if (fd < 0)
    1126               0 :         ereport(ERROR,
    1127                 :                 (errcode_for_file_access(),
    1128                 :                  errmsg("could not create file \"%s\": %m", path)));
    1129                 : 
    1130                 :     /*
    1131 EUB             :      * Truncate all data that's not guaranteed to have been safely fsynced (by
    1132                 :      * previous record or by the last checkpoint).
    1133                 :      */
    1134 UIC           0 :     pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE);
    1135               0 :     if (ftruncate(fd, xlrec->offset) != 0)
    1136               0 :         ereport(ERROR,
    1137 EUB             :                 (errcode_for_file_access(),
    1138                 :                  errmsg("could not truncate file \"%s\" to %u: %m",
    1139                 :                         path, (uint32) xlrec->offset)));
    1140 UIC           0 :     pgstat_report_wait_end();
    1141 EUB             : 
    1142 UIC           0 :     data = XLogRecGetData(r) + sizeof(*xlrec);
    1143                 : 
    1144 UBC           0 :     len = xlrec->num_mappings * sizeof(LogicalRewriteMappingData);
    1145 EUB             : 
    1146                 :     /* write out tail end of mapping file (again) */
    1147 UIC           0 :     errno = 0;
    1148               0 :     pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE);
    1149 UBC           0 :     if (pg_pwrite(fd, data, len, xlrec->offset) != len)
    1150 EUB             :     {
    1151                 :         /* if write didn't set errno, assume problem is no disk space */
    1152 UIC           0 :         if (errno == 0)
    1153               0 :             errno = ENOSPC;
    1154               0 :         ereport(ERROR,
    1155 EUB             :                 (errcode_for_file_access(),
    1156                 :                  errmsg("could not write to file \"%s\": %m", path)));
    1157                 :     }
    1158 UIC           0 :     pgstat_report_wait_end();
    1159                 : 
    1160                 :     /*
    1161                 :      * Now fsync all previously written data. We could improve things and only
    1162 EUB             :      * do this for the last write to a file, but the required bookkeeping
    1163                 :      * doesn't seem worth the trouble.
    1164                 :      */
    1165 UIC           0 :     pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC);
    1166               0 :     if (pg_fsync(fd) != 0)
    1167 UBC           0 :         ereport(data_sync_elevel(ERROR),
    1168                 :                 (errcode_for_file_access(),
    1169 EUB             :                  errmsg("could not fsync file \"%s\": %m", path)));
    1170 UBC           0 :     pgstat_report_wait_end();
    1171                 : 
    1172 UIC           0 :     if (CloseTransientFile(fd) != 0)
    1173 UBC           0 :         ereport(ERROR,
    1174                 :                 (errcode_for_file_access(),
    1175                 :                  errmsg("could not close file \"%s\": %m", path)));
    1176 UIC           0 : }
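The replay routine above is idempotent by construction: it truncates away everything not known to be durable, rewrites the tail of the file at the offset recorded in the WAL record, and fsyncs. A minimal POSIX sketch of that pattern, with a made-up function name and without the ereport()/wait-event plumbing:

```c
/* Sketch of the truncate-then-pwrite-then-fsync replay pattern in
 * heap_xlog_logical_rewrite(); returns 0 on success, -1 on error. */
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>

static int
replay_tail(const char *path, const void *data, size_t len, off_t offset)
{
    int         fd = open(path, O_CREAT | O_WRONLY, 0600);

    if (fd < 0)
        return -1;

    /* drop bytes beyond 'offset': they may never have reached disk */
    if (ftruncate(fd, offset) != 0 ||
        pwrite(fd, data, len, offset) != (ssize_t) len ||
        fsync(fd) != 0)
    {
        close(fd);
        return -1;
    }
    return close(fd);
}
```

Because the write always lands at a fixed offset after a truncate, replaying the same record twice produces an identical file, which is exactly what crash recovery needs.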
    1177                 : 
    1178                 : /* ---
    1179                 :  * Perform a checkpoint for logical rewrite mappings
    1180                 :  *
    1181                 :  * This serves two tasks:
    1182                 :  * 1) Remove all mappings not needed anymore based on the logical restart LSN
    1183                 :  * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
    1184                 :  *    only has to deal with the parts of a mapping that have been written out
    1185                 :  *    after the checkpoint started.
    1186 ECB             :  * ---
    1187                 :  */
    1188                 : void
    1189 GIC        2363 : CheckPointLogicalRewriteHeap(void)
    1190                 : {
    1191                 :     XLogRecPtr  cutoff;
    1192                 :     XLogRecPtr  redo;
    1193                 :     DIR        *mappings_dir;
    1194                 :     struct dirent *mapping_de;
    1195                 :     char        path[MAXPGPATH + 20];
    1196                 : 
     1197                 :      * We start off with a minimum of the last redo pointer. No new decoding
    1198 ECB             :      * We start of with a minimum of the last redo pointer. No new decoding
    1199                 :      * slot will start before that, so that's a safe upper bound for removal.
    1200                 :      */
    1201 CBC        2363 :     redo = GetRedoRecPtr();
    1202                 : 
    1203                 :     /* now check for the restart ptrs from existing slots */
    1204            2363 :     cutoff = ReplicationSlotsComputeLogicalRestartLSN();
    1205 EUB             : 
    1206                 :     /* don't start earlier than the restart lsn */
    1207 CBC        2363 :     if (cutoff != InvalidXLogRecPtr && redo < cutoff)
    1208 LBC           0 :         cutoff = redo;
    1209                 : 
    1210 GIC        2363 :     mappings_dir = AllocateDir("pg_logical/mappings");
    1211            7267 :     while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
    1212                 :     {
    1213                 :         Oid         dboid;
    1214                 :         Oid         relid;
    1215                 :         XLogRecPtr  lsn;
    1216                 :         TransactionId rewrite_xid;
    1217                 :         TransactionId create_xid;
    1218 ECB             :         uint32      hi,
    1219                 :                     lo;
    1220                 :         PGFileType  de_type;
    1221                 : 
    1222 GIC        4904 :         if (strcmp(mapping_de->d_name, ".") == 0 ||
    1223 CBC        2541 :             strcmp(mapping_de->d_name, "..") == 0)
    1224            4726 :             continue;
    1225                 : 
    1226             178 :         snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
    1227 GNC         178 :         de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
    1228                 : 
    1229             178 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
    1230 UIC           0 :             continue;
    1231                 : 
    1232 ECB             :         /* Skip over files that cannot be ours. */
    1233 GBC         178 :         if (strncmp(mapping_de->d_name, "map-", 4) != 0)
    1234 UIC           0 :             continue;
    1235 ECB             : 
    1236 GIC         178 :         if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
    1237 EUB             :                    &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
    1238 UIC           0 :             elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
    1239 ECB             : 
    1240 GIC         178 :         lsn = ((uint64) hi) << 32 | lo;
    1241 ECB             : 
    1242 GIC         178 :         if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
    1243 ECB             :         {
    1244 CBC          89 :             elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
    1245 GBC          89 :             if (unlink(path) < 0)
    1246 UIC           0 :                 ereport(ERROR,
    1247                 :                         (errcode_for_file_access(),
    1248                 :                          errmsg("could not remove file \"%s\": %m", path)));
    1249                 :         }
    1250                 :         else
    1251                 :         {
    1252 ECB             :             /* on some operating systems fsyncing a file requires O_RDWR */
    1253 GIC          89 :             int         fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    1254                 : 
    1255                 :             /*
    1256                 :              * The file cannot vanish due to concurrency since this function
    1257                 :              * is the only one removing logical mappings and only one
    1258                 :              * checkpoint can be in progress at a time.
    1259 ECB             :              */
    1260 GBC          89 :             if (fd < 0)
    1261 UIC           0 :                 ereport(ERROR,
    1262                 :                         (errcode_for_file_access(),
    1263                 :                          errmsg("could not open file \"%s\": %m", path)));
    1264                 : 
    1265                 :             /*
    1266                 :              * We could try to avoid fsyncing files that either haven't
    1267                 :              * changed or have only been created since the checkpoint's start,
    1268                 :              * but it's currently not deemed worth the effort.
    1269 ECB             :              */
    1270 CBC          89 :             pgstat_report_wait_start(WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC);
    1271 GBC          89 :             if (pg_fsync(fd) != 0)
    1272 UIC           0 :                 ereport(data_sync_elevel(ERROR),
    1273                 :                         (errcode_for_file_access(),
    1274 ECB             :                          errmsg("could not fsync file \"%s\": %m", path)));
    1275 GIC          89 :             pgstat_report_wait_end();
    1276 ECB             : 
    1277 GBC          89 :             if (CloseTransientFile(fd) != 0)
    1278 UIC           0 :                 ereport(ERROR,
    1279                 :                         (errcode_for_file_access(),
    1280                 :                          errmsg("could not close file \"%s\": %m", path)));
    1281                 :         }
    1282 ECB             :     }
    1283 GIC        2363 :     FreeDir(mappings_dir);
    1284                 : 
    1285 ECB             :     /* persist directory entries to disk */
    1286 CBC        2363 :     fsync_fname("pg_logical/mappings", true);
    1287 GIC        2363 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a