LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - reorderbuffer.c (source / functions)
Current: Differential Code Coverage HEAD vs 15
Current Date: 2023-04-08 15:15:32
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

Lines:     coverage 93.1 %, total 1662, hit 1548;
           UNC 2, LBC 40, UIC 63, UBC 9, GBC 28, GIC 1008, GNC 78, CBC 434,
           EUB 74, ECB 1045, DUB 3, DCB 29
Functions: coverage 100.0 %, total 87, hit 87; remaining category counts 84, 3, 87

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * reorderbuffer.c
       4                 :  *    PostgreSQL logical replay/reorder buffer management
       5                 :  *
       6                 :  *
       7                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
       8                 :  *
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *    src/backend/replication/logical/reorderbuffer.c
      12                 :  *
      13                 :  * NOTES
      14                 :  *    This module receives individual pieces of transactions in the order
      15                 :  *    they are written to the WAL and is responsible for reassembling them
      16                 :  *    into toplevel-transaction-sized pieces. When a transaction is
      17                 :  *    completely reassembled (signaled by reading the transaction commit
      18                 :  *    record), it calls the output plugin (cf. ReorderBufferCommit()) with
      19                 :  *    the individual changes. The output plugins rely on snapshots built by
      20                 :  *    snapbuild.c, which hands them to us.
      21                 :  *
      22                 :  *    Transactions and subtransactions/savepoints in postgres are not
      23                 :  *    immediately linked to each other from outside the performing
      24                 :  *    backend. They are linked together only at commit/abort (or by special
      25                 :  *    xact_assignment records), which means that we have to splice together
      26                 :  *    a toplevel transaction from its subtransactions. To do that
      27                 :  *    efficiently, we build a binary heap indexed by the smallest current LSN
      28                 :  *    of the individual subtransactions' change streams. As the individual
      29                 :  *    streams are inherently ordered by LSN (that is the order in which we
      30                 :  *    build them), the transaction can easily be reassembled by always
      31                 :  *    taking the subtransaction with the smallest current LSN from the heap.
      32                 :  *
      33                 :  *    In order to cope with large transactions - which can be several times as
      34                 :  *    big as the available memory - this module supports spooling the contents
      35                 :  *    of large transactions to disk. When the transaction is replayed, the
      36                 :  *    contents of individual (sub-)transactions will be read from disk in
      37                 :  *    chunks.
      38                 :  *
      39                 :  *    This module also has to deal with reassembling toast records from the
      40                 :  *    individual chunks stored in WAL. When a new (or initial) version of a
      41                 :  *    tuple is stored in WAL it will always be preceded by the toast chunks
      42                 :  *    emitted for the columns stored out of line. Within a single toplevel
      43                 :  *    transaction there will be no other data carrying records between a row's
      44                 :  *    toast chunks and the row data itself. See ReorderBufferToast* for
      45                 :  *    details.
      46                 :  *
      47                 :  *    ReorderBuffer uses two special memory context types - SlabContext for
      48                 :  *    allocations of fixed-length structures (changes and transactions), and
      49                 :  *    GenerationContext for the variable-length transaction data (allocated
      50                 :  *    and freed in groups with similar lifespans).
      51                 :  *
      52                 :  *    To limit the amount of memory used by decoded changes, we track memory
      53                 :  *    used at the reorder buffer level (i.e. total amount of memory), and for
      54                 :  *    each transaction. When the total amount of used memory exceeds the
      55                 :  *    limit, the transaction consuming the most memory is then serialized to
      56                 :  *    disk.
      57                 :  *
      58                 :  *    Only decoded changes are evicted from memory (spilled to disk), not the
      59                 :  *    transaction records. The number of toplevel transactions is limited,
      60                 :  *    but a transaction with many subtransactions may still consume significant
      61                 :  *    amounts of memory. However, the transaction records are fairly small and
      62                 :  *    are not included in the memory limit.
      63                 :  *
      64                 :  *    The current eviction algorithm is very simple - the transaction is
      65                 :  *    picked merely by size, while it might be useful to also consider age
      66                 :  *    (LSN) of the changes for example. With the new Generational memory
      67                 :  *    allocator, evicting the oldest changes would make it more likely that
      68                 :  *    the memory actually gets freed.
      69                 :  *
      70                 :  *    We still rely on max_changes_in_memory when loading serialized changes
      71                 :  *    back into memory. At that point we can't use the memory limit directly
      72                 :  *    as we load the subxacts independently. One option to deal with this
      73                 :  *    would be to count the subxacts, and allow each to allocate 1/N of the
      74                 :  *    memory limit. That however does not seem very appealing, because with
      75                 :  *    many subtransactions it may easily cause thrashing (short cycles of
      76                 :  *    deserializing and applying very few changes). We probably should give
      77                 :  *    a bit more memory to the oldest subtransactions, because it's likely
      78                 :  *    they are the source for the next sequence of changes.
      79                 :  *
      80                 :  * -------------------------------------------------------------------------
      81                 :  */
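The memory-limit policy described in the NOTES (track the total size of decoded changes and, while it exceeds the limit, spill the transaction using the most memory) can be sketched in plain C as below. This is a minimal illustration under simplifying assumptions, not the actual ReorderBufferCheckMemoryLimit() code: the SketchTxn type and the sketch_* functions are hypothetical, "spilling" only zeroes the in-memory size instead of writing anything to disk, and the caller is assumed to keep `total` equal to the sum of the per-transaction sizes.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for a decoded transaction. */
typedef struct SketchTxn
{
    unsigned int xid;
    size_t       size;      /* bytes of decoded changes held in memory */
    int          spilled;   /* how many times this txn was spilled */
} SketchTxn;

/* Return the transaction holding the most in-memory change data. */
static SketchTxn *
sketch_largest_txn(SketchTxn *txns, size_t ntxns)
{
    SketchTxn  *largest = NULL;

    for (size_t i = 0; i < ntxns; i++)
        if (largest == NULL || txns[i].size > largest->size)
            largest = &txns[i];
    return largest;
}

/*
 * While the tracked total exceeds the limit, "spill" the largest
 * transaction: its changes leave memory, so its size is subtracted from
 * the total. Assumes total == sum of txns[i].size. Returns the new total.
 */
static size_t
sketch_check_memory_limit(SketchTxn *txns, size_t ntxns,
                          size_t total, size_t limit)
{
    while (total > limit)
    {
        SketchTxn  *victim = sketch_largest_txn(txns, ntxns);

        assert(victim != NULL && victim->size > 0);
        total -= victim->size;
        victim->size = 0;       /* a real spill would write changes to disk */
        victim->spilled++;
    }
    return total;
}
```

With transactions of 100, 500 and 300 bytes and a 600-byte limit, only the 500-byte transaction is evicted. A linear scan keeps the sketch short; with many concurrent transactions a more elaborate structure for finding the largest one might be preferable.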
      82                 : #include "postgres.h"
      83                 : 
      84                 : #include <unistd.h>
      85                 : #include <sys/stat.h>
      86                 : 
      87                 : #include "access/detoast.h"
      88                 : #include "access/heapam.h"
      89                 : #include "access/rewriteheap.h"
      90                 : #include "access/transam.h"
      91                 : #include "access/xact.h"
      92                 : #include "access/xlog_internal.h"
      93                 : #include "catalog/catalog.h"
      94                 : #include "lib/binaryheap.h"
      95                 : #include "miscadmin.h"
      96                 : #include "pgstat.h"
      97                 : #include "replication/logical.h"
      98                 : #include "replication/reorderbuffer.h"
      99                 : #include "replication/slot.h"
     100                 : #include "replication/snapbuild.h"    /* just for SnapBuildSnapDecRefcount */
     101                 : #include "storage/bufmgr.h"
     102                 : #include "storage/fd.h"
     103                 : #include "storage/sinval.h"
     104                 : #include "utils/builtins.h"
     105                 : #include "utils/combocid.h"
     106                 : #include "utils/memdebug.h"
     107                 : #include "utils/memutils.h"
     108                 : #include "utils/rel.h"
     109                 : #include "utils/relfilenumbermap.h"
     110                 : 
     111                 : 
     112                 : /* entry for a hash table we use to map from xid to our transaction state */
     113                 : typedef struct ReorderBufferTXNByIdEnt
     114                 : {
     115                 :     TransactionId xid;
     116                 :     ReorderBufferTXN *txn;
     117                 : } ReorderBufferTXNByIdEnt;
     118                 : 
     119                 : /* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */
     120                 : typedef struct ReorderBufferTupleCidKey
     121                 : {
     122                 :     RelFileLocator rlocator;
     123                 :     ItemPointerData tid;
     124                 : } ReorderBufferTupleCidKey;
     125                 : 
     126                 : typedef struct ReorderBufferTupleCidEnt
     127                 : {
     128                 :     ReorderBufferTupleCidKey key;
     129                 :     CommandId   cmin;
     130                 :     CommandId   cmax;
     131                 :     CommandId   combocid;       /* just for debugging */
     132                 : } ReorderBufferTupleCidEnt;
     133                 : 
     134                 : /* Virtual file descriptor with file offset tracking */
     135                 : typedef struct TXNEntryFile
     136                 : {
     137                 :     File        vfd;            /* -1 when the file is closed */
     138                 :     off_t       curOffset;      /* offset for next write or read. Reset to 0
     139                 :                                  * when vfd is opened. */
     140                 : } TXNEntryFile;
     141                 : 
     142                 : /* k-way in-order change iteration support structures */
     143                 : typedef struct ReorderBufferIterTXNEntry
     144                 : {
     145                 :     XLogRecPtr  lsn;
     146                 :     ReorderBufferChange *change;
     147                 :     ReorderBufferTXN *txn;
     148                 :     TXNEntryFile file;
     149                 :     XLogSegNo   segno;
     150                 : } ReorderBufferIterTXNEntry;
     151                 : 
     152                 : typedef struct ReorderBufferIterTXNState
     153                 : {
     154                 :     binaryheap *heap;
     155                 :     Size        nr_txns;
     156                 :     dlist_head  old_change;
     157                 :     ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
     158                 : } ReorderBufferIterTXNState;
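ReorderBufferIterTXNState pairs a binaryheap with one entry per (sub)transaction, which is how the k-way merge described in the NOTES is driven. The sketch below shows the same idea in self-contained C: each stream is an array of LSNs already sorted in ascending order, and a small array-based min-heap of stream indexes always yields the stream whose next change has the smallest LSN. All Sketch* names, the fixed SKETCH_MAX_STREAMS bound, and the use of bare LSNs in place of change records are assumptions made for illustration; it does not use PostgreSQL's lib/binaryheap.h API.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t SketchLSN;

/* One (sub)transaction's change stream, already ordered by LSN. */
typedef struct SketchStream
{
    const SketchLSN *lsns;
    size_t           n;
    size_t           pos;       /* next unread change */
} SketchStream;

#define SKETCH_MAX_STREAMS 16

/* Min-heap of stream indexes, keyed by each stream's current LSN. */
typedef struct SketchHeap
{
    SketchStream *streams;
    size_t        idx[SKETCH_MAX_STREAMS];
    size_t        len;
} SketchHeap;

static SketchLSN
cur_lsn(const SketchHeap *h, size_t slot)
{
    const SketchStream *s = &h->streams[h->idx[slot]];

    return s->lsns[s->pos];
}

/* Restore the heap property downwards from slot i. */
static void
sift_down(SketchHeap *h, size_t i)
{
    for (;;)
    {
        size_t      l = 2 * i + 1, r = l + 1, m = i;

        if (l < h->len && cur_lsn(h, l) < cur_lsn(h, m))
            m = l;
        if (r < h->len && cur_lsn(h, r) < cur_lsn(h, m))
            m = r;
        if (m == i)
            return;
        size_t      tmp = h->idx[i];

        h->idx[i] = h->idx[m];
        h->idx[m] = tmp;
        i = m;
    }
}

/* Merge all streams into out[] in LSN order; returns number written. */
static size_t
sketch_merge(SketchStream *streams, size_t nstreams, SketchLSN *out)
{
    SketchHeap  h = {.streams = streams, .len = 0};
    size_t      nout = 0;

    assert(nstreams <= SKETCH_MAX_STREAMS);
    for (size_t i = 0; i < nstreams; i++)
        if (streams[i].pos < streams[i].n)
            h.idx[h.len++] = i;         /* only non-empty streams enter */
    for (size_t i = h.len; i-- > 0;)
        sift_down(&h, i);               /* heapify */

    while (h.len > 0)
    {
        SketchStream *top = &streams[h.idx[0]];

        out[nout++] = top->lsns[top->pos++];
        if (top->pos == top->n)
            h.idx[0] = h.idx[--h.len];  /* stream exhausted: drop it */
        if (h.len > 0)
            sift_down(&h, 0);
    }
    return nout;
}
```

Each emitted change costs O(log k) for k streams, which is why a heap is preferable to rescanning every subtransaction for the smallest LSN.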
     159                 : 
     160                 : /* toast datastructures */
     161                 : typedef struct ReorderBufferToastEnt
     162                 : {
     163                 :     Oid         chunk_id;       /* toast_table.chunk_id */
     164                 :     int32       last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
     165                 :                                  * have seen */
     166                 :     Size        num_chunks;     /* number of chunks we've already seen */
     167                 :     Size        size;           /* combined size of chunks seen */
     168                 :     dlist_head  chunks;         /* linked list of chunks */
     169                 :     struct varlena *reconstructed;  /* reconstructed varlena now pointed to in
     170                 :                                      * main tup */
     171                 : } ReorderBufferToastEnt;
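ReorderBufferToastEnt accumulates the chunks of one out-of-line value until the row that references it arrives. The following minimal sketch, loosely modeled on its chunk_id, last_chunk_seq and size fields, appends chunk payloads into one contiguous buffer and rejects chunks that arrive out of sequence. The names are hypothetical; it stores raw bytes rather than a varlena, and it concatenates eagerly instead of keeping a chunk list and reconstructing the value later.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical accumulator for one toasted value (one chunk_id). */
typedef struct SketchToast
{
    unsigned int chunk_id;       /* which toasted value the chunks belong to */
    int          last_chunk_seq; /* -1 until the first chunk arrives */
    size_t       size;           /* combined size of chunks seen */
    char        *data;           /* chunk payloads, concatenated in order */
} SketchToast;

static void
sketch_toast_init(SketchToast *t, unsigned int chunk_id)
{
    t->chunk_id = chunk_id;
    t->last_chunk_seq = -1;
    t->size = 0;
    t->data = NULL;
}

/*
 * Append one chunk. Returns 0 on success, or -1 if chunk_seq is not the
 * next expected sequence number or memory could not be allocated.
 */
static int
sketch_toast_append(SketchToast *t, int chunk_seq,
                    const char *chunk, size_t len)
{
    char       *grown;

    assert(len > 0);
    if (chunk_seq != t->last_chunk_seq + 1)
        return -1;
    grown = realloc(t->data, t->size + len);
    if (grown == NULL)
        return -1;              /* t->data is still valid on failure */
    memcpy(grown + t->size, chunk, len);
    t->data = grown;
    t->size += len;
    t->last_chunk_seq = chunk_seq;
    return 0;
}

/* Release the buffer and reset the accumulator for reuse. */
static void
sketch_toast_free(SketchToast *t)
{
    free(t->data);
    sketch_toast_init(t, t->chunk_id);
}
```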
     172                 : 
     173                 : /* Disk serialization support datastructures */
     174                 : typedef struct ReorderBufferDiskChange
     175                 : {
     176                 :     Size        size;
     177                 :     ReorderBufferChange change;
     178                 :     /* data follows */
     179                 : } ReorderBufferDiskChange;
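ReorderBufferDiskChange stores a size field first and lets the variable-length data follow the fixed struct, so a reader can tell how many bytes belong to each spilled record. Below is a minimal sketch of that length-prefixed layout using stdio, assuming a hypothetical one-field header (SketchDiskRecord) and opaque payload bytes; the real code writes a ReorderBufferChange plus its tuple data, which this example does not attempt to reproduce.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical on-disk header; the payload ("data") follows it. */
typedef struct SketchDiskRecord
{
    size_t      size;           /* header + payload, in bytes */
} SketchDiskRecord;

/* Append one length-prefixed record. Returns 0 on success, -1 on error. */
static int
sketch_write_record(FILE *fp, const void *payload, size_t len)
{
    SketchDiskRecord hdr = {sizeof(SketchDiskRecord) + len};

    if (fwrite(&hdr, sizeof(hdr), 1, fp) != 1)
        return -1;
    if (len > 0 && fwrite(payload, len, 1, fp) != 1)
        return -1;
    return 0;
}

/*
 * Read the next record's payload into buf (capacity cap) and store its
 * length in *len. Returns 1 if a record was read, 0 at end of file, and
 * -1 on a truncated or corrupt record or one that does not fit in buf.
 */
static int
sketch_read_record(FILE *fp, void *buf, size_t cap, size_t *len)
{
    SketchDiskRecord hdr;
    size_t      got = fread(&hdr, 1, sizeof(hdr), fp);

    if (got == 0 && feof(fp))
        return 0;
    if (got != sizeof(hdr) || hdr.size < sizeof(hdr))
        return -1;
    *len = hdr.size - sizeof(hdr);
    if (*len > cap)
        return -1;
    if (*len > 0 && fread(buf, *len, 1, fp) != 1)
        return -1;
    return 1;
}
```

Because the size covers header and payload together, the reader never has to interpret the payload to find the start of the next record.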
     180                 : 
     181                 : #define IsSpecInsert(action) \
     182                 : ( \
     183                 :     ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
     184                 : )
     185                 : #define IsSpecConfirmOrAbort(action) \
     186                 : ( \
     187                 :     (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
     188                 :     ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
     189                 : )
     190                 : #define IsInsertOrUpdate(action) \
     191                 : ( \
     192                 :     (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
     193                 :     ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
     194                 :     ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
     195                 : )
     196                 : 
     197                 : /*
     198                 :  * Maximum number of changes kept in memory, per transaction. After that,
     199                 :  * changes are spooled to disk.
     200                 :  *
     201                 :  * The current value should be sufficient to decode the entire transaction
     202                 :  * without hitting disk in OLTP workloads, while starting to spool to disk in
     203                 :  * other workloads reasonably fast.
     204                 :  *
     205                 :  * At some point in the future it probably makes sense to have a more elaborate
     206                 :  * resource management here, but it's not entirely clear what that would look
     207                 :  * like.
     208                 :  */
     209                 : int         logical_decoding_work_mem;
     210                 : static const Size max_changes_in_memory = 4096; /* XXX for restore only */
     211                 : 
     212                 : /* GUC variable */
     213                 : int         logical_replication_mode = LOGICAL_REP_MODE_BUFFERED;
     214                 : 
     215                 : /* ---------------------------------------
     216                 :  * primary reorderbuffer support routines
     217                 :  * ---------------------------------------
     218                 :  */
     219                 : static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
     220                 : static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
     221                 : static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
     222                 :                                                TransactionId xid, bool create, bool *is_new,
     223                 :                                                XLogRecPtr lsn, bool create_as_top);
     224                 : static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
     225                 :                                               ReorderBufferTXN *subtxn);
     226                 : 
     227                 : static void AssertTXNLsnOrder(ReorderBuffer *rb);
     228                 : 
     229                 : /* ---------------------------------------
     230                 :  * support functions for lsn-order iterating over the ->changes of a
     231                 :  * transaction and its subtransactions
     232                 :  *
     233                 :  * used for iteration over the k-way heap merge of a transaction and its
     234                 :  * subtransactions
     235                 :  * ---------------------------------------
     236                 :  */
     237                 : static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
     238                 :                                      ReorderBufferIterTXNState *volatile *iter_state);
     239                 : static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
     240                 : static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
     241                 :                                        ReorderBufferIterTXNState *state);
     242                 : static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);
     243                 : 
     244                 : /*
     245                 :  * ---------------------------------------
     246                 :  * Disk serialization support functions
     247                 :  * ---------------------------------------
     248                 :  */
     249                 : static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb);
     250                 : static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
     251                 : static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
     252                 :                                          int fd, ReorderBufferChange *change);
     253                 : static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
     254                 :                                         TXNEntryFile *file, XLogSegNo *segno);
     255                 : static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
     256                 :                                        char *data);
     257                 : static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
     258                 : static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
     259                 :                                      bool txn_prepared);
     260                 : static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
     261                 : static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
     262                 :                                         TransactionId xid, XLogSegNo segno);
     263                 : 
     264                 : static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
     265                 : static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
     266                 :                                       ReorderBufferTXN *txn, CommandId cid);
     267                 : 
     268                 : /*
     269                 :  * ---------------------------------------
     270                 :  * Streaming support functions
     271                 :  * ---------------------------------------
     272                 :  */
     273                 : static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
     274                 : static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
     275                 : static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
     276                 : static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
     277                 : 
     278                 : /* ---------------------------------------
     279                 :  * toast reassembly support
     280                 :  * ---------------------------------------
     281                 :  */
     282                 : static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
     283                 : static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
     284                 : static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
     285                 :                                       Relation relation, ReorderBufferChange *change);
     286                 : static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
     287                 :                                           Relation relation, ReorderBufferChange *change);
     288                 : 
     289                 : /*
     290                 :  * ---------------------------------------
     291                 :  * memory accounting
     292                 :  * ---------------------------------------
     293                 :  */
     294                 : static Size ReorderBufferChangeSize(ReorderBufferChange *change);
     295                 : static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
     296                 :                                             ReorderBufferChange *change,
     297                 :                                             bool addition, Size sz);
     298                 : 
     299                 : /*
     300                 :  * Allocate a new ReorderBuffer and clean out any old serialized state from
     301                 :  * prior ReorderBuffer instances for the same slot.
     302                 :  */
     303                 : ReorderBuffer *
     304 GIC         821 : ReorderBufferAllocate(void)
     305                 : {
     306                 :     ReorderBuffer *buffer;
     307 ECB             :     HASHCTL     hash_ctl;
     308                 :     MemoryContext new_ctx;
     309                 : 
     310 GIC         821 :     Assert(MyReplicationSlot != NULL);
     311                 : 
     312                 :     /* allocate memory in own context, to have better accountability */
     313 CBC         821 :     new_ctx = AllocSetContextCreate(CurrentMemoryContext,
     314                 :                                     "ReorderBuffer",
     315                 :                                     ALLOCSET_DEFAULT_SIZES);
     316 ECB             : 
     317                 :     buffer =
     318 GIC         821 :         (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
     319                 : 
     320             821 :     memset(&hash_ctl, 0, sizeof(hash_ctl));
     321 ECB             : 
     322 GIC         821 :     buffer->context = new_ctx;
     323 ECB             : 
     324 GIC         821 :     buffer->change_context = SlabContextCreate(new_ctx,
     325 ECB             :                                                "Change",
     326                 :                                                SLAB_DEFAULT_BLOCK_SIZE,
     327                 :                                                sizeof(ReorderBufferChange));
     328                 : 
     329 GIC         821 :     buffer->txn_context = SlabContextCreate(new_ctx,
     330                 :                                             "TXN",
     331                 :                                             SLAB_DEFAULT_BLOCK_SIZE,
     332 ECB             :                                             sizeof(ReorderBufferTXN));
     333                 : 
     334                 :     /*
     335                 :      * XXX the allocation sizes used below pre-date generation context's block
     336                 :      * growing code.  These values should likely be benchmarked and set to
     337                 :      * more suitable values.
     338                 :      */
     339 GIC         821 :     buffer->tup_context = GenerationContextCreate(new_ctx,
     340                 :                                                   "Tuples",
     341                 :                                                   SLAB_LARGE_BLOCK_SIZE,
     342 ECB             :                                                   SLAB_LARGE_BLOCK_SIZE,
     343                 :                                                   SLAB_LARGE_BLOCK_SIZE);
     344                 : 
     345 GIC         821 :     hash_ctl.keysize = sizeof(TransactionId);
     346             821 :     hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
     347             821 :     hash_ctl.hcxt = buffer->context;
     348 ECB             : 
     349 CBC         821 :     buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
     350 ECB             :                                  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     351                 : 
     352 CBC         821 :     buffer->by_txn_last_xid = InvalidTransactionId;
     353 GIC         821 :     buffer->by_txn_last_txn = NULL;
     354                 : 
     355 CBC         821 :     buffer->outbuf = NULL;
     356             821 :     buffer->outbufsize = 0;
     357 GIC         821 :     buffer->size = 0;
     358 ECB             : 
     359 CBC         821 :     buffer->spillTxns = 0;
     360             821 :     buffer->spillCount = 0;
     361 GIC         821 :     buffer->spillBytes = 0;
     362 CBC         821 :     buffer->streamTxns = 0;
     363             821 :     buffer->streamCount = 0;
     364             821 :     buffer->streamBytes = 0;
     365             821 :     buffer->totalTxns = 0;
     366             821 :     buffer->totalBytes = 0;
     367 ECB             : 
     368 CBC         821 :     buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
     369 ECB             : 
     370 GIC         821 :     dlist_init(&buffer->toplevel_by_lsn);
     371 CBC         821 :     dlist_init(&buffer->txns_by_base_snapshot_lsn);
     372 GNC         821 :     dclist_init(&buffer->catchange_txns);
     373                 : 
     374 ECB             :     /*
     375                 :      * Ensure there's no stale data from prior uses of this slot, in case some
     376                 :      * prior exit avoided calling ReorderBufferFree. Failure to do this can
     377                 :      * produce duplicated txns, and it's very cheap if there's nothing there.
     378                 :      */
     379 GIC         821 :     ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
     380                 : 
     381             821 :     return buffer;
     382                 : }
     383 ECB             : 
     384                 : /*
     385                 :  * Free a ReorderBuffer
     386                 :  */
     387                 : void
     388 GIC         679 : ReorderBufferFree(ReorderBuffer *rb)
     389                 : {
     390             679 :     MemoryContext context = rb->context;
     391                 : 
     392 ECB             :     /*
     393                 :      * We free separately allocated data by entirely scrapping reorderbuffer's
     394                 :      * memory context.
     395                 :      */
     396 GIC         679 :     MemoryContextDelete(context);
     397                 : 
     398                 :     /* Free disk space used by unconsumed reorder buffers */
     399             679 :     ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
     400 CBC         679 : }
     401                 : 
     402                 : /*
     403 ECB             :  * Get an unused, possibly preallocated, ReorderBufferTXN.
     404                 :  */
     405                 : static ReorderBufferTXN *
     406 GIC        3310 : ReorderBufferGetTXN(ReorderBuffer *rb)
     407                 : {
     408                 :     ReorderBufferTXN *txn;
     409                 : 
     410 ECB             :     txn = (ReorderBufferTXN *)
     411 GIC        3310 :         MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
     412                 : 
     413            3310 :     memset(txn, 0, sizeof(ReorderBufferTXN));
     414                 : 
     415 CBC        3310 :     dlist_init(&txn->changes);
     416 GIC        3310 :     dlist_init(&txn->tuplecids);
     417 CBC        3310 :     dlist_init(&txn->subtxns);
     418                 : 
     419 ECB             :     /* InvalidCommandId is not zero, so set it explicitly */
     420 CBC        3310 :     txn->command_id = InvalidCommandId;
     421            3310 :     txn->output_plugin_private = NULL;
     422                 : 
     423 GIC        3310 :     return txn;
     424 ECB             : }
     425                 : 
     426                 : /*
     427                 :  * Free a ReorderBufferTXN.
     428                 :  */
     429                 : static void
     430 GIC        3262 : ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
     431                 : {
     432                 :     /* clean the lookup cache if we were cached (quite likely) */
     433            3262 :     if (rb->by_txn_last_xid == txn->xid)
     434 ECB             :     {
     435 GIC        3073 :         rb->by_txn_last_xid = InvalidTransactionId;
     436            3073 :         rb->by_txn_last_txn = NULL;
     437 ECB             :     }
     438                 : 
     439                 :     /* free data that's contained */
     440                 : 
     441 GIC        3262 :     if (txn->gid != NULL)
     442                 :     {
     443              38 :         pfree(txn->gid);
     444              38 :         txn->gid = NULL;
     445 ECB             :     }
     446                 : 
     447 CBC        3262 :     if (txn->tuplecid_hash != NULL)
     448 ECB             :     {
     449 GIC         432 :         hash_destroy(txn->tuplecid_hash);
     450             432 :         txn->tuplecid_hash = NULL;
     451 ECB             :     }
     452                 : 
     453 CBC        3262 :     if (txn->invalidations)
     454 ECB             :     {
     455 GIC         982 :         pfree(txn->invalidations);
     456             982 :         txn->invalidations = NULL;
     457 ECB             :     }
     458                 : 
     459                 :     /* Reset the toast hash */
     460 CBC        3262 :     ReorderBufferToastReset(rb, txn);
     461                 : 
     462 GIC        3262 :     pfree(txn);
     463            3262 : }
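ReorderBufferReturnTXN() first clears rb->by_txn_last_xid and rb->by_txn_last_txn when they refer to the transaction being freed, because that one-entry lookup cache would otherwise hand out a pointer to freed memory on the next lookup of the same xid. The sketch below shows the pattern with hypothetical types: a linear array stands in for the by_txn hash table, a counter of slow lookups makes the cache's effect observable, and, for simplicity, only successful lookups are cached.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_INVALID_XID 0u   /* like InvalidTransactionId, xid 0 is invalid */
#define SKETCH_NTXN 8

/* Hypothetical transaction slot. */
typedef struct SketchCachedTxn
{
    unsigned int xid;
    int          in_use;
} SketchCachedTxn;

typedef struct SketchBuffer
{
    SketchCachedTxn  slots[SKETCH_NTXN]; /* stand-in for the by_txn hash */
    unsigned int     last_xid;           /* key of the cached lookup */
    SketchCachedTxn *last_txn;           /* cached result */
    int              slow_lookups;       /* full scans performed */
} SketchBuffer;

static SketchCachedTxn *
sketch_lookup(SketchBuffer *b, unsigned int xid)
{
    SketchCachedTxn *found = NULL;

    /* Fast path: same xid as the previous successful lookup. */
    if (xid != SKETCH_INVALID_XID && xid == b->last_xid)
        return b->last_txn;

    b->slow_lookups++;
    for (size_t i = 0; i < SKETCH_NTXN; i++)
        if (b->slots[i].in_use && b->slots[i].xid == xid)
            found = &b->slots[i];

    /* Only successful lookups are cached in this sketch. */
    if (found != NULL)
    {
        b->last_xid = xid;
        b->last_txn = found;
    }
    return found;
}

static void
sketch_free_txn(SketchBuffer *b, SketchCachedTxn *txn)
{
    /* Invalidate the cache first so it cannot return a freed entry. */
    if (b->last_xid == txn->xid)
    {
        b->last_xid = SKETCH_INVALID_XID;
        b->last_txn = NULL;
    }
    txn->in_use = 0;
}
```

Without the invalidation step in sketch_free_txn(), a later lookup of the same xid would take the fast path and return the released slot.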
     464 ECB             : 
     465                 : /*
     466                 :  * Get a fresh ReorderBufferChange.
     467                 :  */
     468                 : ReorderBufferChange *
     469 GIC     1915007 : ReorderBufferGetChange(ReorderBuffer *rb)
     470                 : {
     471                 :     ReorderBufferChange *change;
     472                 : 
     473 ECB             :     change = (ReorderBufferChange *)
     474 GIC     1915007 :         MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
     475                 : 
     476         1915007 :     memset(change, 0, sizeof(ReorderBufferChange));
     477         1915007 :     return change;
     478 ECB             : }
     479                 : 
     480                 : /*
     481                 :  * Free a ReorderBufferChange and update memory accounting, if requested.
     482                 :  */
     483                 : void
     484 GIC     1914746 : ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
     485                 :                           bool upd_mem)
     486                 : {
     487                 :     /* update memory accounting info */
     488 CBC     1914746 :     if (upd_mem)
     489 GIC     1914678 :         ReorderBufferChangeMemoryUpdate(rb, change, false,
     490                 :                                         ReorderBufferChangeSize(change));
     491                 : 
     492 ECB             :     /* free contained data */
     493 CBC     1914746 :     switch (change->action)
     494                 :     {
     495 GIC     1843893 :         case REORDER_BUFFER_CHANGE_INSERT:
     496                 :         case REORDER_BUFFER_CHANGE_UPDATE:
     497 ECB             :         case REORDER_BUFFER_CHANGE_DELETE:
     498                 :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
     499 CBC     1843893 :             if (change->data.tp.newtuple)
     500                 :             {
     501 GIC     1564706 :                 ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
     502         1564706 :                 change->data.tp.newtuple = NULL;
     503 ECB             :             }
     504                 : 
     505 CBC     1843893 :             if (change->data.tp.oldtuple)
     506 ECB             :             {
     507 GIC      210918 :                 ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
     508          210918 :                 change->data.tp.oldtuple = NULL;
     509 ECB             :             }
     510 GIC     1843893 :             break;
     511 CBC          39 :         case REORDER_BUFFER_CHANGE_MESSAGE:
     512              39 :             if (change->data.msg.prefix != NULL)
     513 GIC          39 :                 pfree(change->data.msg.prefix);
     514 CBC          39 :             change->data.msg.prefix = NULL;
     515              39 :             if (change->data.msg.message != NULL)
     516              39 :                 pfree(change->data.msg.message);
     517              39 :             change->data.msg.message = NULL;
     518              39 :             break;
     519            4567 :         case REORDER_BUFFER_CHANGE_INVALIDATION:
     520            4567 :             if (change->data.inval.invalidations)
     521            4567 :                 pfree(change->data.inval.invalidations);
     522            4567 :             change->data.inval.invalidations = NULL;
     523            4567 :             break;
     524             998 :         case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
     525             998 :             if (change->data.snapshot)
     526 ECB             :             {
     527 CBC         998 :                 ReorderBufferFreeSnap(rb, change->data.snapshot);
     528             998 :                 change->data.snapshot = NULL;
     529 ECB             :             }
     530 GIC         998 :             break;
     531 ECB             :             /* no data in addition to the struct itself */
     532 CBC          38 :         case REORDER_BUFFER_CHANGE_TRUNCATE:
     533 GIC          38 :             if (change->data.truncate.relids != NULL)
     534 ECB             :             {
     535 GIC          38 :                 ReorderBufferReturnRelids(rb, change->data.truncate.relids);
     536 CBC          38 :                 change->data.truncate.relids = NULL;
     537 ECB             :             }
     538 GIC          38 :             break;
     539 CBC       65211 :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
     540 ECB             :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
     541                 :         case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
     542                 :         case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
     543 CBC       65211 :             break;
     544                 :     }
     545                 : 
     546 GIC     1914746 :     pfree(change);
     547 CBC     1914746 : }
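ReorderBufferReturnChange looks at the change's action, frees whatever that union member owns (tuples, message prefix and payload, invalidations, snapshot, relids), and then frees the change itself. The C sketch below shows the same tagged-union cleanup pattern on a hypothetical, much smaller `SketchChange` type. It uses malloc/free in place of PostgreSQL memory contexts and omits the memory-accounting update, so treat it as an illustration of the pattern rather than the real data structure.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, reduced set of change kinds. */
typedef enum
{
    SK_TUPLE,           /* like INSERT/UPDATE/DELETE: may own two tuples */
    SK_MESSAGE,         /* owns a prefix and a payload */
    SK_COMMAND_ID       /* no data beyond the struct itself */
} SketchKind;

typedef struct SketchChange
{
    SketchKind  kind;
    union
    {
        struct
        {
            void       *newtuple;
            void       *oldtuple;
        }           tp;
        struct
        {
            char       *prefix;
            char       *message;
        }           msg;
        unsigned int command_id;
    }           data;
} SketchChange;

/*
 * Free the buffers owned by the active union member, then the change.
 * Returns how many owned buffers were released (for illustration only).
 */
static int
sketch_return_change(SketchChange *change)
{
    int         freed = 0;

    assert(change != NULL);

    switch (change->kind)
    {
        case SK_TUPLE:
            if (change->data.tp.newtuple)
            {
                free(change->data.tp.newtuple);
                freed++;
            }
            if (change->data.tp.oldtuple)
            {
                free(change->data.tp.oldtuple);
                freed++;
            }
            break;
        case SK_MESSAGE:
            if (change->data.msg.prefix)
            {
                free(change->data.msg.prefix);
                freed++;
            }
            if (change->data.msg.message)
            {
                free(change->data.msg.message);
                freed++;
            }
            break;
        case SK_COMMAND_ID:
            /* nothing owned besides the struct */
            break;
    }
    free(change);
    return freed;
}
```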
     548                 : 
     549                 : /*
     550 ECB             :  * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
     551                 :  * tuple_len (excluding header overhead).
     552                 :  */
     553                 : ReorderBufferTupleBuf *
     554 GIC     1775664 : ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
     555                 : {
     556                 :     ReorderBufferTupleBuf *tuple;
     557                 :     Size        alloc_len;
     558 ECB             : 
     559 GIC     1775664 :     alloc_len = tuple_len + SizeofHeapTupleHeader;
     560                 : 
     561                 :     tuple = (ReorderBufferTupleBuf *)
     562         1775664 :         MemoryContextAlloc(rb->tup_context,
     563 ECB             :                            sizeof(ReorderBufferTupleBuf) +
     564                 :                            MAXIMUM_ALIGNOF + alloc_len);
     565 GIC     1775664 :     tuple->alloc_tuple_size = alloc_len;
     566 CBC     1775664 :     tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
     567                 : 
     568 GIC     1775664 :     return tuple;
     569 ECB             : }
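ReorderBufferGetTupleBuf makes a single allocation that holds the ReorderBufferTupleBuf struct, MAXIMUM_ALIGNOF bytes of slack, and alloc_len bytes for the tuple header and data. Assuming ReorderBufferTupleBufData yields the max-aligned address just past the struct (as its use here suggests), the slack guarantees the tuple still fits after rounding, and the whole buffer can be released with one pfree. A minimal sketch of that layout follows; `SketchTupleBuf`, `SKETCH_MAXALIGN` and the fixed alignment of 8 are hypothetical stand-ins, and malloc replaces MemoryContextAlloc.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for MAXIMUM_ALIGNOF (commonly 8). */
#define SKETCH_MAXIMUM_ALIGNOF 8
/* Round an address up to a multiple of SKETCH_MAXIMUM_ALIGNOF. */
#define SKETCH_MAXALIGN(p) \
    (((uintptr_t) (p) + (SKETCH_MAXIMUM_ALIGNOF - 1)) & \
     ~((uintptr_t) (SKETCH_MAXIMUM_ALIGNOF - 1)))

/* Hypothetical, simplified counterpart of ReorderBufferTupleBuf. */
typedef struct SketchTupleBuf
{
    size_t      alloc_tuple_size;   /* bytes usable at data */
    char       *data;               /* aligned pointer into this allocation */
} SketchTupleBuf;

/*
 * One allocation: struct + alignment slack + payload.  Rounding up past the
 * struct moves the data pointer by at most SKETCH_MAXIMUM_ALIGNOF - 1 bytes,
 * so alloc_len bytes still fit.
 */
static SketchTupleBuf *
sketch_get_tuple_buf(size_t alloc_len)
{
    size_t      total = sizeof(SketchTupleBuf) + SKETCH_MAXIMUM_ALIGNOF + alloc_len;
    SketchTupleBuf *buf = malloc(total);

    if (buf == NULL)
        return NULL;
    buf->alloc_tuple_size = alloc_len;
    buf->data = (char *) SKETCH_MAXALIGN((char *) buf + sizeof(SketchTupleBuf));
    assert(buf->data + alloc_len <= (char *) buf + total);
    return buf;
}
```

Because header and payload share one block, releasing it is a single `free(buf)`, mirroring the single pfree in ReorderBufferReturnTupleBuf.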
     570                 : 
     571                 : /*
     572                 :  * Free a ReorderBufferTupleBuf.
     573                 :  */
     574                 : void
     575 GIC     1775624 : ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
     576                 : {
     577         1775624 :     pfree(tuple);
     578         1775624 : }
     579 ECB             : 
     580                 : /*
     581                 :  * Get an array for relids of truncated relations.
     582                 :  *
     583                 :  * We use the global memory context (for the whole reorder buffer), because
     584                 :  * none of the existing ones seems like a good match (some are SLAB, so we
     585                 :  * can't use those, and tup_context is meant for tuple data, not relids). We
     586                 :  * could add yet another context, but it seems like overkill - TRUNCATE is
     587                 :  * not a particularly common operation, so it does not seem worth it.
     588                 :  */
     589                 : Oid *
     590 GIC          42 : ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
     591                 : {
     592                 :     Oid        *relids;
     593                 :     Size        alloc_len;
     594 ECB             : 
     595 GIC          42 :     alloc_len = sizeof(Oid) * nrelids;
     596                 : 
     597              42 :     relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
     598                 : 
     599 CBC          42 :     return relids;
     600                 : }
     601 ECB             : 
     602                 : /*
     603                 :  * Free an array of relids.
     604                 :  */
     605                 : void
     606 GIC          38 : ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
     607                 : {
     608              38 :     pfree(relids);
     609              38 : }
     610 ECB             : 
     611                 : /*
     612                 :  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
     613                 :  * If create is true, and a transaction doesn't already exist, create it
     614                 :  * (with the given LSN, and as top transaction if that's specified);
     615                 :  * when this happens, is_new is set to true.
     616                 :  */
     617                 : static ReorderBufferTXN *
     618 GIC     6433598 : ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
     619                 :                       bool *is_new, XLogRecPtr lsn, bool create_as_top)
     620                 : {
     621                 :     ReorderBufferTXN *txn;
     622 ECB             :     ReorderBufferTXNByIdEnt *ent;
     623                 :     bool        found;
     624                 : 
     625 GIC     6433598 :     Assert(TransactionIdIsValid(xid));
     626                 : 
     627                 :     /*
     628                 :      * Check the one-entry lookup cache first
     629 ECB             :      */
     630 GIC     6433598 :     if (TransactionIdIsValid(rb->by_txn_last_xid) &&
     631         6430494 :         rb->by_txn_last_xid == xid)
     632                 :     {
     633         5440881 :         txn = rb->by_txn_last_txn;
     634 ECB             : 
     635 CBC     5440881 :         if (txn != NULL)
     636                 :         {
     637 ECB             :             /* found it, and it's valid */
     638 GIC     5440869 :             if (is_new)
     639 CBC        2546 :                 *is_new = false;
     640 GIC     5440869 :             return txn;
     641                 :         }
     642 ECB             : 
     643                 :         /*
     644                 :          * cached as non-existent, and asked not to create? Then nothing else
     645                 :          * to do.
     646                 :          */
     647 GIC          12 :         if (!create)
     648               9 :             return NULL;
     649                 :         /* otherwise fall through to create it */
     650                 :     }
     651 ECB             : 
     652                 :     /*
     653                 :      * We get here if the cache wasn't hit, or if it yielded "does-not-exist"
     654                 :      * and we were asked to create an entry.
     655                 :      */
     656                 : 
     657                 :     /* search the lookup table */
     658                 :     ent = (ReorderBufferTXNByIdEnt *)
     659 GIC      992720 :         hash_search(rb->by_txn,
     660                 :                     &xid,
     661                 :                     create ? HASH_ENTER : HASH_FIND,
     662                 :                     &found);
     663 CBC      992720 :     if (found)
     664 GIC      988127 :         txn = ent->txn;
     665            4593 :     else if (create)
     666                 :     {
     667 ECB             :         /* initialize the new entry, if creation was requested */
     668 CBC        3310 :         Assert(ent != NULL);
     669            3310 :         Assert(lsn != InvalidXLogRecPtr);
     670                 : 
     671 GIC        3310 :         ent->txn = ReorderBufferGetTXN(rb);
     672 CBC        3310 :         ent->txn->xid = xid;
     673            3310 :         txn = ent->txn;
     674 GIC        3310 :         txn->first_lsn = lsn;
     675 CBC        3310 :         txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
     676 ECB             : 
     677 CBC        3310 :         if (create_as_top)
     678 ECB             :         {
     679 CBC        2633 :             dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
     680 GIC        2633 :             AssertTXNLsnOrder(rb);
     681 ECB             :         }
     682                 :     }
     683                 :     else
     684 CBC        1283 :         txn = NULL;             /* not found and not asked to create */
     685                 : 
     686                 :     /* update cache */
     687 GIC      992720 :     rb->by_txn_last_xid = xid;
     688 CBC      992720 :     rb->by_txn_last_txn = txn;
     689                 : 
     690 GIC      992720 :     if (is_new)
     691 CBC        1787 :         *is_new = !found;
     692 ECB             : 
     693 GIC      992720 :     Assert(!create || txn != NULL);
     694 CBC      992720 :     return txn;
     695 ECB             : }
     696                 : 
     697                 : /*
     698                 :  * Record the partial change for the streaming of in-progress transactions.  We
     699                 :  * can stream only complete changes so if we have a partial change like toast
     700                 :  * table insert or speculative insert then we mark such a 'txn' so that it
     701                 :  * can't be streamed.  We also ensure that if the changes in such a 'txn' can
     702                 :  * be streamed and are above logical_decoding_work_mem threshold then we stream
     703                 :  * them as soon as we have a complete change.
     704                 :  */
     705                 : static void
     706 GIC     1713655 : ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
     707                 :                                   ReorderBufferChange *change,
     708                 :                                   bool toast_insert)
     709                 : {
     710 ECB             :     ReorderBufferTXN *toptxn;
     711                 : 
     712                 :     /*
     713                 :      * The partial changes need to be processed only while streaming
     714                 :      * in-progress transactions.
     715                 :      */
     716 GIC     1713655 :     if (!ReorderBufferCanStream(rb))
     717         1226598 :         return;
     718                 : 
     719                 :     /* Get the top transaction. */
     720 GNC      487057 :     toptxn = rbtxn_get_toptxn(txn);
     721 ECB             : 
     722                 :     /*
     723                 :      * Indicate a partial change for toast inserts.  The change will be
     724                 :      * considered as complete once we get the insert or update on the main
     725                 :      * table and we are sure that the pending toast chunks are not required
     726                 :      * anymore.
     727                 :      *
     728                 :      * If we allow streaming when there are pending toast chunks then such
     729                 :      * chunks won't be released till the insert (multi_insert) is complete and
     730                 :      * we expect the txn to have streamed all changes after streaming.  This
     731                 :      * restriction is mainly to ensure the correctness of streamed
     732                 :      * transactions and it doesn't seem worth lifting such a restriction
     733                 :      * just to allow this case because anyway we will stream the transaction
     734                 :      * once such an insert is complete.
     735                 :      */
     736 GIC      487057 :     if (toast_insert)
     737 CBC        1451 :         toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
     738          485606 :     else if (rbtxn_has_partial_change(toptxn) &&
     739              33 :              IsInsertOrUpdate(change->action) &&
     740              33 :              change->data.tp.clear_toast_afterwards)
     741              23 :         toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
     742 ECB             : 
     743                 :     /*
     744                 :      * Indicate a partial change for speculative inserts.  The change will be
     745                 :      * considered as complete once we get the speculative confirm or abort
     746                 :      * token.
     747                 :      */
     748 GIC      487057 :     if (IsSpecInsert(change->action))
     749 LBC           0 :         toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
     750 GBC      487057 :     else if (rbtxn_has_partial_change(toptxn) &&
     751 CBC        1461 :              IsSpecConfirmOrAbort(change->action))
     752 LBC           0 :         toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
     753 EUB             : 
     754                 :     /*
     755                 :      * Stream the transaction if it is serialized before and the changes are
     756                 :      * now complete in the top-level transaction.
     757                 :      *
     758                 :      * The reason for doing the streaming of such a transaction as soon as we
     759                 :      * get the complete change for it is that previously it would have reached
     760                 :      * the memory threshold and wouldn't get streamed because of incomplete
     761                 :      * changes.  Delaying such transactions would increase apply lag for them.
     762                 :      */
     763 GIC      487057 :     if (ReorderBufferCanStartStreaming(rb) &&
     764 CBC      157832 :         !(rbtxn_has_partial_change(toptxn)) &&
     765 GNC      156411 :         rbtxn_is_serialized(txn) &&
     766               6 :         rbtxn_has_streamable_change(toptxn))
     767 CBC           6 :         ReorderBufferStreamTXN(rb, toptxn);
     768 ECB             : }
     769                 : 
     770                 : /*
     771                 :  * Queue a change into a transaction so it can be replayed upon commit or
     772                 :  * streamed when we reach the logical_decoding_work_mem threshold.
     773                 :  */
     774                 : void
     775 GIC     1713723 : ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
     776                 :                          ReorderBufferChange *change, bool toast_insert)
     777 ECB             : {
     778                 :     ReorderBufferTXN *txn;
     779                 : 
     780 GIC     1713723 :     txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
     781                 : 
     782 ECB             :     /*
     783                 :      * While streaming the previous changes we have detected that the
     784                 :      * transaction is aborted.  So there is no point in collecting further
     785                 :      * changes for it.
     786                 :      */
     787 GIC     1713723 :     if (txn->concurrent_abort)
     788                 :     {
     789 ECB             :         /*
     790                 :          * We don't need to update memory accounting for this change as we
     791                 :          * have not added it to the queue yet.
     792                 :          */
     793 GIC          68 :         ReorderBufferReturnChange(rb, change, false);
     794              68 :         return;
     795 ECB             :     }
     796                 : 
     797                 :     /*
     798                 :      * The changes that are sent downstream are considered streamable.  We
     799                 :      * remember such transactions so that only those will later be considered
     800                 :      * for streaming.
     801                 :      */
     802 GNC     1713655 :     if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
     803          538571 :         change->action == REORDER_BUFFER_CHANGE_UPDATE ||
     804          331637 :         change->action == REORDER_BUFFER_CHANGE_DELETE ||
     805           64237 :         change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
     806           46321 :         change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
     807           46282 :         change->action == REORDER_BUFFER_CHANGE_MESSAGE)
     808                 :     {
     809         1667411 :         ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
     810                 : 
     811         1667411 :         toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
     812                 :     }
     813                 : 
     814 GIC     1713655 :     change->lsn = lsn;
     815         1713655 :     change->txn = txn;
     816                 : 
     817         1713655 :     Assert(InvalidXLogRecPtr != lsn);
     818         1713655 :     dlist_push_tail(&txn->changes, &change->node);
     819         1713655 :     txn->nentries++;
     820         1713655 :     txn->nentries_mem++;
     821 ECB             : 
     822                 :     /* update memory accounting information */
     823 CBC     1713655 :     ReorderBufferChangeMemoryUpdate(rb, change, true,
     824 ECB             :                                     ReorderBufferChangeSize(change));
     825                 : 
     826                 :     /* process partial change */
     827 GIC     1713655 :     ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
     828 ECB             : 
     829                 :     /* check the memory limits and evict something if needed */
     830 CBC     1713655 :     ReorderBufferCheckMemoryLimit(rb);
     831                 : }
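ReorderBufferQueueChange marks the top-level transaction as having a streamable change only for actions that are actually sent downstream (insert, update, delete, speculative insert, truncate, message); internal bookkeeping changes such as invalidations, snapshots, or command ids do not count. A sketch of that classification follows, with hypothetical enum and struct names and a `toptxn` pointer standing in for rbtxn_get_toptxn.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical change kinds, loosely following ReorderBufferChangeType. */
typedef enum
{
    SK_INSERT,
    SK_UPDATE,
    SK_DELETE,
    SK_SPEC_INSERT,
    SK_TRUNCATE,
    SK_MESSAGE,
    SK_INVALIDATION,
    SK_SNAPSHOT,
    SK_COMMAND_ID
} SketchChangeKind;

typedef struct SketchTxn
{
    struct SketchTxn *toptxn;       /* NULL for a top-level transaction */
    bool        has_streamable_change;
} SketchTxn;

/* Only changes that are sent downstream count as streamable. */
static bool
sketch_is_streamable(SketchChangeKind kind)
{
    switch (kind)
    {
        case SK_INSERT:
        case SK_UPDATE:
        case SK_DELETE:
        case SK_SPEC_INSERT:
        case SK_TRUNCATE:
        case SK_MESSAGE:
            return true;
        default:
            return false;
    }
}

/* Record the change kind against the top-level transaction. */
static void
sketch_note_change(SketchTxn *txn, SketchChangeKind kind)
{
    SketchTxn  *top = (txn->toptxn != NULL) ? txn->toptxn : txn;

    assert(top->toptxn == NULL);
    if (sketch_is_streamable(kind))
        top->has_streamable_change = true;
}
```

Keeping the flag on the top-level transaction means a subtransaction's data changes make the whole transaction a streaming candidate.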
     832                 : 
     833 ECB             : /*
     834                 :  * A transactional message is queued to be processed upon commit and a
     835                 :  * non-transactional message gets processed immediately.
     836                 :  */
     837                 : void
     838 CBC          44 : ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
     839                 :                           Snapshot snap, XLogRecPtr lsn,
     840                 :                           bool transactional, const char *prefix,
     841                 :                           Size message_size, const char *message)
     842 ECB             : {
     843 GIC          44 :     if (transactional)
     844                 :     {
     845                 :         MemoryContext oldcontext;
     846 ECB             :         ReorderBufferChange *change;
     847                 : 
     848 GIC          38 :         Assert(xid != InvalidTransactionId);
     849 ECB             : 
     850                 :         /*
     851                 :          * We don't expect snapshots for transactional changes - we'll use the
     852                 :          * snapshot derived later during apply (unless the change gets
     853                 :          * skipped).
     854                 :          */
     855 GNC          38 :         Assert(!snap);
     856                 : 
     857 CBC          38 :         oldcontext = MemoryContextSwitchTo(rb->context);
     858                 : 
     859 GIC          38 :         change = ReorderBufferGetChange(rb);
     860              38 :         change->action = REORDER_BUFFER_CHANGE_MESSAGE;
     861              38 :         change->data.msg.prefix = pstrdup(prefix);
     862 CBC          38 :         change->data.msg.message_size = message_size;
     863 GIC          38 :         change->data.msg.message = palloc(message_size);
     864              38 :         memcpy(change->data.msg.message, message, message_size);
     865                 : 
     866              38 :         ReorderBufferQueueChange(rb, xid, lsn, change, false);
     867 ECB             : 
     868 GIC          38 :         MemoryContextSwitchTo(oldcontext);
     869                 :     }
     870                 :     else
     871                 :     {
     872               6 :         ReorderBufferTXN *txn = NULL;
     873 GNC           6 :         volatile Snapshot snapshot_now = snap;
     874 ECB             : 
     875                 :         /* Non-transactional changes require a valid snapshot. */
     876 CBC           6 :         Assert(snapshot_now);
     877                 : 
     878               6 :         if (xid != InvalidTransactionId)
     879               3 :             txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
     880 ECB             : 
     881                 :         /* setup snapshot to allow catalog access */
     882 CBC           6 :         SetupHistoricSnapshot(snapshot_now, NULL);
     883               6 :         PG_TRY();
     884                 :         {
     885               6 :             rb->message(rb, txn, lsn, false, prefix, message_size, message);
     886                 : 
     887               6 :             TeardownHistoricSnapshot(false);
     888                 :         }
     889 UIC           0 :         PG_CATCH();
     890                 :         {
     891 LBC           0 :             TeardownHistoricSnapshot(true);
     892               0 :             PG_RE_THROW();
     893                 :         }
     894 GIC           6 :         PG_END_TRY();
     895 ECB             :     }
     896 GIC          44 : }
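The two branches of ReorderBufferQueueMessage differ in ownership: a transactional message is copied (prefix and payload) into reorder-buffer memory and queued until commit, while a non-transactional one is handed to the output plugin immediately and never stored. The sketch below, with hypothetical names, shows only that split. It uses malloc for the copy, counts deliveries instead of calling rb->message, and omits the historic-snapshot setup and the PG_TRY cleanup.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_MAX_PENDING 8

typedef struct SketchMsg
{
    char       *message;            /* private copy owned by the queue */
    size_t      size;
} SketchMsg;

typedef struct SketchQueue
{
    SketchMsg   pending[SKETCH_MAX_PENDING];   /* transactional, awaiting commit */
    int         npending;
    int         delivered;          /* non-transactional, passed on at once */
} SketchQueue;

static bool
sketch_queue_message(SketchQueue *q, bool transactional,
                     const char *message, size_t size)
{
    assert(q != NULL && (message != NULL || size == 0));

    if (transactional)
    {
        char       *copy;

        if (q->npending >= SKETCH_MAX_PENDING)
            return false;
        /* Copy, since the caller's buffer may not outlive this call. */
        copy = malloc(size > 0 ? size : 1);
        if (copy == NULL)
            return false;
        if (size > 0)
            memcpy(copy, message, size);
        q->pending[q->npending].message = copy;
        q->pending[q->npending].size = size;
        q->npending++;
    }
    else
    {
        /* Stand-in for invoking the output plugin's message callback now. */
        q->delivered++;
    }
    return true;
}
```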
     897 ECB             : 
     898                 : /*
     899                 :  * AssertTXNLsnOrder
     900                 :  *      Verify LSN ordering of transaction lists in the reorderbuffer
     901                 :  *
     902                 :  * Other LSN-related invariants are checked too.
     903                 :  *
     904                 :  * No-op if assertions are not in use.
     905                 :  */
     906                 : static void
     907 GIC        6446 : AssertTXNLsnOrder(ReorderBuffer *rb)
     908 EUB             : {
     909                 : #ifdef USE_ASSERT_CHECKING
     910 GBC        6446 :     LogicalDecodingContext *ctx = rb->private_data;
     911 EUB             :     dlist_iter  iter;
     912 GIC        6446 :     XLogRecPtr  prev_first_lsn = InvalidXLogRecPtr;
     913 CBC        6446 :     XLogRecPtr  prev_base_snap_lsn = InvalidXLogRecPtr;
     914                 : 
     915 ECB             :     /*
     916                 :      * Skip the verification if we haven't yet reached the LSN at which we
     917                 :      * start decoding the contents of transactions, because until we reach that
     918                 :      * LSN, we could have transactions that don't have the association between
     919                 :      * the top-level transaction and subtransaction yet and consequently have
     920                 :      * the same LSN.  We don't guarantee this association until we try to
     921                 :      * decode the actual contents of the transaction. The ordering of the records
     922                 :      * prior to the start_decoding_at LSN should have been checked before the
     923                 :      * restart.
     924                 :      */
     925 GIC        6446 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr))
     926 CBC        3428 :         return;
     927                 : 
     928 GIC        5819 :     dlist_foreach(iter, &rb->toplevel_by_lsn)
     929 ECB             :     {
     930 GIC        2801 :         ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
     931 ECB             :                                                     iter.cur);
     932                 : 
     933                 :         /* start LSN must be set */
     934 GIC        2801 :         Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
     935                 : 
     936                 :         /* If there is an end LSN, it must not be lower than the start LSN */
     937            2801 :         if (cur_txn->end_lsn != InvalidXLogRecPtr)
     938              20 :             Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
     939                 : 
     940                 :         /* Current initial LSN must be strictly higher than previous */
     941            2801 :         if (prev_first_lsn != InvalidXLogRecPtr)
     942             236 :             Assert(prev_first_lsn < cur_txn->first_lsn);
     943                 : 
     944 ECB             :         /* known-as-subtxn txns must not be listed */
     945 CBC        2801 :         Assert(!rbtxn_is_known_subxact(cur_txn));
     946                 : 
     947            2801 :         prev_first_lsn = cur_txn->first_lsn;
     948                 :     }
     949 ECB             : 
     950 GIC        4601 :     dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
     951                 :     {
     952            1583 :         ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
     953 ECB             :                                                     base_snapshot_node,
     954                 :                                                     iter.cur);
     955                 : 
     956                 :         /* base snapshot (and its LSN) must be set */
     957 CBC        1583 :         Assert(cur_txn->base_snapshot != NULL);
     958 GIC        1583 :         Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
     959                 : 
     960 ECB             :         /* current LSN must be strictly higher than previous */
     961 CBC        1583 :         if (prev_base_snap_lsn != InvalidXLogRecPtr)
     962 GIC         170 :             Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
     963                 : 
     964 ECB             :         /* known-as-subtxn txns must not be listed */
     965 GIC        1583 :         Assert(!rbtxn_is_known_subxact(cur_txn));
     966 ECB             : 
     967 GIC        1583 :         prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
     968                 :     }
     969 ECB             : #endif
     970                 : }
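AssertTXNLsnOrder walks toplevel_by_lsn and checks that every entry has a first_lsn, that any end_lsn is not below it, and that first_lsn values strictly increase along the list. The sketch below expresses the same invariants as a predicate over a hypothetical array of transactions, returning false instead of failing an assertion so it can be exercised directly; the known-subtransaction check and the txns_by_base_snapshot_lsn pass are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t SketchLsn;         /* stand-in for XLogRecPtr */
#define SKETCH_INVALID_LSN ((SketchLsn) 0)

typedef struct SketchTopTxn
{
    SketchLsn   first_lsn;
    SketchLsn   end_lsn;            /* SKETCH_INVALID_LSN while unknown */
} SketchTopTxn;

/* True if txns[0..n) satisfies the toplevel_by_lsn invariants. */
static bool
sketch_txns_in_lsn_order(const SketchTopTxn *txns, size_t n)
{
    SketchLsn   prev_first_lsn = SKETCH_INVALID_LSN;

    assert(txns != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        const SketchTopTxn *cur = &txns[i];

        /* start LSN must be set */
        if (cur->first_lsn == SKETCH_INVALID_LSN)
            return false;
        /* an end LSN, if present, must not precede the start LSN */
        if (cur->end_lsn != SKETCH_INVALID_LSN && cur->first_lsn > cur->end_lsn)
            return false;
        /* start LSNs must strictly increase along the list */
        if (prev_first_lsn != SKETCH_INVALID_LSN && prev_first_lsn >= cur->first_lsn)
            return false;
        prev_first_lsn = cur->first_lsn;
    }
    return true;
}
```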
     971                 : 
     972                 : /*
     973                 :  * AssertChangeLsnOrder
     974                 :  *
     975                 :  * Check ordering of changes in the (sub)transaction.
     976                 :  */
     977                 : static void
     978 GIC        2162 : AssertChangeLsnOrder(ReorderBufferTXN *txn)
     979                 : {
     980 ECB             : #ifdef USE_ASSERT_CHECKING
     981                 :     dlist_iter  iter;
     982 GIC        2162 :     XLogRecPtr  prev_lsn = txn->first_lsn;
     983                 : 
     984 CBC      181917 :     dlist_foreach(iter, &txn->changes)
     985                 :     {
     986 ECB             :         ReorderBufferChange *cur_change;
     987                 : 
     988 GIC      179755 :         cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
     989                 : 
     990          179755 :         Assert(txn->first_lsn != InvalidXLogRecPtr);
     991          179755 :         Assert(cur_change->lsn != InvalidXLogRecPtr);
     992          179755 :         Assert(txn->first_lsn <= cur_change->lsn);
     993                 : 
     994          179755 :         if (txn->end_lsn != InvalidXLogRecPtr)
     995           26019 :             Assert(cur_change->lsn <= txn->end_lsn);
     996                 : 
     997 CBC      179755 :         Assert(prev_lsn <= cur_change->lsn);
     998                 : 
     999 GIC      179755 :         prev_lsn = cur_change->lsn;
    1000                 :     }
    1001 ECB             : #endif
    1002 GIC        2162 : }
    1003 ECB             : 
    1004                 : /*
    1005                 :  * ReorderBufferGetOldestTXN
    1006                 :  *      Return oldest transaction in reorderbuffer
    1007                 :  */
    1008                 : ReorderBufferTXN *
    1009 CBC         255 : ReorderBufferGetOldestTXN(ReorderBuffer *rb)
    1010 ECB             : {
    1011                 :     ReorderBufferTXN *txn;
    1012                 : 
    1013 CBC         255 :     AssertTXNLsnOrder(rb);
    1014 ECB             : 
    1015 GIC         255 :     if (dlist_is_empty(&rb->toplevel_by_lsn))
    1016 CBC         223 :         return NULL;
    1017                 : 
    1018              32 :     txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
    1019                 : 
    1020 GIC          32 :     Assert(!rbtxn_is_known_subxact(txn));
    1021 CBC          32 :     Assert(txn->first_lsn != InvalidXLogRecPtr);
    1022 GIC          32 :     return txn;
    1023                 : }
    1024                 : 
    1025                 : /*
    1026                 :  * ReorderBufferGetOldestXmin
    1027                 :  *      Return oldest Xmin in reorderbuffer
    1028 ECB             :  *
    1029                 :  * Returns oldest possibly running Xid from the point of view of snapshots
    1030                 :  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
    1031                 :  * there are none.
    1032                 :  *
    1033                 :  * Since snapshots are assigned monotonically, this equals the Xmin of the
    1034                 :  * base snapshot with minimal base_snapshot_lsn.
    1035                 :  */
    1036                 : TransactionId
    1037 CBC         274 : ReorderBufferGetOldestXmin(ReorderBuffer *rb)
    1038                 : {
    1039 ECB             :     ReorderBufferTXN *txn;
    1040                 : 
    1041 CBC         274 :     AssertTXNLsnOrder(rb);
    1042                 : 
    1043 GIC         274 :     if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
    1044             242 :         return InvalidTransactionId;
    1045                 : 
    1046              32 :     txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
    1047                 :                              &rb->txns_by_base_snapshot_lsn);
    1048              32 :     return txn->base_snapshot->xmin;
    1049                 : }
    1050                 : 
    1051                 : void
    1052             279 : ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
    1053                 : {
    1054             279 :     rb->current_restart_decoding_lsn = ptr;
    1055             279 : }
    1056 ECB             : 
    1057                 : /*
    1058                 :  * ReorderBufferAssignChild
    1059                 :  *
    1060                 :  * Make note that we know that subxid is a subtransaction of xid, seen as of
    1061                 :  * the given lsn.
    1062                 :  */
    1063                 : void
    1064 GIC         863 : ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
    1065 ECB             :                          TransactionId subxid, XLogRecPtr lsn)
    1066                 : {
    1067                 :     ReorderBufferTXN *txn;
    1068                 :     ReorderBufferTXN *subtxn;
    1069                 :     bool        new_top;
    1070                 :     bool        new_sub;
    1071                 : 
    1072 GIC         863 :     txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
    1073 CBC         863 :     subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
    1074 ECB             : 
    1075 GIC         863 :     if (!new_sub)
    1076                 :     {
    1077             186 :         if (rbtxn_is_known_subxact(subtxn))
    1078                 :         {
    1079                 :             /* already associated, nothing to do */
    1080             186 :             return;
    1081                 :         }
    1082                 :         else
    1083 ECB             :         {
    1084                 :             /*
    1085                 :              * We already saw this transaction, but initially added it to the
    1086                 :              * list of top-level txns.  Now that we know it's not top-level,
    1087                 :              * remove it from there.
    1088                 :              */
    1089 UIC           0 :             dlist_delete(&subtxn->node);
    1090                 :         }
    1091 ECB             :     }
    1092                 : 
    1093 GIC         677 :     subtxn->txn_flags |= RBTXN_IS_SUBXACT;
    1094 CBC         677 :     subtxn->toplevel_xid = xid;
    1095 GIC         677 :     Assert(subtxn->nsubtxns == 0);
    1096 ECB             : 
    1097                 :     /* set the reference to top-level transaction */
    1098 GIC         677 :     subtxn->toptxn = txn;
    1099 ECB             : 
    1100                 :     /* add to subtransaction list */
    1101 GIC         677 :     dlist_push_tail(&txn->subtxns, &subtxn->node);
    1102             677 :     txn->nsubtxns++;
    1103                 : 
    1104                 :     /* Possibly transfer the subtxn's snapshot to its top-level txn. */
    1105             677 :     ReorderBufferTransferSnapToParent(txn, subtxn);
    1106                 : 
    1107                 :     /* Verify LSN-ordering invariant */
    1108 GBC         677 :     AssertTXNLsnOrder(rb);
    1109                 : }
    1110                 : 
    1111                 : /*
    1112 ECB             :  * ReorderBufferTransferSnapToParent
    1113                 :  *      Transfer base snapshot from subtxn to top-level txn, if needed
    1114                 :  *
    1115                 :  * This is done if the top-level txn doesn't have a base snapshot, or if the
    1116                 :  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
    1117                 :  * snapshot's LSN.  This can happen if there are no changes in the toplevel
    1118                 :  * txn but there are some in the subtxn, or the first change in subtxn has
    1119                 :  * earlier LSN than first change in the top-level txn and we learned about
    1120                 :  * their kinship only now.
    1121                 :  *
    1122                 :  * The subtransaction's snapshot is cleared regardless of the transfer
    1123                 :  * happening, since it's not needed anymore in either case.
    1124                 :  *
    1125                 :  * We do this as soon as we become aware of their kinship, to avoid queueing
    1126                 :  * extra snapshots to txns known-as-subtxns -- only top-level txns will
    1127                 :  * receive further snapshots.
    1128                 :  */
    1129                 : static void
    1130 GIC         681 : ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
    1131                 :                                   ReorderBufferTXN *subtxn)
    1132                 : {
    1133             681 :     Assert(subtxn->toplevel_xid == txn->xid);
    1134                 : 
    1135             681 :     if (subtxn->base_snapshot != NULL)
    1136                 :     {
    1137 UIC           0 :         if (txn->base_snapshot == NULL ||
    1138               0 :             subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
    1139                 :         {
    1140                 :             /*
    1141                 :              * If the toplevel transaction already has a base snapshot but
    1142                 :              * it's newer than the subxact's, purge it.
    1143                 :              */
    1144               0 :             if (txn->base_snapshot != NULL)
    1145                 :             {
    1146               0 :                 SnapBuildSnapDecRefcount(txn->base_snapshot);
    1147               0 :                 dlist_delete(&txn->base_snapshot_node);
    1148                 :             }
    1149 ECB             : 
    1150                 :             /*
    1151                 :              * The snapshot is now the top transaction's; transfer it, and
    1152                 :              * adjust the list position of the top transaction in the list by
    1153                 :              * moving it to where the subtransaction is.
    1154                 :              */
    1155 UIC           0 :             txn->base_snapshot = subtxn->base_snapshot;
    1156 UBC           0 :             txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
    1157               0 :             dlist_insert_before(&subtxn->base_snapshot_node,
    1158                 :                                 &txn->base_snapshot_node);
    1159                 : 
    1160                 :             /*
    1161                 :              * The subtransaction doesn't have a snapshot anymore (so it
    1162                 :              * mustn't be in the list.)
    1163 EUB             :              */
    1164 UIC           0 :             subtxn->base_snapshot = NULL;
    1165 UBC           0 :             subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
    1166               0 :             dlist_delete(&subtxn->base_snapshot_node);
    1167                 :         }
    1168                 :         else
    1169                 :         {
    1170                 :             /* Base snap of toplevel is fine, so subxact's is not needed */
    1171 UIC           0 :             SnapBuildSnapDecRefcount(subtxn->base_snapshot);
    1172               0 :             dlist_delete(&subtxn->base_snapshot_node);
    1173               0 :             subtxn->base_snapshot = NULL;
    1174 UBC           0 :             subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
    1175 EUB             :         }
    1176                 :     }
    1177 GIC         681 : }
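A minimal sketch of the decision ReorderBufferTransferSnapToParent makes, assuming a hypothetical refcounted Snap type and a simplified Txn struct standing in for PostgreSQL's snapshot and ReorderBufferTXN. The txns_by_base_snapshot_lsn list maintenance (dlist_insert_before / dlist_delete) is deliberately omitted. Only the rule is shown: the top-level transaction keeps whichever base snapshot has the earlier LSN, the other one loses a reference, and the subtransaction never keeps one.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's LSN type; 0 = invalid */

/* Hypothetical refcounted snapshot (stand-in, not PostgreSQL's Snapshot). */
typedef struct Snap
{
    int         refcount;
} Snap;

/* Hypothetical, heavily reduced transaction record. */
typedef struct Txn
{
    Snap       *base_snapshot;
    XLogRecPtr  base_snapshot_lsn;
} Txn;

static void
snap_decref(Snap *s)
{
    assert(s->refcount > 0);
    s->refcount--;
}

static void
transfer_snap_to_parent(Txn *top, Txn *sub)
{
    if (sub->base_snapshot == NULL)
        return;                 /* nothing to transfer */

    if (top->base_snapshot == NULL ||
        sub->base_snapshot_lsn < top->base_snapshot_lsn)
    {
        /* the parent's snapshot (if any) is newer: drop it */
        if (top->base_snapshot != NULL)
            snap_decref(top->base_snapshot);
        /* ownership moves to the parent; the reference count is unchanged */
        top->base_snapshot = sub->base_snapshot;
        top->base_snapshot_lsn = sub->base_snapshot_lsn;
    }
    else
        snap_decref(sub->base_snapshot);    /* parent's is earlier: drop sub's */

    /* in both cases the subtransaction no longer holds a snapshot */
    sub->base_snapshot = NULL;
    sub->base_snapshot_lsn = 0;
}
```

For example, with a parent snapshot at LSN 200 and a subtransaction snapshot at LSN 100, the parent ends up holding the LSN 100 snapshot, the LSN 200 snapshot loses its reference, and the subtransaction holds none.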
    1178                 : 
    1179                 : /*
    1180                 :  * Associate a subtransaction with its toplevel transaction at commit
    1181                 :  * time. There may be no further changes added after this.
    1182                 :  */
    1183 EUB             : void
    1184 GBC         267 : ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
    1185 EUB             :                          TransactionId subxid, XLogRecPtr commit_lsn,
    1186                 :                          XLogRecPtr end_lsn)
    1187                 : {
    1188                 :     ReorderBufferTXN *subtxn;
    1189                 : 
    1190 GBC         267 :     subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
    1191 EUB             :                                    InvalidXLogRecPtr, false);
    1192                 : 
    1193                 :     /*
    1194                 :      * No need to do anything if that subtxn didn't contain any changes
    1195                 :      */
    1196 CBC         267 :     if (!subtxn)
    1197 GIC          81 :         return;
    1198                 : 
    1199             186 :     subtxn->final_lsn = commit_lsn;
    1200             186 :     subtxn->end_lsn = end_lsn;
    1201                 : 
    1202                 :     /*
    1203 ECB             :      * Assign this subxact as a child of the toplevel xact (no-op if already
    1204                 :      * done.)
    1205                 :      */
    1206 GIC         186 :     ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
    1207                 : }
    1208                 : 
    1209 ECB             : 
    1210                 : /*
    1211                 :  * Support for efficiently iterating over a transaction's and its
    1212                 :  * subtransactions' changes.
    1213                 :  *
     1214                 :  * We do this by doing a k-way merge between transactions/subtransactions. For that
    1215                 :  * we model the current heads of the different transactions as a binary heap
    1216                 :  * so we easily know which (sub-)transaction has the change with the smallest
    1217                 :  * lsn next.
    1218                 :  *
    1219                 :  * We assume the changes in individual transactions are already sorted by LSN.
    1220                 :  */
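The k-way merge described above can be sketched in standalone C. This is an illustration, not the reorderbuffer code: the Run struct and helpers are hypothetical, each run is a sorted array of LSNs instead of a dlist of ReorderBufferChange records, and it uses a plain array min-heap rather than PostgreSQL's lib/binaryheap. As in ReorderBufferIterTXNInit, empty runs are skipped and the heap is built once; after each step the top is either re-sifted (the analogue of binaryheap_replace_first) or removed when its run is exhausted (the analogue of binaryheap_remove_first).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's LSN type */

/* Hypothetical input run: the LSN-sorted changes of one (sub)transaction. */
typedef struct Run
{
    const XLogRecPtr *lsns;
    size_t      len;
    size_t      pos;            /* index of the run's current head */
} Run;

/* Min-heap order: compare the current head LSNs of two runs. */
static int
head_less(const Run *runs, int a, int b)
{
    return runs[a].lsns[runs[a].pos] < runs[b].lsns[runs[b].pos];
}

static void
sift_down(int *heap, size_t n, size_t i, const Run *runs)
{
    for (;;)
    {
        size_t      left = 2 * i + 1;
        size_t      right = left + 1;
        size_t      min = i;
        int         tmp;

        if (left < n && head_less(runs, heap[left], heap[min]))
            min = left;
        if (right < n && head_less(runs, heap[right], heap[min]))
            min = right;
        if (min == i)
            return;
        tmp = heap[i];
        heap[i] = heap[min];
        heap[min] = tmp;
        i = min;
    }
}

/*
 * Merge all runs into out[] in LSN order and return the number of entries.
 * heap must have room for nruns ints; out must hold the sum of all lengths.
 */
static size_t
kway_merge(Run *runs, int nruns, int *heap, XLogRecPtr *out)
{
    size_t      n = 0;
    size_t      nout = 0;

    /* Only runs with entries join the heap, added unordered ... */
    for (int i = 0; i < nruns; i++)
        if (runs[i].len > 0)
            heap[n++] = i;
    /* ... then heapified once in O(k). */
    for (size_t i = n / 2; i-- > 0;)
        sift_down(heap, n, i, runs);

    while (n > 0)
    {
        Run        *r = &runs[heap[0]];

        assert(r->pos < r->len);
        out[nout++] = r->lsns[r->pos++];
        if (r->pos == r->len)
            heap[0] = heap[--n];        /* run exhausted: drop it */
        sift_down(heap, n, 0, runs);    /* new head (or moved entry) sinks */
    }
    return nout;
}
```

Each step costs O(log k) for k non-empty runs, so merging N changes costs O(N log k) overall.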
    1221                 : 
    1222                 : /*
    1223                 :  * Binary heap comparison function.
    1224                 :  */
    1225                 : static int
    1226 GIC       52311 : ReorderBufferIterCompare(Datum a, Datum b, void *arg)
    1227                 : {
    1228           52311 :     ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
    1229           52311 :     XLogRecPtr  pos_a = state->entries[DatumGetInt32(a)].lsn;
    1230           52311 :     XLogRecPtr  pos_b = state->entries[DatumGetInt32(b)].lsn;
    1231                 : 
    1232           52311 :     if (pos_a < pos_b)
    1233           50985 :         return 1;
    1234            1326 :     else if (pos_a == pos_b)
    1235 UIC           0 :         return 0;
    1236 GIC        1326 :     return -1;
    1237                 : }
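PostgreSQL's lib/binaryheap keeps the element its comparator ranks highest at the top, so ReorderBufferIterCompare returns 1 when pos_a < pos_b in order to surface the smallest LSN first. A minimal sketch of that inversion, assuming a hypothetical array-based max-heap over int indices (not the binaryheap API): sift-down only promotes a child the comparator calls "greater", yet the root ends up being the entry with the lowest LSN.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's LSN type */

/* Same inverted shape as ReorderBufferIterCompare: a smaller LSN ranks higher. */
static int
iter_compare(int a, int b, const XLogRecPtr *lsns)
{
    if (lsns[a] < lsns[b])
        return 1;
    else if (lsns[a] == lsns[b])
        return 0;
    return -1;
}

/* Hypothetical max-heap sift-down: promote the child ranked highest. */
static void
sift_down(int *heap, size_t n, size_t i, const XLogRecPtr *lsns)
{
    for (;;)
    {
        size_t      left = 2 * i + 1;
        size_t      right = left + 1;
        size_t      top = i;
        int         tmp;

        if (left < n && iter_compare(heap[left], heap[top], lsns) > 0)
            top = left;
        if (right < n && iter_compare(heap[right], heap[top], lsns) > 0)
            top = right;
        if (top == i)
            return;
        tmp = heap[i];
        heap[i] = heap[top];
        heap[top] = tmp;
        i = top;
    }
}

/* Heapify unordered entries once (analogous in spirit to binaryheap_build). */
static void
build_heap(int *heap, size_t n, const XLogRecPtr *lsns)
{
    for (size_t i = n / 2; i-- > 0;)
        sift_down(heap, n, i, lsns);
}

/* LSN of the entry at the root of a non-empty heap. */
static XLogRecPtr
heap_top_lsn(const int *heap, size_t n, const XLogRecPtr *lsns)
{
    assert(n > 0);
    return lsns[heap[0]];
}
```

With LSNs {30, 10, 20, 40}, the root after build_heap is index 1 (LSN 10). Note that the coverage data above shows the "equal" branch of ReorderBufferIterCompare was never hit in this run.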
    1238                 : 
    1239                 : /*
    1240                 :  * Allocate & initialize an iterator which iterates in lsn order over a
    1241                 :  * transaction and all its subtransactions.
    1242                 :  *
    1243                 :  * Note: The iterator state is returned through iter_state parameter rather
    1244                 :  * than the function's return value.  This is because the state gets cleaned up
    1245 ECB             :  * in a PG_CATCH block in the caller, so we want to make sure the caller gets
    1246                 :  * back the state even if this function throws an exception.
    1247                 :  */
    1248                 : static void
    1249 CBC        1700 : ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
    1250                 :                          ReorderBufferIterTXNState *volatile *iter_state)
    1251 ECB             : {
    1252 CBC        1700 :     Size        nr_txns = 0;
    1253 ECB             :     ReorderBufferIterTXNState *state;
    1254 EUB             :     dlist_iter  cur_txn_i;
    1255 ECB             :     int32       off;
    1256                 : 
    1257 GIC        1700 :     *iter_state = NULL;
    1258                 : 
    1259                 :     /* Check ordering of changes in the toplevel transaction. */
    1260            1700 :     AssertChangeLsnOrder(txn);
    1261                 : 
    1262                 :     /*
    1263                 :      * Calculate the size of our heap: one element for every transaction that
    1264                 :      * contains changes.  (Besides the transactions already in the reorder
    1265                 :      * buffer, we count the one we were directly passed.)
    1266                 :      */
    1267            1700 :     if (txn->nentries > 0)
    1268 CBC        1524 :         nr_txns++;
    1269                 : 
    1270 GIC        2162 :     dlist_foreach(cur_txn_i, &txn->subtxns)
    1271 ECB             :     {
    1272                 :         ReorderBufferTXN *cur_txn;
    1273                 : 
    1274 GIC         462 :         cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
    1275                 : 
    1276 ECB             :         /* Check ordering of changes in this subtransaction. */
    1277 GIC         462 :         AssertChangeLsnOrder(cur_txn);
    1278                 : 
    1279 CBC         462 :         if (cur_txn->nentries > 0)
    1280 GIC         300 :             nr_txns++;
    1281                 :     }
    1282                 : 
    1283                 :     /* allocate iteration state */
    1284                 :     state = (ReorderBufferIterTXNState *)
    1285            1700 :         MemoryContextAllocZero(rb->context,
    1286 ECB             :                                sizeof(ReorderBufferIterTXNState) +
    1287 CBC        1700 :                                sizeof(ReorderBufferIterTXNEntry) * nr_txns);
    1288                 : 
    1289            1700 :     state->nr_txns = nr_txns;
    1290 GIC        1700 :     dlist_init(&state->old_change);
    1291                 : 
    1292            3524 :     for (off = 0; off < state->nr_txns; off++)
    1293 ECB             :     {
    1294 GIC        1824 :         state->entries[off].file.vfd = -1;
    1295            1824 :         state->entries[off].segno = 0;
    1296 ECB             :     }
    1297                 : 
    1298                 :     /* allocate heap */
    1299 CBC        1700 :     state->heap = binaryheap_allocate(state->nr_txns,
    1300                 :                                       ReorderBufferIterCompare,
    1301                 :                                       state);
    1302                 : 
    1303                 :     /* Now that the state fields are initialized, it is safe to return it. */
    1304            1700 :     *iter_state = state;
    1305                 : 
    1306 ECB             :     /*
    1307                 :      * Now insert items into the binary heap, in an unordered fashion.  (We
    1308                 :      * will run a heap assembly step at the end; this is more efficient.)
    1309                 :      */
    1310                 : 
    1311 CBC        1700 :     off = 0;
    1312                 : 
    1313 ECB             :     /* add toplevel transaction if it contains changes */
    1314 CBC        1700 :     if (txn->nentries > 0)
    1315                 :     {
    1316                 :         ReorderBufferChange *cur_change;
    1317                 : 
    1318            1524 :         if (rbtxn_is_serialized(txn))
    1319                 :         {
    1320                 :             /* serialize remaining changes */
    1321 GIC          21 :             ReorderBufferSerializeTXN(rb, txn);
    1322              21 :             ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
    1323 ECB             :                                         &state->entries[off].segno);
    1324                 :         }
    1325                 : 
    1326 GIC        1524 :         cur_change = dlist_head_element(ReorderBufferChange, node,
    1327                 :                                         &txn->changes);
    1328                 : 
    1329            1524 :         state->entries[off].lsn = cur_change->lsn;
    1330 CBC        1524 :         state->entries[off].change = cur_change;
    1331 GIC        1524 :         state->entries[off].txn = txn;
    1332                 : 
    1333 CBC        1524 :         binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
    1334                 :     }
    1335                 : 
    1336                 :     /* add subtransactions if they contain changes */
    1337            2162 :     dlist_foreach(cur_txn_i, &txn->subtxns)
    1338                 :     {
    1339                 :         ReorderBufferTXN *cur_txn;
    1340 ECB             : 
    1341 CBC         462 :         cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
    1342                 : 
    1343 GIC         462 :         if (cur_txn->nentries > 0)
    1344                 :         {
    1345 ECB             :             ReorderBufferChange *cur_change;
    1346                 : 
    1347 GIC         300 :             if (rbtxn_is_serialized(cur_txn))
    1348 ECB             :             {
    1349                 :                 /* serialize remaining changes */
    1350 CBC          16 :                 ReorderBufferSerializeTXN(rb, cur_txn);
    1351 GIC          16 :                 ReorderBufferRestoreChanges(rb, cur_txn,
    1352 ECB             :                                             &state->entries[off].file,
    1353                 :                                             &state->entries[off].segno);
    1354                 :             }
    1355 GIC         300 :             cur_change = dlist_head_element(ReorderBufferChange, node,
    1356 ECB             :                                             &cur_txn->changes);
    1357                 : 
    1358 GIC         300 :             state->entries[off].lsn = cur_change->lsn;
    1359             300 :             state->entries[off].change = cur_change;
    1360 CBC         300 :             state->entries[off].txn = cur_txn;
    1361                 : 
    1362             300 :             binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
    1363                 :         }
    1364                 :     }
    1365                 : 
    1366 ECB             :     /* assemble a valid binary heap */
    1367 GIC        1700 :     binaryheap_build(state->heap);
    1368            1700 : }
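The note on ReorderBufferIterTXNInit (state handed back through iter_state rather than the return value) can be illustrated with plain setjmp/longjmp standing in for PG_TRY/PG_CATCH and ereport(ERROR). Everything here is hypothetical: IterState, iter_init, run_iteration and the simulated failure flag. The point is that the pointer is published before any step that can throw, and the caller's variable is declared volatile so its value survives the longjmp.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdlib.h>

/* Hypothetical stand-in for the error context PG_TRY/PG_CATCH set up. */
static jmp_buf error_ctx;

/* Hypothetical iterator state; the real one holds a binary heap and files. */
typedef struct IterState
{
    int         nr_txns;
} IterState;

/* Same shape as ReorderBufferIterTXNInit: publish through *out early. */
static void
iter_init(IterState *volatile *out, int fail_after_publish)
{
    IterState  *state;

    assert(out != NULL);
    *out = NULL;

    state = calloc(1, sizeof(IterState));
    if (state == NULL)
        longjmp(error_ctx, 1);  /* like palloc failing with ERROR */

    *out = state;               /* from here on the caller can clean up */

    if (fail_after_publish)
        longjmp(error_ctx, 1);  /* like an I/O error while restoring changes */
}

/* Returns 1 if the "error" path ran; *freed_in_catch reports cleanup there. */
static int
run_iteration(int fail_after_publish, int *freed_in_catch)
{
    IterState  *volatile state = NULL;

    *freed_in_catch = 0;
    if (setjmp(error_ctx) == 0)
    {
        iter_init(&state, fail_after_publish);
        free(state);            /* normal finish */
        return 0;
    }
    /* "PG_CATCH": state is visible because it was published early */
    if (state != NULL)
    {
        free(state);
        *freed_in_catch = 1;
    }
    return 1;
}
```

Had iter_init returned the pointer instead, a longjmp after allocation would leave the caller with no handle to free.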
    1369 ECB             : 
    1370                 : /*
    1371                 :  * Return the next change when iterating over a transaction and its
    1372                 :  * subtransactions.
    1373                 :  *
    1374                 :  * Returns NULL when no further changes exist.
    1375                 :  */
    1376                 : static ReorderBufferChange *
    1377 CBC      354474 : ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
    1378 ECB             : {
    1379                 :     ReorderBufferChange *change;
    1380                 :     ReorderBufferIterTXNEntry *entry;
    1381                 :     int32       off;
    1382                 : 
    1383                 :     /* nothing there anymore */
    1384 GIC      354474 :     if (state->heap->bh_size == 0)
    1385            1689 :         return NULL;
    1386 ECB             : 
    1387 CBC      352785 :     off = DatumGetInt32(binaryheap_first(state->heap));
    1388 GIC      352785 :     entry = &state->entries[off];
    1389                 : 
    1390                 :     /* free memory we might have "leaked" in the previous *Next call */
    1391          352785 :     if (!dlist_is_empty(&state->old_change))
    1392                 :     {
    1393              45 :         change = dlist_container(ReorderBufferChange, node,
    1394                 :                                  dlist_pop_head_node(&state->old_change));
    1395              45 :         ReorderBufferReturnChange(rb, change, true);
    1396 CBC          45 :         Assert(dlist_is_empty(&state->old_change));
    1397                 :     }
    1398                 : 
    1399 GIC      352785 :     change = entry->change;
    1400                 : 
    1401                 :     /*
    1402                 :      * update heap with information about which transaction has the next
    1403 ECB             :      * relevant change in LSN order
    1404                 :      */
    1405                 : 
    1406                 :     /* there are in-memory changes */
    1407 CBC      352785 :     if (dlist_has_next(&entry->txn->changes, &entry->change->node))
    1408                 :     {
    1409 GIC      350928 :         dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
    1410 CBC      350928 :         ReorderBufferChange *next_change =
    1411 GIC      350928 :         dlist_container(ReorderBufferChange, node, next);
    1412 ECB             : 
    1413                 :         /* txn stays the same */
    1414 CBC      350928 :         state->entries[off].lsn = next_change->lsn;
    1415          350928 :         state->entries[off].change = next_change;
    1416                 : 
    1417 GIC      350928 :         binaryheap_replace_first(state->heap, Int32GetDatum(off));
    1418 CBC      350928 :         return change;
    1419                 :     }
    1420                 : 
    1421                 :     /* try to load changes from disk */
    1422 GIC        1857 :     if (entry->txn->nentries != entry->txn->nentries_mem)
    1423                 :     {
    1424                 :         /*
    1425                 :          * Ugly: restoring changes will reuse *Change records, thus delete the
    1426 ECB             :          * current one from the per-tx list and only free in the next call.
    1427                 :          */
    1428 CBC          65 :         dlist_delete(&change->node);
    1429              65 :         dlist_push_tail(&state->old_change, &change->node);
    1430 ECB             : 
    1431                 :         /*
    1432                 :          * Update the total bytes processed by the txn for which we are
    1433                 :          * releasing the current set of changes and restoring the new set of
    1434                 :          * changes.
    1435                 :          */
    1436 CBC          65 :         rb->totalBytes += entry->txn->size;
    1437              65 :         if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
    1438                 :                                         &state->entries[off].segno))
    1439                 :         {
    1440                 :             /* successfully restored changes from disk */
    1441 ECB             :             ReorderBufferChange *next_change =
    1442 GIC          36 :             dlist_head_element(ReorderBufferChange, node,
    1443                 :                                &entry->txn->changes);
    1444                 : 
    1445              36 :             elog(DEBUG2, "restored %u/%u changes from disk",
    1446                 :                  (uint32) entry->txn->nentries_mem,
    1447 ECB             :                  (uint32) entry->txn->nentries);
    1448                 : 
    1449 GIC          36 :             Assert(entry->txn->nentries_mem);
    1450                 :             /* txn stays the same */
    1451              36 :             state->entries[off].lsn = next_change->lsn;
    1452              36 :             state->entries[off].change = next_change;
    1453              36 :             binaryheap_replace_first(state->heap, Int32GetDatum(off));
    1454                 : 
    1455 CBC          36 :             return change;
    1456 ECB             :         }
    1457                 :     }
    1458                 : 
    1459                 :     /* ok, no changes there anymore, remove */
    1460 GIC        1821 :     binaryheap_remove_first(state->heap);
    1461 ECB             : 
    1462 GIC        1821 :     return change;
    1463                 : }
    1464 ECB             : 
    1465                 : /*
    1466                 :  * Deallocate the iterator
    1467                 :  */
    1468                 : static void
    1469 GIC        1699 : ReorderBufferIterTXNFinish(ReorderBuffer *rb,
    1470 ECB             :                            ReorderBufferIterTXNState *state)
    1471                 : {
    1472                 :     int32       off;
    1473                 : 
    1474 CBC        3522 :     for (off = 0; off < state->nr_txns; off++)
    1475                 :     {
    1476 GIC        1823 :         if (state->entries[off].file.vfd != -1)
    1477 UIC           0 :             FileClose(state->entries[off].file.vfd);
    1478                 :     }
    1479 ECB             : 
    1480                 :     /* free memory we might have "leaked" in the last *Next call */
    1481 CBC        1699 :     if (!dlist_is_empty(&state->old_change))
    1482                 :     {
    1483                 :         ReorderBufferChange *change;
    1484                 : 
    1485 GIC          19 :         change = dlist_container(ReorderBufferChange, node,
    1486                 :                                  dlist_pop_head_node(&state->old_change));
    1487              19 :         ReorderBufferReturnChange(rb, change, true);
    1488 CBC          19 :         Assert(dlist_is_empty(&state->old_change));
    1489                 :     }
    1490                 : 
    1491 GIC        1699 :     binaryheap_free(state->heap);
    1492            1699 :     pfree(state);
    1493 CBC        1699 : }
    1494                 : 
    1495 ECB             : /*
    1496 EUB             :  * Cleanup the contents of a transaction, usually after the transaction
    1497                 :  * committed or aborted.
    1498                 :  */
    1499                 : static void
    1500 CBC        3262 : ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
    1501                 : {
    1502                 :     bool        found;
    1503                 :     dlist_mutable_iter iter;
    1504 ECB             : 
    1505                 :     /* cleanup subtransactions & their changes */
    1506 CBC        3447 :     dlist_foreach_modify(iter, &txn->subtxns)
    1507 ECB             :     {
    1508                 :         ReorderBufferTXN *subtxn;
    1509                 : 
    1510 CBC         185 :         subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
    1511 ECB             : 
    1512                 :         /*
     1513                 :          * Subtransactions are always associated with the toplevel TXN, even if
    1514                 :          * they originally were happening inside another subtxn, so we won't
    1515                 :          * ever recurse more than one level deep here.
    1516                 :          */
    1517 GIC         185 :         Assert(rbtxn_is_known_subxact(subtxn));
    1518             185 :         Assert(subtxn->nsubtxns == 0);
    1519 ECB             : 
    1520 GIC         185 :         ReorderBufferCleanupTXN(rb, subtxn);
    1521                 :     }
    1522                 : 
    1523                 :     /* cleanup changes in the txn */
    1524           75313 :     dlist_foreach_modify(iter, &txn->changes)
    1525 ECB             :     {
    1526                 :         ReorderBufferChange *change;
    1527                 : 
    1528 GIC       72051 :         change = dlist_container(ReorderBufferChange, node, iter.cur);
    1529 ECB             : 
    1530                 :         /* Check we're not mixing changes from different transactions. */
    1531 GIC       72051 :         Assert(change->txn == txn);
    1532                 : 
    1533           72051 :         ReorderBufferReturnChange(rb, change, true);
    1534                 :     }
    1535                 : 
    1536 ECB             :     /*
    1537                 :      * Cleanup the tuplecids we stored for decoding catalog snapshot access.
    1538                 :      * They are always stored in the toplevel transaction.
    1539                 :      */
    1540 GIC       25831 :     dlist_foreach_modify(iter, &txn->tuplecids)
    1541                 :     {
    1542                 :         ReorderBufferChange *change;
    1543 ECB             : 
    1544 GIC       22569 :         change = dlist_container(ReorderBufferChange, node, iter.cur);
    1545                 : 
    1546                 :         /* Check we're not mixing changes from different transactions. */
    1547 CBC       22569 :         Assert(change->txn == txn);
    1548 GIC       22569 :         Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
    1549                 : 
    1550 CBC       22569 :         ReorderBufferReturnChange(rb, change, true);
    1551                 :     }
    1552 ECB             : 
    1553                 :     /*
    1554                 :      * Cleanup the base snapshot, if set.
    1555                 :      */
    1556 GIC        3262 :     if (txn->base_snapshot != NULL)
    1557                 :     {
    1558            2568 :         SnapBuildSnapDecRefcount(txn->base_snapshot);
    1559 CBC        2568 :         dlist_delete(&txn->base_snapshot_node);
    1560                 :     }
    1561                 : 
    1562                 :     /*
    1563 ECB             :      * Cleanup the snapshot for the last streamed run.
    1564                 :      */
    1565 GIC        3262 :     if (txn->snapshot_now != NULL)
    1566 ECB             :     {
    1567 CBC          62 :         Assert(rbtxn_is_streamed(txn));
    1568 GIC          62 :         ReorderBufferFreeSnap(rb, txn->snapshot_now);
    1569 ECB             :     }
    1570                 : 
    1571                 :     /*
    1572                 :      * Remove TXN from its containing lists.
    1573                 :      *
    1574                 :      * Note: if txn is known as subxact, we are deleting the TXN from its
    1575                 :      * parent's list of known subxacts; this leaves the parent's nsubxacts
    1576                 :      * count too high, but we don't care.  Otherwise, we are deleting the TXN
    1577                 :      * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
    1578                 :      * list of catalog modifying transactions as well.
    1579                 :      */
    1580 GIC        3262 :     dlist_delete(&txn->node);
    1581 GNC        3262 :     if (rbtxn_has_catalog_changes(txn))
    1582            1023 :         dclist_delete_from(&rb->catchange_txns, &txn->catchange_node);
    1583                 : 
    1584                 :     /* now remove reference from buffer */
    1585            3262 :     hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
    1586 CBC        3262 :     Assert(found);
    1587 ECB             : 
    1588                 :     /* remove entries spilled to disk */
    1589 GIC        3262 :     if (rbtxn_is_serialized(txn))
    1590             292 :         ReorderBufferRestoreCleanup(rb, txn);
    1591                 : 
    1592                 :     /* deallocate */
    1593            3262 :     ReorderBufferReturnTXN(rb, txn);
    1594            3262 : }
    1595                 : 
    1596                 : /*
    1597                 :  * Discard changes from a transaction (and subtransactions), either after
    1598                 :  * streaming or decoding them at PREPARE. Keep the remaining info -
    1599 ECB             :  * transactions, tuplecids, invalidations and snapshots.
    1600                 :  *
    1601                 :  * We additionally remove tuplecids after decoding the transaction at prepare
    1602                 :  * time as we only need to perform invalidation at rollback or commit prepared.
    1603                 :  *
    1604                 :  * 'txn_prepared' indicates that we have decoded the transaction at prepare
    1605                 :  * time.
    1606                 :  */
    1607                 : static void
    1608 CBC         993 : ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
    1609 ECB             : {
    1610                 :     dlist_mutable_iter iter;
    1611                 : 
    1612                 :     /* cleanup subtransactions & their changes */
    1613 CBC        1289 :     dlist_foreach_modify(iter, &txn->subtxns)
    1614                 :     {
    1615                 :         ReorderBufferTXN *subtxn;
    1616                 : 
    1617 GIC         296 :         subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
    1618                 : 
    1619                 :         /*
    1620                 :          * Subtransactions are always associated with the toplevel TXN, even if
    1621                 :          * they originally were happening inside another subtxn, so we won't
    1622                 :          * ever recurse more than one level deep here.
    1623                 :          */
    1624             296 :         Assert(rbtxn_is_known_subxact(subtxn));
    1625             296 :         Assert(subtxn->nsubtxns == 0);
    1626                 : 
    1627 CBC         296 :         ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
    1628                 :     }
    1629                 : 
    1630                 :     /* cleanup changes in the txn */
    1631 GIC      157382 :     dlist_foreach_modify(iter, &txn->changes)
    1632 ECB             :     {
    1633                 :         ReorderBufferChange *change;
    1634                 : 
    1635 GIC      156389 :         change = dlist_container(ReorderBufferChange, node, iter.cur);
    1636 ECB             : 
    1637                 :         /* Check we're not mixing changes from different transactions. */
    1638 GIC      156389 :         Assert(change->txn == txn);
    1639                 : 
    1640                 :         /* remove the change from its containing list */
    1641          156389 :         dlist_delete(&change->node);
    1642                 : 
    1643 CBC      156389 :         ReorderBufferReturnChange(rb, change, true);
    1644 ECB             :     }
    1645                 : 
    1646                 :     /*
    1647                 :      * Mark the transaction as streamed.
    1648                 :      *
    1649                 :      * The top-level transaction is always marked as streamed, even if it
    1650                 :      * does not contain any changes (that is, when all the changes are in
    1651                 :      * subtransactions).
    1652                 :      *
    1653                 :      * For subtransactions, we only mark them as streamed when there are
    1654                 :      * changes in them.
    1655                 :      *
    1656                 :      * We do it this way because of aborts - we don't want to send aborts for
    1657                 :      * XIDs the downstream is not aware of. And of course, it always knows
    1658                 :      * about the toplevel xact (we send the XID in all messages), but we never
    1659                 :      * stream XIDs of empty subxacts.
    1660                 :      */
    1661 GNC         993 :     if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
    1662 CBC         781 :         txn->txn_flags |= RBTXN_IS_STREAMED;
    1663                 : 
    1664 GIC         993 :     if (txn_prepared)
    1665                 :     {
    1666                 :         /*
    1667                 :          * If this is a prepared txn, cleanup the tuplecids we stored for
    1668                 :          * decoding catalog snapshot access. They are always stored in the
    1669                 :          * toplevel transaction.
    1670                 :          */
    1671             174 :         dlist_foreach_modify(iter, &txn->tuplecids)
    1672                 :         {
    1673                 :             ReorderBufferChange *change;
    1674                 : 
    1675             123 :             change = dlist_container(ReorderBufferChange, node, iter.cur);
    1676                 : 
    1677                 :             /* Check we're not mixing changes from different transactions. */
    1678             123 :             Assert(change->txn == txn);
    1679             123 :             Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
    1680 ECB             : 
    1681                 :             /* Remove the change from its containing list. */
    1682 GIC         123 :             dlist_delete(&change->node);
    1683 ECB             : 
    1684 GIC         123 :             ReorderBufferReturnChange(rb, change, true);
    1685                 :         }
    1686                 :     }
    1687                 : 
    1688                 :     /*
    1689                 :      * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
    1690 ECB             :      * memory. We could also keep the hash table and update it with new ctid
    1691                 :      * values, but this seems simpler and good enough for now.
    1692                 :      */
    1693 GIC         993 :     if (txn->tuplecid_hash != NULL)
    1694 ECB             :     {
    1695 GIC          22 :         hash_destroy(txn->tuplecid_hash);
    1696              22 :         txn->tuplecid_hash = NULL;
    1697 ECB             :     }
    1698                 : 
    1699                 :     /* If this txn is serialized then clean the disk space. */
    1700 GIC         993 :     if (rbtxn_is_serialized(txn))
    1701 ECB             :     {
    1702 GIC           6 :         ReorderBufferRestoreCleanup(rb, txn);
    1703 CBC           6 :         txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
    1704                 : 
    1705                 :         /*
    1706                 :          * We set this flag to indicate if the transaction is ever serialized.
    1707                 :          * We need this to accurately update the stats as otherwise the same
    1708                 :          * transaction can be counted as serialized multiple times.
    1709                 :          */
    1710 GIC           6 :         txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
    1711                 :     }
    1712 ECB             : 
    1713                 :     /* also reset the number of entries in the transaction */
    1714 CBC         993 :     txn->nentries_mem = 0;
    1715             993 :     txn->nentries = 0;
    1716 GIC         993 : }
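Both loops in ReorderBufferTruncateTXN use dlist_foreach_modify, which remembers the successor element before the loop body unlinks and returns the current one. The following is a minimal, self-contained sketch of that pattern, under stated assumptions: `node_t`, `list_t`, `CONTAINER_OF`, `change_t` and the helper functions are hypothetical stand-ins loosely modelled on PostgreSQL's lib/ilist.h, not its actual API.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical circular intrusive list with a sentinel head node,
 * loosely modelled on dlist in PostgreSQL's lib/ilist.h. */
typedef struct node
{
    struct node *prev;
    struct node *next;
} node_t;

typedef struct
{
    node_t      head;
} list_t;

/* Recover the enclosing struct from a pointer to its embedded node. */
#define CONTAINER_OF(ptr, type, member) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

/* Hypothetical payload type carrying an embedded list node. */
typedef struct
{
    int         value;
    node_t      node;
} change_t;

static void
list_init(list_t *l)
{
    l->head.prev = l->head.next = &l->head;
}

static void
list_push_tail(list_t *l, node_t *n)
{
    n->prev = l->head.prev;
    n->next = &l->head;
    l->head.prev->next = n;
    l->head.prev = n;
}

static void
list_delete(node_t *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
}

/*
 * Unlink and free every change.  'next' is read before 'cur' is unlinked
 * and freed, so the loop never dereferences freed memory.
 */
static int
free_all_changes(list_t *l)
{
    int         freed = 0;

    for (node_t *cur = l->head.next, *next = cur->next;
         cur != &l->head;
         cur = next, next = cur->next)
    {
        change_t   *c = CONTAINER_OF(cur, change_t, node);

        list_delete(cur);
        free(c);
        freed++;
    }
    assert(l->head.next == &l->head);   /* list is now empty */
    return freed;
}
```

A plain forward iterator would read `cur->next` after `free(c)`; saving `next` first is the whole point of the "modify" variant.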
    1717                 : 
    1718                 : /*
    1719                 :  * Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by
    1720                 :  * HeapTupleSatisfiesHistoricMVCC.
    1721 ECB             :  */
    1722                 : static void
    1723 GIC        1700 : ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
    1724                 : {
    1725                 :     dlist_iter  iter;
    1726                 :     HASHCTL     hash_ctl;
    1727                 : 
    1728            1700 :     if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
    1729 CBC        1246 :         return;
    1730                 : 
    1731 GIC         454 :     hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
    1732             454 :     hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
    1733 CBC         454 :     hash_ctl.hcxt = rb->context;
    1734 ECB             : 
    1735                 :     /*
    1736                 :      * create the hash with the exact number of to-be-stored tuplecids from
    1737                 :      * the start
    1738                 :      */
    1739 GIC         454 :     txn->tuplecid_hash =
    1740             454 :         hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
    1741                 :                     HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
    1742 ECB             : 
    1743 GIC       10013 :     dlist_foreach(iter, &txn->tuplecids)
    1744                 :     {
    1745                 :         ReorderBufferTupleCidKey key;
    1746                 :         ReorderBufferTupleCidEnt *ent;
    1747 ECB             :         bool        found;
    1748                 :         ReorderBufferChange *change;
    1749                 : 
    1750 CBC        9559 :         change = dlist_container(ReorderBufferChange, node, iter.cur);
    1751 ECB             : 
    1752 CBC        9559 :         Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
    1753                 : 
    1754                 :         /* be careful about padding */
    1755 GIC        9559 :         memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
    1756                 : 
    1757 GNC        9559 :         key.rlocator = change->data.tuplecid.locator;
    1758 ECB             : 
    1759 CBC        9559 :         ItemPointerCopy(&change->data.tuplecid.tid,
    1760                 :                         &key.tid);
    1761                 : 
    1762 ECB             :         ent = (ReorderBufferTupleCidEnt *)
    1763 GNC        9559 :             hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
    1764 GIC        9559 :         if (!found)
    1765                 :         {
    1766 CBC        8027 :             ent->cmin = change->data.tuplecid.cmin;
    1767 GIC        8027 :             ent->cmax = change->data.tuplecid.cmax;
    1768 CBC        8027 :             ent->combocid = change->data.tuplecid.combocid;
    1769                 :         }
    1770                 :         else
    1771 ECB             :         {
    1772                 :             /*
    1773                 :              * Maybe we already saw this tuple before in this transaction, but
    1774                 :              * if so it must have the same cmin.
    1775                 :              */
    1776 GIC        1532 :             Assert(ent->cmin == change->data.tuplecid.cmin);
    1777                 : 
    1778                 :             /*
    1779 ECB             :              * cmax may be initially invalid, but once set it can only grow,
    1780                 :              * and never become invalid again.
    1781                 :              */
    1782 CBC        1532 :             Assert((ent->cmax == InvalidCommandId) ||
    1783 ECB             :                    ((change->data.tuplecid.cmax != InvalidCommandId) &&
    1784                 :                     (change->data.tuplecid.cmax > ent->cmax)));
    1785 GIC        1532 :             ent->cmax = change->data.tuplecid.cmax;
    1786                 :         }
    1787                 :     }
    1788                 : }
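ReorderBufferBuildTupleCidHash zeroes each key with memset because the table is created with HASH_BLOBS, so keys are hashed and compared as raw bytes, padding included. A hedged sketch of the same idea follows, with a linear memcmp() lookup standing in for hash_search(); the types `cid_key_t`, `cid_ent_t`, `cid_map_t` and the helpers are hypothetical. It also mirrors the two assertions on cmin and cmax.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define INVALID_CID UINT32_MAX      /* stand-in for InvalidCommandId */
#define MAX_ENTRIES 64

/*
 * Hypothetical key: relation id plus (block, offset).  The trailing
 * uint16_t typically leaves padding bytes, which memcmp() also compares.
 */
typedef struct
{
    uint32_t    rel;
    uint32_t    block;
    uint16_t    offset;
} cid_key_t;

typedef struct
{
    cid_key_t   key;
    uint32_t    cmin;
    uint32_t    cmax;
} cid_ent_t;

typedef struct
{
    cid_ent_t   ents[MAX_ENTRIES];
    int         n;
} cid_map_t;

/* Build a key with every byte, including padding, set deterministically. */
static void
make_key(cid_key_t *k, uint32_t rel, uint32_t block, uint16_t offset)
{
    memset(k, 0, sizeof(*k));
    k->rel = rel;
    k->block = block;
    k->offset = offset;
}

/* Byte-wise lookup, as a HASH_BLOBS table would compare keys. */
static cid_ent_t *
find_entry(cid_map_t *m, const cid_key_t *k)
{
    for (int i = 0; i < m->n; i++)
        if (memcmp(&m->ents[i].key, k, sizeof(*k)) == 0)
            return &m->ents[i];
    return NULL;
}

/* Insert a new entry, or merge into an existing one. */
static void
record_tuplecid(cid_map_t *m, const cid_key_t *k, uint32_t cmin, uint32_t cmax)
{
    cid_ent_t  *ent = find_entry(m, k);

    if (ent == NULL)
    {
        assert(m->n < MAX_ENTRIES);
        ent = &m->ents[m->n++];
        memcpy(&ent->key, k, sizeof(*k));   /* copy padding bytes too */
        ent->cmin = cmin;
        ent->cmax = cmax;
        return;
    }

    /* same tuple seen again: cmin must match, cmax may only grow */
    assert(ent->cmin == cmin);
    assert(ent->cmax == INVALID_CID ||
           (cmax != INVALID_CID && cmax > ent->cmax));
    ent->cmax = cmax;
}
```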
    1789                 : 
    1790                 : /*
    1791                 :  * Copy a provided snapshot so we can modify it privately. This is needed so
    1792 ECB             :  * that catalog modifying transactions can look into intermediate catalog
    1793                 :  * states.
    1794                 :  */
    1795                 : static Snapshot
    1796 GIC        1534 : ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
    1797                 :                       ReorderBufferTXN *txn, CommandId cid)
    1798 ECB             : {
    1799                 :     Snapshot    snap;
    1800                 :     dlist_iter  iter;
    1801 CBC        1534 :     int         i = 0;
    1802                 :     Size        size;
    1803                 : 
    1804 GIC        1534 :     size = sizeof(SnapshotData) +
    1805            1534 :         sizeof(TransactionId) * orig_snap->xcnt +
    1806            1534 :         sizeof(TransactionId) * (txn->nsubtxns + 1);
    1807                 : 
    1808            1534 :     snap = MemoryContextAllocZero(rb->context, size);
    1809            1534 :     memcpy(snap, orig_snap, sizeof(SnapshotData));
    1810                 : 
    1811            1534 :     snap->copied = true;
    1812 CBC        1534 :     snap->active_count = 1;      /* mark as active so nobody frees it */
    1813 GIC        1534 :     snap->regd_count = 0;
    1814            1534 :     snap->xip = (TransactionId *) (snap + 1);
    1815                 : 
    1816            1534 :     memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
    1817 ECB             : 
    1818                 :     /*
    1819                 :      * snap->subxip contains all txids that belong to our transaction which we
    1820                 :      * need to check via cmin/cmax. That's why we store the toplevel
    1821                 :      * transaction in there as well.
    1822                 :      */
    1823 GIC        1534 :     snap->subxip = snap->xip + snap->xcnt;
    1824 CBC        1534 :     snap->subxip[i++] = txn->xid;
    1825 ECB             : 
    1826                 :     /*
    1827                 :      * subxcnt isn't decreased when subtransactions abort, so count manually.
    1828                 :      * Since it's an upper bound, it is safe to use it for the allocation
    1829                 :      * above.
    1830                 :      */
    1831 GIC        1534 :     snap->subxcnt = 1;
    1832 ECB             : 
    1833 GIC        1842 :     dlist_foreach(iter, &txn->subtxns)
    1834                 :     {
    1835                 :         ReorderBufferTXN *sub_txn;
    1836                 : 
    1837             308 :         sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
    1838             308 :         snap->subxip[i++] = sub_txn->xid;
    1839 CBC         308 :         snap->subxcnt++;
    1840 ECB             :     }
    1841                 : 
    1842                 :     /* sort so we can bsearch() later */
    1843 GIC        1534 :     qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
    1844                 : 
    1845                 :     /* store the specified current CommandId */
    1846            1534 :     snap->curcid = cid;
    1847 ECB             : 
    1848 GIC        1534 :     return snap;
    1849 ECB             : }
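ReorderBufferCopySnap puts the snapshot header and both xid arrays into one allocation: xip points just past the header, subxip just past xip, and subxip is sorted so it can be binary-searched later. The sketch below reproduces that layout with a heavily reduced, hypothetical `snap_t`; calloc() and free() stand in for MemoryContextAllocZero() and pfree().

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t xid_t;

/* Hypothetical, heavily reduced snapshot; SnapshotData has many more fields. */
typedef struct
{
    xid_t      *xip;            /* xids running when the snapshot was taken */
    int         xcnt;
    xid_t      *subxip;         /* our own top-level xid and subxids, sorted */
    int         subxcnt;
} snap_t;

static int
xid_cmp(const void *a, const void *b)
{
    xid_t       x = *(const xid_t *) a;
    xid_t       y = *(const xid_t *) b;

    return (x > y) - (x < y);
}

/* Copy 'orig' into a single block laid out as [snap_t][xip...][subxip...]. */
static snap_t *
copy_snap(const snap_t *orig, xid_t top_xid, const xid_t *subxids, int nsubxids)
{
    size_t      size;
    snap_t     *snap;

    assert(orig->xcnt >= 0 && nsubxids >= 0);
    size = sizeof(snap_t) +
        sizeof(xid_t) * ((size_t) orig->xcnt + (size_t) nsubxids + 1);
    snap = calloc(1, size);
    if (snap == NULL)
        return NULL;

    snap->xcnt = orig->xcnt;
    snap->xip = (xid_t *) (snap + 1);
    if (orig->xcnt > 0)
        memcpy(snap->xip, orig->xip, sizeof(xid_t) * (size_t) orig->xcnt);

    /* subxip starts right after xip and includes the top-level xid */
    snap->subxip = snap->xip + snap->xcnt;
    snap->subxcnt = 0;
    snap->subxip[snap->subxcnt++] = top_xid;
    for (int i = 0; i < nsubxids; i++)
        snap->subxip[snap->subxcnt++] = subxids[i];

    /* sort so membership tests can use bsearch() */
    qsort(snap->subxip, (size_t) snap->subxcnt, sizeof(xid_t), xid_cmp);
    return snap;
}

static int
snap_is_own_xid(const snap_t *snap, xid_t xid)
{
    return bsearch(&xid, snap->subxip, (size_t) snap->subxcnt,
                   sizeof(xid_t), xid_cmp) != NULL;
}
```

Because everything lives in one block, a single free() releases the whole copy, which matches how ReorderBufferFreeSnap simply pfree()s a copied snapshot.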
    1850                 : 
    1851                 : /*
    1852                 :  * Free a previously ReorderBufferCopySnap'ed snapshot
    1853                 :  */
    1854                 : static void
    1855 CBC        2528 : ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
    1856                 : {
    1857 GIC        2528 :     if (snap->copied)
    1858            1532 :         pfree(snap);
    1859 ECB             :     else
    1860 GIC         996 :         SnapBuildSnapDecRefcount(snap);
    1861            2528 : }
    1862 ECB             : 
    1863                 : /*
    1864                 :  * If the transaction was (partially) streamed, we need to prepare or commit
    1865                 :  * it in a 'streamed' way.  That is, we first stream the remaining part of the
    1866                 :  * transaction, and then invoke the stream_prepare or stream_commit callback,
    1867                 :  * as the case may be.
    1868                 :  */
    1869                 : static void
    1870 GIC          61 : ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
    1871 ECB             : {
    1872                 :     /* we should only call this for previously streamed transactions */
    1873 CBC          61 :     Assert(rbtxn_is_streamed(txn));
    1874 ECB             : 
    1875 GIC          61 :     ReorderBufferStreamTXN(rb, txn);
    1876 ECB             : 
    1877 CBC          61 :     if (rbtxn_prepared(txn))
    1878                 :     {
    1879                 :         /*
    1880                 :          * Note, we send stream prepare even if a concurrent abort is
    1881                 :          * detected. See DecodePrepare for more information.
    1882                 :          */
    1883 GIC          12 :         rb->stream_prepare(rb, txn, txn->final_lsn);
    1884                 : 
    1885                 :         /*
    1886 ECB             :          * This is a PREPARED transaction, part of a two-phase commit. The
    1887                 :          * full cleanup will happen as part of the COMMIT PREPARED, so now
    1888                 :          * just truncate txn by removing changes and tuple_cids.
    1889                 :          */
    1890 GIC          12 :         ReorderBufferTruncateTXN(rb, txn, true);
    1891 ECB             :         /* Reset the CheckXidAlive */
    1892 GIC          12 :         CheckXidAlive = InvalidTransactionId;
    1893 ECB             :     }
    1894                 :     else
    1895                 :     {
    1896 GIC          49 :         rb->stream_commit(rb, txn, txn->final_lsn);
    1897              49 :         ReorderBufferCleanupTXN(rb, txn);
    1898                 :     }
    1899 CBC          61 : }
    1900                 : 
    1901                 : /*
    1902                 :  * Set xid to detect concurrent aborts.
    1903                 :  *
    1904                 :  * While streaming an in-progress transaction or decoding a prepared
    1905                 :  * transaction there is a possibility that the (sub)transaction might get
    1906 ECB             :  * aborted concurrently.  In such a case, if the (sub)transaction has catalog
    1907                 :  * updates, we might decode the tuple using the wrong catalog version.  For
    1908                 :  * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0).  Now,
    1909                 :  * the transaction 501 updates the catalog tuple and after that we will have
    1910                 :  * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0).  Now, if 501 is
    1911                 :  * aborted and some other transaction say 502 updates the same catalog tuple
    1912                 :  * then the first tuple will be changed to (xmin: 500, xmax: 502).  So, the
    1913                 :  * problem is that when we try to decode the tuple inserted/updated in 501
    1914                 :  * after the catalog update, we will see the catalog tuple with (xmin: 500,
    1915                 :  * xmax: 502) as visible because it will consider that the tuple is deleted by
    1916                 :  * xid 502, which is not visible to our snapshot.  And when we try to
    1917                 :  * decode with that catalog tuple, it can lead to a wrong result or a crash.
    1918                 :  * So, it is necessary to detect concurrent aborts to allow streaming of
    1919                 :  * in-progress transactions or decoding of prepared transactions.
    1920                 :  *
    1921                 :  * For detecting the concurrent abort we set CheckXidAlive to the current
    1922                 :  * (sub)transaction's xid to which this change belongs.  And, during a
    1923                 :  * catalog scan, we can check the status of the xid and, if it is aborted, we will
    1924                 :  * report a specific error so that we can stop streaming the current transaction
    1925                 :  * and discard the already streamed changes on such an error.  We might have
    1926                 :  * already streamed some of the changes for the aborted (sub)transaction, but
    1927                 :  * that is fine because when we decode the abort we will stream an abort message
    1928                 :  * to truncate the changes in the subscriber. Similarly, for prepared
    1929                 :  * transactions, we stop decoding if a concurrent abort is detected and then
    1930                 :  * roll back the changes when ROLLBACK PREPARED is encountered. See
    1931                 :  * DecodePrepare.
    1932                 :  */
    1933                 : static inline void
    1934 GIC      177674 : SetupCheckXidLive(TransactionId xid)
    1935                 : {
    1936                 :     /*
    1937                 :      * If the input transaction id is already set as CheckXidAlive, then
    1938                 :      * there is nothing to do.
    1939                 :      */
    1940          177674 :     if (TransactionIdEquals(CheckXidAlive, xid))
    1941           91730 :         return;
    1942                 : 
    1943                 :     /*
    1944                 :      * Set up CheckXidAlive if the xid is not committed yet.  We don't check if the
    1945                 :      * xid is aborted.  That will happen during catalog access.
    1946                 :      */
    1947           85944 :     if (!TransactionIdDidCommit(xid))
    1948             322 :         CheckXidAlive = xid;
    1949                 :     else
    1950 CBC       85622 :         CheckXidAlive = InvalidTransactionId;
    1951                 : }
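SetupCheckXidLive skips the commit-status lookup when the xid is already the one being watched, and otherwise watches it only if it has not committed. A minimal sketch of that caching logic follows; `check_xid_alive`, `xid_did_commit()` and the in-memory commit list are hypothetical stand-ins for CheckXidAlive and TransactionIdDidCommit().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t xid_t;

#define INVALID_XID ((xid_t) 0)     /* stand-in for InvalidTransactionId */

/* Hypothetical global state standing in for CheckXidAlive and clog. */
static xid_t check_xid_alive = INVALID_XID;
static int  commit_lookups = 0;     /* counts the (expensive) status lookups */
static xid_t committed_xids[8];
static int  ncommitted = 0;

/* Stand-in for TransactionIdDidCommit(). */
static bool
xid_did_commit(xid_t xid)
{
    commit_lookups++;
    for (int i = 0; i < ncommitted; i++)
        if (committed_xids[i] == xid)
            return true;
    return false;
}

static void
setup_check_xid_alive(xid_t xid)
{
    /* already watching this xid: nothing to do */
    if (check_xid_alive == xid)
        return;

    /* watch only xids that have not committed; aborts are caught later */
    check_xid_alive = xid_did_commit(xid) ? INVALID_XID : xid;
}
```

As in the original, a committed xid leaves the watched value invalid, so calling again with the same committed xid repeats the lookup; only the in-progress case is cached.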
    1952                 : 
    1953                 : /*
    1954                 :  * Helper function for ReorderBufferProcessTXN for applying change.
    1955                 :  */
    1956 ECB             : static inline void
    1957 CBC      333687 : ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
    1958                 :                          Relation relation, ReorderBufferChange *change,
    1959                 :                          bool streaming)
    1960                 : {
    1961 GIC      333687 :     if (streaming)
    1962          175984 :         rb->stream_change(rb, txn, relation, change);
    1963 ECB             :     else
    1964 CBC      157703 :         rb->apply_change(rb, txn, relation, change);
    1965 GIC      333683 : }
    1966 ECB             : 
    1967                 : /*
    1968                 :  * Helper function for ReorderBufferProcessTXN for applying the truncate.
    1969                 :  */
    1970                 : static inline void
    1971 GIC          17 : ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
    1972                 :                            int nrelations, Relation *relations,
    1973 ECB             :                            ReorderBufferChange *change, bool streaming)
    1974                 : {
    1975 GIC          17 :     if (streaming)
    1976 UIC           0 :         rb->stream_truncate(rb, txn, nrelations, relations, change);
    1977 ECB             :     else
    1978 CBC          17 :         rb->apply_truncate(rb, txn, nrelations, relations, change);
    1979 GIC          17 : }
    1980 ECB             : 
    1981                 : /*
    1982                 :  * Helper function for ReorderBufferProcessTXN for applying the message.
    1983                 :  */
    1984                 : static inline void
    1985 GIC          11 : ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
    1986                 :                           ReorderBufferChange *change, bool streaming)
    1987 ECB             : {
    1988 GIC          11 :     if (streaming)
    1989               3 :         rb->stream_message(rb, txn, change->lsn, true,
    1990               3 :                            change->data.msg.prefix,
    1991 ECB             :                            change->data.msg.message_size,
    1992 GBC           3 :                            change->data.msg.message);
    1993                 :     else
    1994 CBC           8 :         rb->message(rb, txn, change->lsn, true,
    1995               8 :                     change->data.msg.prefix,
    1996                 :                     change->data.msg.message_size,
    1997 GIC           8 :                     change->data.msg.message);
    1998              11 : }
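The three helpers above differ only in which output-plugin callback they invoke, depending on the `streaming` flag. A reduced sketch of that dispatch follows, using a hypothetical `plugin_t` callback table with one slot for ordinary replay and one for streaming; the real callbacks take more arguments (the ReorderBuffer, TXN, relation and change).

```c
#include <assert.h>
#include <stdbool.h>

typedef struct plugin plugin_t;
typedef void (*change_cb) (plugin_t *p, int change_id);

/* Hypothetical callback table: one slot for replay, one for streaming. */
struct plugin
{
    change_cb   apply_change;
    change_cb   stream_change;
    int         applied;        /* counters so the dispatch can be observed */
    int         streamed;
};

static void
on_apply(plugin_t *p, int change_id)
{
    (void) change_id;
    p->applied++;
}

static void
on_stream(plugin_t *p, int change_id)
{
    (void) change_id;
    p->streamed++;
}

/* Route one change to the streaming or the non-streaming callback. */
static void
apply_change(plugin_t *p, int change_id, bool streaming)
{
    if (streaming)
        p->stream_change(p, change_id);
    else
        p->apply_change(p, change_id);
}
```

Keeping the choice in one small inline helper lets the main processing loop stay identical for the streaming and non-streaming paths.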
    1999                 : 
    2000                 : /*
    2001 ECB             :  * Function to store the command id and snapshot at the end of the current
    2002                 :  * stream so that we can reuse the same while sending the next stream.
    2003                 :  */
    2004                 : static inline void
    2005 CBC         660 : ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
    2006 ECB             :                              Snapshot snapshot_now, CommandId command_id)
    2007                 : {
    2008 CBC         660 :     txn->command_id = command_id;
    2009                 : 
    2010 ECB             :     /* Avoid copying if it's already copied. */
    2011 CBC         660 :     if (snapshot_now->copied)
    2012 GIC         660 :         txn->snapshot_now = snapshot_now;
    2013 ECB             :     else
    2014 LBC           0 :         txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
    2015                 :                                                   txn, command_id);
    2016 GIC         660 : }
    2017                 : 
    2018                 : /*
    2019                 :  * Helper function for ReorderBufferProcessTXN to handle the concurrent
    2020                 :  * abort of the streaming transaction.  This resets the TXN such that it
    2021 ECB             :  * can be used to stream the remaining data of the transaction being processed.
    2022                 :  * This can happen when the subtransaction is aborted and we still want to
    2023                 :  * continue processing the main or other subtransactions data.
    2024                 :  */
    2025                 : static void
    2026 GIC           7 : ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
    2027 ECB             :                       Snapshot snapshot_now,
    2028                 :                       CommandId command_id,
    2029                 :                       XLogRecPtr last_lsn,
    2030 EUB             :                       ReorderBufferChange *specinsert)
    2031                 : {
    2032 ECB             :     /* Discard the changes that we just streamed */
    2033 GIC           7 :     ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
    2034                 : 
    2035                 :     /* Free all resources allocated for toast reconstruction */
    2036               7 :     ReorderBufferToastReset(rb, txn);
    2037                 : 
    2038                 :     /* Return the spec insert change if it is not NULL */
    2039               7 :     if (specinsert != NULL)
    2040                 :     {
    2041 UIC           0 :         ReorderBufferReturnChange(rb, specinsert, true);
    2042 LBC           0 :         specinsert = NULL;
    2043                 :     }
    2044                 : 
    2045                 :     /*
    2046                 :      * For the streaming case, stop the stream and remember the command ID and
    2047                 :      * snapshot for the streaming run.
    2048                 :      */
    2049 CBC           7 :     if (rbtxn_is_streamed(txn))
    2050                 :     {
    2051 GIC           7 :         rb->stream_stop(rb, txn, last_lsn);
    2052 CBC           7 :         ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
    2053                 :     }
    2054 GIC           7 : }
    2055 ECB             : 
    2056                 : /*
    2057 EUB             :  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
    2058                 :  *
    2059                 :  * Send data of a transaction (and its subtransactions) to the
    2060                 :  * output plugin. We iterate over the top and subtransactions (using a k-way
    2061                 :  * merge) and replay the changes in lsn order.
    2062                 :  *
    2063                 :  * If streaming is true, the data is sent using the streaming API.
    2064                 :  *
    2065 ECB             :  * Note: "volatile" markers on some parameters are to avoid trouble with
    2066                 :  * PG_TRY inside the function.
    2067                 :  */
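The k-way merge mentioned above can be illustrated with a binary min-heap keyed on each input's next LSN: pop the smallest head, advance that input, and restore the heap. This is a hedged, in-memory sketch only; `stream_t`, `heap_t` and `kway_merge()` are hypothetical, and the real iterator (ReorderBufferIterTXNInit/ReorderBufferIterTXNNext) must also deal with changes that were spilled to disk.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t lsn_t;

/* One input already in LSN order, e.g. the changes of one (sub)transaction. */
typedef struct
{
    const lsn_t *lsns;
    size_t      n;
    size_t      pos;
} stream_t;

#define MAX_STREAMS 16

/* Min-heap of non-exhausted streams, ordered by their current head LSN. */
typedef struct
{
    stream_t   *items[MAX_STREAMS];
    size_t      size;
} heap_t;

static lsn_t
head_lsn(const stream_t *s)
{
    return s->lsns[s->pos];
}

static void
swap_streams(stream_t **a, stream_t **b)
{
    stream_t   *t = *a;

    *a = *b;
    *b = t;
}

static void
sift_down(heap_t *h, size_t i)
{
    for (;;)
    {
        size_t      l = 2 * i + 1;
        size_t      r = l + 1;
        size_t      min = i;

        if (l < h->size && head_lsn(h->items[l]) < head_lsn(h->items[min]))
            min = l;
        if (r < h->size && head_lsn(h->items[r]) < head_lsn(h->items[min]))
            min = r;
        if (min == i)
            return;
        swap_streams(&h->items[i], &h->items[min]);
        i = min;
    }
}

/* Merge k sorted inputs into 'out' (capacity 'cap'); returns count written. */
static size_t
kway_merge(stream_t *streams, size_t k, lsn_t *out, size_t cap)
{
    heap_t      h = {.size = 0};
    size_t      n = 0;

    assert(k <= MAX_STREAMS);
    for (size_t i = 0; i < k; i++)
        if (streams[i].pos < streams[i].n)
            h.items[h.size++] = &streams[i];

    /* heapify bottom-up */
    for (size_t i = h.size / 2; i-- > 0;)
        sift_down(&h, i);

    while (h.size > 0 && n < cap)
    {
        stream_t   *top = h.items[0];

        out[n++] = head_lsn(top);
        top->pos++;
        if (top->pos == top->n)
            h.items[0] = h.items[--h.size];     /* input exhausted: drop it */
        sift_down(&h, 0);
    }
    return n;
}
```

Each step costs O(log k) for k inputs, which is why a heap is preferable to rescanning every subtransaction's list for the smallest LSN.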
    2068                 : static void
    2069 GIC        1700 : ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
    2070 ECB             :                         XLogRecPtr commit_lsn,
    2071                 :                         volatile Snapshot snapshot_now,
    2072                 :                         volatile CommandId command_id,
    2073                 :                         bool streaming)
    2074                 : {
    2075                 :     bool        using_subtxn;
    2076 GIC        1700 :     MemoryContext ccxt = CurrentMemoryContext;
    2077            1700 :     ReorderBufferIterTXNState *volatile iterstate = NULL;
    2078            1700 :     volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
    2079            1700 :     ReorderBufferChange *volatile specinsert = NULL;
    2080            1700 :     volatile bool stream_started = false;
    2081            1700 :     ReorderBufferTXN *volatile curtxn = NULL;
    2082                 : 
    2083                 :     /* build data to be able to lookup the CommandIds of catalog tuples */
    2084            1700 :     ReorderBufferBuildTupleCidHash(rb, txn);
    2085 ECB             : 
    2086                 :     /* setup the initial snapshot */
    2087 GIC        1700 :     SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
    2088                 : 
    2089                 :     /*
    2090                 :      * Decoding needs access to syscaches et al., which in turn use
    2091                 :      * heavyweight locks and such. Thus we need to have enough state around to
    2092 ECB             :      * keep track of those.  The easiest way is to simply use a transaction
    2093                 :      * internally.  That also allows us to easily enforce that nothing writes
    2094                 :      * to the database by checking for xid assignments.
    2095                 :      *
    2096                 :      * When we're called via the SQL SRF there's already a transaction
    2097                 :      * started, so start an explicit subtransaction there.
    2098                 :      */
    2099 GIC        1700 :     using_subtxn = IsTransactionOrTransactionBlock();
    2100 ECB             : 
    2101 GIC        1700 :     PG_TRY();
    2102                 :     {
    2103 ECB             :         ReorderBufferChange *change;
    2104 GNC        1700 :         int         changes_count = 0;  /* used to accumulate the number of
    2105                 :                                          * changes */
    2106                 : 
    2107 GIC        1700 :         if (using_subtxn)
    2108             427 :             BeginInternalSubTransaction(streaming ? "stream" : "replay");
    2109                 :         else
    2110            1273 :             StartTransactionCommand();
    2111                 : 
    2112                 :         /*
    2113                 :          * We only need to send begin/begin-prepare for non-streamed
    2114                 :          * transactions.
    2115                 :          */
    2116            1700 :         if (!streaming)
    2117 ECB             :         {
    2118 GIC        1040 :             if (rbtxn_prepared(txn))
    2119 CBC          25 :                 rb->begin_prepare(rb, txn);
    2120                 :             else
    2121 GIC        1015 :                 rb->begin(rb, txn);
    2122 ECB             :         }
    2123                 : 
    2124 GIC        1700 :         ReorderBufferIterTXNInit(rb, txn, &iterstate);
    2125 CBC      356174 :         while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
    2126 ECB             :         {
    2127 GIC      352785 :             Relation    relation = NULL;
    2128 ECB             :             Oid         reloid;
    2129                 : 
    2130 GIC      352785 :             CHECK_FOR_INTERRUPTS();
    2131                 : 
    2132                 :             /*
    2133                 :              * We can't call the stream start callback before processing the
    2134 ECB             :              * first change.
    2135                 :              */
    2136 CBC      352785 :             if (prev_lsn == InvalidXLogRecPtr)
    2137 ECB             :             {
    2138 GIC        1666 :                 if (streaming)
    2139 ECB             :                 {
    2140 GIC         626 :                     txn->origin_id = change->origin_id;
    2141             626 :                     rb->stream_start(rb, txn, change->lsn);
    2142 CBC         626 :                     stream_started = true;
    2143 ECB             :                 }
    2144                 :             }
    2145                 : 
    2146                 :             /*
    2147                 :              * Enforce correct ordering of changes, merged from multiple
    2148                 :              * subtransactions. The changes may have the same LSN due to
    2149                 :              * MULTI_INSERT xlog records.
    2150                 :              */
    2151 GIC      352785 :             Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
    2152                 : 
    2153          352785 :             prev_lsn = change->lsn;
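The ordering assertion above relies on the iterator handing out the changes of the top-level transaction and all of its subtransactions as one stream sorted by LSN. The sketch below shows that merge in isolation under simplifying assumptions: XLogRecPtr is modelled as a plain uint64_t, 0 (the value of InvalidXLogRecPtr) marks "no previous change", and ChangeList, next_list() and merge_changes() are hypothetical names. It scans the list heads linearly, whereas PostgreSQL's iterator keeps a binary heap, which matters once there are many subtransactions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for the PostgreSQL typedef */

/* Hypothetical per-(sub)transaction change list, already sorted by LSN. */
typedef struct ChangeList
{
    const XLogRecPtr *lsns;
    size_t      len;
    size_t      pos;            /* index of the next unread change */
} ChangeList;

/*
 * Return the index of the list whose next change has the smallest LSN,
 * or -1 once every list is exhausted. On ties the earlier list wins.
 */
static int
next_list(ChangeList *lists, int nlists)
{
    int         best = -1;

    for (int i = 0; i < nlists; i++)
    {
        if (lists[i].pos >= lists[i].len)
            continue;
        if (best < 0 ||
            lists[i].lsns[lists[i].pos] < lists[best].lsns[lists[best].pos])
            best = i;
    }
    return best;
}

/* Merge all lists into out[], checking the same invariant as the loop. */
static size_t
merge_changes(ChangeList *lists, int nlists, XLogRecPtr *out)
{
    XLogRecPtr  prev_lsn = 0;   /* 0 plays the role of InvalidXLogRecPtr */
    size_t      n = 0;
    int         i;

    while ((i = next_list(lists, nlists)) >= 0)
    {
        XLogRecPtr  lsn = lists[i].lsns[lists[i].pos++];

        /* equal LSNs are allowed, e.g. several tuples of one MULTI_INSERT */
        assert(prev_lsn == 0 || prev_lsn <= lsn);
        prev_lsn = lsn;
        out[n++] = lsn;
    }
    return n;
}
```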
    2154 ECB             : 
    2155                 :             /*
    2156                 :              * Set the current xid to detect concurrent aborts. This is
    2157                 :              * required for the cases when we decode the changes before the
    2158                 :              * COMMIT record is processed.
    2159                 :              */
    2160 CBC      352785 :             if (streaming || rbtxn_prepared(change->txn))
    2161                 :             {
    2162 GIC      177674 :                 curtxn = change->txn;
    2163          177674 :                 SetupCheckXidLive(curtxn->xid);
    2164                 :             }
    2165                 : 
    2166          352785 :             switch (change->action)
    2167                 :             {
    2168            1782 :                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
    2169 ECB             : 
    2170                 :                     /*
    2171                 :                      * Confirmation for speculative insertion arrived. Simply
    2172                 :                      * use it as a normal record. It'll be cleaned up at the end
    2173                 :                      * of INSERT processing.
    2174                 :                      */
    2175 GIC        1782 :                     if (specinsert == NULL)
    2176 UIC           0 :                         elog(ERROR, "invalid ordering of speculative insertion changes");
    2177 GIC        1782 :                     Assert(specinsert->data.tp.oldtuple == NULL);
    2178 CBC        1782 :                     change = specinsert;
    2179 GIC        1782 :                     change->action = REORDER_BUFFER_CHANGE_INSERT;
    2180 ECB             : 
    2181                 :                     /* intentionally fall through */
    2182 GIC      339295 :                 case REORDER_BUFFER_CHANGE_INSERT:
    2183                 :                 case REORDER_BUFFER_CHANGE_UPDATE:
    2184 ECB             :                 case REORDER_BUFFER_CHANGE_DELETE:
    2185 GIC      339295 :                     Assert(snapshot_now);
    2186 ECB             : 
    2187 GNC      339295 :                     reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
    2188                 :                                                   change->data.tp.rlocator.relNumber);
    2189                 : 
    2190                 :                     /*
    2191                 :                      * Mapped catalog tuple without data, emitted while
    2192                 :                      * catalog table was in the process of being rewritten. We
    2193                 :                      * can fail to look up the relfilenumber, because the
    2194 EUB             :                      * relmapper has no "historic" view, in contrast to the
    2195 ECB             :                      * normal catalog during decoding. Thus repeated rewrites
    2196                 :                      * can cause a lookup failure. That's OK because we do not
    2197                 :                      * decode catalog changes anyway. Normally such tuples
    2198                 :                      * would be skipped over below, but we can't identify
    2199                 :                      * whether the table should be logically logged without
    2200                 :                      * mapping the relfilenumber to the oid.
    2201                 :                      */
    2202 GIC      339288 :                     if (reloid == InvalidOid &&
    2203 CBC          76 :                         change->data.tp.newtuple == NULL &&
    2204 GIC          76 :                         change->data.tp.oldtuple == NULL)
    2205 CBC          76 :                         goto change_done;
    2206 GIC      339212 :                     else if (reloid == InvalidOid)
    2207 UNC           0 :                         elog(ERROR, "could not map filenumber \"%s\" to relation OID",
    2208                 :                              relpathperm(change->data.tp.rlocator,
    2209                 :                                          MAIN_FORKNUM));
    2210                 : 
    2211 GIC      339212 :                     relation = RelationIdGetRelation(reloid);
    2212                 : 
    2213          339212 :                     if (!RelationIsValid(relation))
    2214 UNC           0 :                         elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
    2215                 :                              reloid,
    2216                 :                              relpathperm(change->data.tp.rlocator,
    2217                 :                                          MAIN_FORKNUM));
    2218                 : 
    2219 GIC      339212 :                     if (!RelationIsLogicallyLogged(relation))
    2220 CBC        3550 :                         goto change_done;
    2221 ECB             : 
    2222                 :                     /*
    2223                 :                      * Ignore temporary heaps created during DDL unless the
    2224                 :                      * plugin has asked for them.
    2225 EUB             :                      */
    2226 GIC      335662 :                     if (relation->rd_rel->relrewrite && !rb->output_rewrites)
    2227              24 :                         goto change_done;
    2228                 : 
    2229 ECB             :                     /*
    2230                 :                      * For now ignore sequence changes entirely. Most of the
    2231                 :                      * time they don't log changes using records we
    2232 EUB             :                      * understand, so it doesn't make sense to handle the few
    2233                 :                      * cases we do.
    2234                 :                      */
    2235 GIC      335638 :                     if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
    2236 UIC           0 :                         goto change_done;
    2237 ECB             : 
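The INSERT/UPDATE/DELETE branch runs each change through a chain of filters before anything reaches the output plugin. As a hedged summary, the hypothetical classify_change() below condenses that chain into a single decision over booleans that stand in for the real lookups (RelidByRelfilenumber(), RelationIsLogicallyLogged(), relrewrite, relkind and IsToastRelation()). It leaves out the error raised when RelationIdGetRelation() fails, as well as the TOAST replacement and reassembly that follow the decision.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified view of what the loop knows about one change. */
typedef struct ChangeInfo
{
    bool        relid_found;        /* relfilenumber mapped to an OID */
    bool        has_tuples;         /* newtuple or oldtuple present */
    bool        logically_logged;   /* RelationIsLogicallyLogged() */
    bool        is_rewrite_heap;    /* rd_rel->relrewrite is set */
    bool        is_sequence;        /* relkind == RELKIND_SEQUENCE */
    bool        is_toast;           /* IsToastRelation() */
    bool        is_insert;          /* action is INSERT */
} ChangeInfo;

typedef enum Disposition
{
    DISP_SKIP,          /* goto change_done */
    DISP_FAIL,          /* elog(ERROR, ...) */
    DISP_APPLY,         /* hand the change to the output plugin */
    DISP_STASH_CHUNK    /* keep a toast chunk for later reassembly */
} Disposition;

static Disposition
classify_change(const ChangeInfo *c, bool output_rewrites)
{
    /* mapped catalog tuple without data: skip; otherwise unmappable: error */
    if (!c->relid_found)
        return c->has_tuples ? DISP_FAIL : DISP_SKIP;
    if (!c->logically_logged)
        return DISP_SKIP;
    /* temporary heaps created during DDL, unless the plugin wants them */
    if (c->is_rewrite_heap && !output_rewrites)
        return DISP_SKIP;
    if (c->is_sequence)
        return DISP_SKIP;
    if (!c->is_toast)
        return DISP_APPLY;
    /* toast relation: inserts carry chunks, deletions are ignored */
    return c->is_insert ? DISP_STASH_CHUNK : DISP_SKIP;
}
```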
    2238                 :                     /* user-triggered change */
    2239 GIC      335638 :                     if (!IsToastRelation(relation))
    2240                 :                     {
    2241          333687 :                         ReorderBufferToastReplace(rb, txn, relation, change);
    2242          333687 :                         ReorderBufferApplyChange(rb, txn, relation, change,
    2243                 :                                                  streaming);
    2244 ECB             : 
    2245                 :                         /*
    2246                 :                          * Only clear reassembled toast chunks if we're sure
    2247                 :                          * they're not required anymore. The creator of the
    2248                 :                          * tuple tells us.
    2249                 :                          */
    2250 GIC      333683 :                         if (change->data.tp.clear_toast_afterwards)
    2251          333462 :                             ReorderBufferToastReset(rb, txn);
    2252                 :                     }
    2253 ECB             :                     /* we're not interested in toast deletions */
    2254 GBC        1951 :                     else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
    2255                 :                     {
    2256                 :                         /*
    2257 ECB             :                          * We need to reassemble the full toasted Datum in
    2258                 :                          * memory. To ensure the chunks don't get reused until
    2259                 :                          * we're done, remove the change from the list of this
    2260                 :                          * transaction's changes. Otherwise it would get
    2261                 :                          * freed/reused while restoring spooled data from
    2262                 :                          * disk.
    2263                 :                          */
    2264 GIC        1720 :                         Assert(change->data.tp.newtuple != NULL);
    2265                 : 
    2266            1720 :                         dlist_delete(&change->node);
    2267            1720 :                         ReorderBufferToastAppendChunk(rb, txn, relation,
    2268 ECB             :                                                       change);
    2269                 :                     }
    2270                 : 
    2271 GIC         231 :             change_done:
    2272 ECB             : 
    2273                 :                     /*
    2274                 :                      * If speculative insertion was confirmed, the record
    2275                 :                      * isn't needed anymore.
    2276                 :                      */
    2277 GIC      339284 :                     if (specinsert != NULL)
    2278                 :                     {
    2279            1782 :                         ReorderBufferReturnChange(rb, specinsert, true);
    2280            1782 :                         specinsert = NULL;
    2281                 :                     }
    2282 ECB             : 
    2283 GIC      339284 :                     if (RelationIsValid(relation))
    2284 ECB             :                     {
    2285 CBC      339208 :                         RelationClose(relation);
    2286 GIC      339208 :                         relation = NULL;
    2287                 :                     }
    2288          339284 :                     break;
    2289 ECB             : 
    2290 GIC        1782 :                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
    2291                 : 
    2292                 :                     /*
    2293                 :                      * Speculative insertions are dealt with by delaying the
    2294                 :                      * processing of the insert until the confirmation record
    2295 ECB             :                      * arrives. For that we simply unlink the record from the
    2296                 :                      * chain, so it does not get freed/reused while restoring
    2297                 :                      * spooled data from disk.
    2298                 :                      *
    2299                 :                      * This is safe in the face of concurrent catalog changes
    2300                 :                      * because the relevant relation can't be changed between
    2301                 :                      * speculative insertion and confirmation due to
    2302                 :                      * CheckTableNotInUse() and locking.
    2303                 :                      */
    2304                 : 
    2305                 :                     /* clear out a pending (and thus failed) speculation */
    2306 CBC        1782 :                     if (specinsert != NULL)
    2307                 :                     {
    2308 LBC           0 :                         ReorderBufferReturnChange(rb, specinsert, true);
    2309 UIC           0 :                         specinsert = NULL;
    2310                 :                     }
    2311                 : 
    2312                 :                     /* and memorize the pending insertion */
    2313 GIC        1782 :                     dlist_delete(&change->node);
    2314            1782 :                     specinsert = change;
    2315            1782 :                     break;
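The speculative-insertion cases behave like a small state machine around the specinsert pointer: SPEC_INSERT parks a change, SPEC_CONFIRM turns the parked change into an ordinary INSERT, and SPEC_ABORT discards it. The sketch below models only that state machine. The Action values, Change, SpecState and process_change() are hypothetical, the payload integer stands in for tuple data, and both the TOAST-hash reset performed on abort and the return of changes to the buffer are omitted.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, trimmed-down change kinds (the real enum has many more). */
typedef enum Action
{
    ACT_INSERT,
    ACT_SPEC_INSERT,
    ACT_SPEC_CONFIRM,
    ACT_SPEC_ABORT
} Action;

typedef struct Change
{
    Action      action;
    int         payload;        /* stands in for the tuple data */
} Change;

typedef struct SpecState
{
    Change     *pending;        /* plays the role of specinsert */
    int         emitted[16];    /* payloads handed to the "output plugin" */
    int         nemitted;
} SpecState;

static void
process_change(SpecState *st, Change *change)
{
    switch (change->action)
    {
        case ACT_SPEC_INSERT:
            /* an older, unconfirmed speculation is simply replaced */
            st->pending = change;
            break;

        case ACT_SPEC_CONFIRM:
            /* the C code raises an ERROR if nothing is pending */
            assert(st->pending != NULL);
            change = st->pending;
            change->action = ACT_INSERT;
            /* fall through */
        case ACT_INSERT:
            assert(st->nemitted < 16);
            st->emitted[st->nemitted++] = change->payload;
            /* like change_done: a pending speculation is no longer needed */
            st->pending = NULL;
            break;

        case ACT_SPEC_ABORT:
            /* discard the failed speculation (TOAST cleanup omitted) */
            st->pending = NULL;
            break;
    }
}
```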
    2316                 : 
    2317 UIC           0 :                 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
    2318                 : 
    2319                 :                     /*
    2320                 :                      * Abort for speculative insertion arrived, so clean up the
    2321                 :                      * specinsert tuple and the toast hash.
    2322                 :                      *
    2323                 :                      * Note that we get the spec abort change for each toast
    2324 ECB             :                      * entry, but we need to perform the cleanup only the first
    2325                 :                      * time we get it for the main table.
    2326 EUB             :                      */
    2327 UBC           0 :                     if (specinsert != NULL)
    2328                 :                     {
    2329                 :                         /*
    2330                 :                          * We must clean the toast hash before processing a
    2331 ECB             :                          * completely new tuple to avoid confusion about the
    2332                 :                          * previous tuple's toast chunks.
    2333                 :                          */
    2334 UIC           0 :                         Assert(change->data.tp.clear_toast_afterwards);
    2335 UBC           0 :                         ReorderBufferToastReset(rb, txn);
    2336                 : 
    2337                 :                         /* We don't need this record anymore. */
    2338 UIC           0 :                         ReorderBufferReturnChange(rb, specinsert, true);
    2339               0 :                         specinsert = NULL;
    2340                 :                     }
    2341               0 :                     break;
    2342                 : 
    2343 GIC          17 :                 case REORDER_BUFFER_CHANGE_TRUNCATE:
    2344                 :                     {
    2345 EUB             :                         int         i;
    2346 GIC          17 :                         int         nrelids = change->data.truncate.nrelids;
    2347              17 :                         int         nrelations = 0;
    2348                 :                         Relation   *relations;
    2349                 : 
    2350              17 :                         relations = palloc0(nrelids * sizeof(Relation));
    2351              44 :                         for (i = 0; i < nrelids; i++)
    2352 EUB             :                         {
    2353 GBC          27 :                             Oid         relid = change->data.truncate.relids[i];
    2354                 :                             Relation    rel;
    2355                 : 
    2356 GNC          27 :                             rel = RelationIdGetRelation(relid);
    2357 EUB             : 
    2358 GNC          27 :                             if (!RelationIsValid(rel))
    2359 UBC           0 :                                 elog(ERROR, "could not open relation with OID %u", relid);
    2360                 : 
    2361 GNC          27 :                             if (!RelationIsLogicallyLogged(rel))
    2362 UIC           0 :                                 continue;
    2363                 : 
    2364 GNC          27 :                             relations[nrelations++] = rel;
    2365 ECB             :                         }
    2366                 : 
    2367                 :                         /* Apply the truncate. */
    2368 CBC          17 :                         ReorderBufferApplyTruncate(rb, txn, nrelations,
    2369 ECB             :                                                    relations, change,
    2370                 :                                                    streaming);
    2371                 : 
    2372 GIC          44 :                         for (i = 0; i < nrelations; i++)
    2373              27 :                             RelationClose(relations[i]);
    2374 ECB             : 
    2375 GIC          17 :                         break;
    2376 ECB             :                     }
    2377 EUB             : 
    2378 GIC          11 :                 case REORDER_BUFFER_CHANGE_MESSAGE:
    2379 CBC          11 :                     ReorderBufferApplyMessage(rb, txn, change, streaming);
    2380 GBC          11 :                     break;
    2381                 : 
    2382 CBC        1845 :                 case REORDER_BUFFER_CHANGE_INVALIDATION:
    2383                 :                     /* Execute the invalidation messages locally */
    2384 GIC        1845 :                     ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
    2385                 :                                                       change->data.inval.invalidations);
    2386 CBC        1845 :                     break;
    2387                 : 
    2388 GIC         460 :                 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
    2389                 :                     /* get rid of the old */
    2390 CBC         460 :                     TeardownHistoricSnapshot(false);
    2391 ECB             : 
    2392 GIC         460 :                     if (snapshot_now->copied)
    2393 ECB             :                     {
    2394 GIC         440 :                         ReorderBufferFreeSnap(rb, snapshot_now);
    2395             440 :                         snapshot_now =
    2396 CBC         440 :                             ReorderBufferCopySnap(rb, change->data.snapshot,
    2397 ECB             :                                                   txn, command_id);
    2398                 :                     }
    2399                 : 
    2400                 :                     /*
    2401                 :                      * Restored from disk, need to be careful not to double
    2402                 :                      * free. We could introduce refcounting for that, but for
    2403                 :                      * now this seems infrequent enough not to care.
    2404                 :                      */
    2405 GIC          20 :                     else if (change->data.snapshot->copied)
    2406 ECB             :                     {
    2407 UIC           0 :                         snapshot_now =
    2408 LBC           0 :                             ReorderBufferCopySnap(rb, change->data.snapshot,
    2409                 :                                                   txn, command_id);
    2410 ECB             :                     }
    2411                 :                     else
    2412                 :                     {
    2413 CBC          20 :                         snapshot_now = change->data.snapshot;
    2414 ECB             :                     }
    2415                 : 
    2416                 :                     /* and continue with the new one */
    2417 GIC         460 :                     SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
    2418             460 :                     break;
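The snapshot branch above encodes an ownership rule: a snapshot with copied set must be freed exactly once, by whoever made the copy. A minimal sketch of that rule, assuming a hypothetical Snap struct in which an int stands in for the real snapshot contents and a live_copies counter makes leaks or double frees visible. copy_snap(), free_snap() and switch_snapshot() are illustrative stand-ins for ReorderBufferCopySnap() and ReorderBufferFreeSnap(), not their actual signatures.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct Snap
{
    bool        copied;     /* true if someone made (and must free) this copy */
    int         xmin;       /* stands in for the snapshot contents */
} Snap;

static int  live_copies = 0;    /* private copies currently outstanding */

static Snap *
copy_snap(const Snap *src)
{
    Snap       *s = malloc(sizeof(Snap));

    if (s == NULL)
        abort();
    s->copied = true;
    s->xmin = src->xmin;
    live_copies++;
    return s;
}

static void
free_snap(Snap *s)
{
    assert(s->copied);          /* never free a snapshot we did not copy */
    free(s);
    live_copies--;
}

/* Mirror of the REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT branch. */
static Snap *
switch_snapshot(Snap *now, Snap *incoming)
{
    if (now->copied)
    {
        /* we own the current snapshot: free it and take our own copy */
        free_snap(now);
        return copy_snap(incoming);
    }
    if (incoming->copied)
        return copy_snap(incoming); /* restored from disk: avoid double free */
    return incoming;                /* shared snapshot, used as-is */
}
```

Copying a snapshot that was restored from disk costs one allocation, which the comment in the source accepts as cheaper than introducing reference counting for a rare case.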
    2419                 : 
    2420            9375 :                 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
    2421            9375 :                     Assert(change->data.command_id != InvalidCommandId);
    2422                 : 
    2423 CBC        9375 :                     if (command_id < change->data.command_id)
    2424                 :                     {
    2425 GBC        1581 :                         command_id = change->data.command_id;
    2426 EUB             : 
    2427 GIC        1581 :                         if (!snapshot_now->copied)
    2428                 :                         {
    2429                 :                             /* we don't use the global one anymore */
    2430             434 :                             snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
    2431 ECB             :                                                                  txn, command_id);
    2432                 :                         }
    2433                 : 
    2434 GIC        1581 :                         snapshot_now->curcid = command_id;
    2435 ECB             : 
    2436 CBC        1581 :                         TeardownHistoricSnapshot(false);
    2437 GIC        1581 :                         SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
    2438 ECB             :                     }
    2439                 : 
    2440 GIC        9375 :                     break;
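The command-ID branch only ever moves command_id forward. The first time it has to change curcid, it switches to a private copy of the snapshot, so the shared snapshot is never modified in place. The sketch below uses a hypothetical CidState struct and advance_command_id() function. It keeps the private copy inside the struct instead of allocating one, and it counts snapshot reinstalls instead of calling TeardownHistoricSnapshot() and SetupHistoricSnapshot().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t CommandId;     /* stand-in for the PostgreSQL typedef */

typedef struct Snap
{
    bool        copied;
    CommandId   curcid;
} Snap;

typedef struct CidState
{
    Snap       *snap;           /* snapshot currently installed */
    Snap        private_copy;   /* storage for our own copy */
    CommandId   command_id;
    int         reinstalls;     /* counts teardown/setup pairs */
} CidState;

static void
advance_command_id(CidState *st, CommandId cid)
{
    if (st->command_id >= cid)
        return;                 /* ids only move forward: nothing to do */

    st->command_id = cid;
    if (!st->snap->copied)
    {
        /* stop using the shared snapshot and take a private copy */
        st->private_copy = *st->snap;
        st->private_copy.copied = true;
        st->snap = &st->private_copy;
    }
    st->snap->curcid = cid;
    st->reinstalls++;           /* the C code tears down and re-installs it */
}
```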
    2441 ECB             : 
    2442 UIC           0 :                 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
    2443 LBC           0 :                     elog(ERROR, "tuplecid value in changequeue");
    2444                 :                     break;
    2445 ECB             :             }
    2446                 : 
    2447                 :             /*
    2448                 :              * It is possible that no data is sent downstream for a long
    2449                 :              * time, either because the output plugin filtered it or because
    2450                 :              * a DDL generates a lot of data that the plugin does not process.
    2451                 :              * In such cases, the downstream can time out. To avoid that, we
    2452                 :              * try to send a keepalive message if required.
    2453                 :              * Trying to send a keepalive message after every change has some
    2454                 :              * overhead, but testing showed there is no noticeable overhead if
    2455                 :              * we do it after every ~100 changes.
    2456                 :              */
    2457                 : #define CHANGES_THRESHOLD 100
    2458                 : 
    2459 GNC      352774 :             if (++changes_count >= CHANGES_THRESHOLD)
    2460                 :             {
    2461            3097 :                 rb->update_progress_txn(rb, txn, change->lsn);
    2462            3097 :                 changes_count = 0;
    2463                 :             }
    2464                 :         }
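The keepalive logic is a plain counter that fires once every CHANGES_THRESHOLD changes and then resets. Because changes_count is a local variable of this function, the count starts from zero on every call. The sketch below isolates that counter. Progress, after_change() and updates_for() are hypothetical, and the updates field stands in for calls to rb->update_progress_txn().

```c
#include <assert.h>

#define CHANGES_THRESHOLD 100

typedef struct Progress
{
    int         changes_count;  /* changes since the last progress update */
    int         updates;        /* stands in for rb->update_progress_txn() */
} Progress;

/* Called once per processed change, including skipped ones. */
static void
after_change(Progress *p)
{
    if (++p->changes_count >= CHANGES_THRESHOLD)
    {
        p->updates++;
        p->changes_count = 0;
    }
}

/* How many progress updates one call would send for nchanges changes. */
static int
updates_for(int nchanges)
{
    Progress    p = {0, 0};

    for (int i = 0; i < nchanges; i++)
        after_change(&p);
    return p.updates;
}
```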
    2465                 : 
    2466 ECB             :         /* speculative insertion record must be freed by now */
    2467 GIC        1689 :         Assert(!specinsert);
    2468                 : 
    2469                 :         /* clean up the iterator */
    2470 CBC        1689 :         ReorderBufferIterTXNFinish(rb, iterstate);
    2471 GIC        1689 :         iterstate = NULL;
    2472 ECB             : 
    2473                 :         /*
    2474                 :          * Update total transaction count and total bytes processed by the
    2475                 :          * transaction and its subtransactions. Make sure not to count a
    2476                 :          * streamed transaction multiple times.
    2477                 :          *
    2478 EUB             :          * Note that the statistics computation has to be done after
    2479                 :          * ReorderBufferIterTXNFinish, as it releases the serialized change
    2480                 :          * which we have already accounted for in ReorderBufferIterTXNNext.
    2481                 :          */
    2482 GIC        1689 :         if (!rbtxn_is_streamed(txn))
    2483            1098 :             rb->totalTxns++;
    2484                 : 
    2485            1689 :         rb->totalBytes += txn->total_size;
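The statistics update adds bytes on every pass but counts a transaction only while it is not yet marked as streamed, so a transaction streamed in several pieces contributes once to totalTxns. The sketch assumes, as a simplification, that the streamed flag is set and the already-sent bytes are released right after each streaming pass; in PostgreSQL that bookkeeping happens elsewhere, during truncation. Stats, Txn and account_pass() are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct Stats
{
    int64_t     totalTxns;
    int64_t     totalBytes;
} Stats;

/* Hypothetical transaction state: only the two fields the rule needs. */
typedef struct Txn
{
    bool        is_streamed;    /* plays the role of rbtxn_is_streamed() */
    int64_t     total_size;     /* bytes of changes still held for the txn */
} Txn;

static void
account_pass(Stats *s, Txn *txn, bool streaming)
{
    /* count the transaction only while it has not been streamed yet */
    if (!txn->is_streamed)
        s->totalTxns++;

    /* bytes are added on every pass, for the changes still held */
    s->totalBytes += txn->total_size;

    /*
     * Assumed simplification of the truncation step: after a streaming
     * pass the txn is marked as streamed and its sent changes are
     * released, so they are not counted again.
     */
    if (streaming)
    {
        txn->is_streamed = true;
        txn->total_size = 0;
    }
}
```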
    2486                 : 
    2487                 :         /*
    2488                 :          * Done with current changes, send the last message for this set of
    2489                 :          * changes depending upon streaming mode.
    2490                 :          */
    2491            1689 :         if (streaming)
    2492                 :         {
    2493             653 :             if (stream_started)
    2494                 :             {
    2495 CBC         619 :                 rb->stream_stop(rb, txn, prev_lsn);
    2496 GIC         619 :                 stream_started = false;
    2497 ECB             :             }
    2498                 :         }
    2499                 :         else
    2500                 :         {
    2501                 :             /*
    2502                 :              * Call either PREPARE (for two-phase transactions) or COMMIT (for
    2503                 :              * regular ones).
    2504                 :              */
    2505 GIC        1036 :             if (rbtxn_prepared(txn))
    2506 CBC          25 :                 rb->prepare(rb, txn, commit_lsn);
    2507 ECB             :             else
    2508 GIC        1011 :                 rb->commit(rb, txn, commit_lsn);
    2509                 :         }
    2510                 : 
    2511                 :         /* this is just a sanity check against bad output plugin behaviour */
    2512            1684 :         if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
    2513 UIC           0 :             elog(ERROR, "output plugin used XID %u",
    2514                 :                  GetCurrentTransactionId());
    2515                 : 
    2516                 :         /*
    2517                 :          * Remember the command ID and snapshot for the next set of changes in
    2518 ECB             :          * streaming mode.
    2519                 :          */
    2520 GIC        1684 :         if (streaming)
    2521 CBC         653 :             ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
    2522 GIC        1031 :         else if (snapshot_now->copied)
    2523             434 :             ReorderBufferFreeSnap(rb, snapshot_now);
    2524                 : 
    2525                 :         /* cleanup */
    2526            1684 :         TeardownHistoricSnapshot(false);
    2527 ECB             : 
    2528                 :         /*
    2529                 :          * Aborting the current (sub-)transaction as a whole has the right
    2530                 :          * semantics. We want all locks acquired in here to be released, not
    2531                 :          * reassigned to the parent, and we do not want any database access
    2532                 :          * to have persistent effects.
    2533                 :          */
    2534 GIC        1684 :         AbortCurrentTransaction();
    2535                 : 
    2536                 :         /* make sure there's no cache pollution */
    2537            1684 :         ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
    2538                 : 
    2539            1684 :         if (using_subtxn)
    2540             424 :             RollbackAndReleaseCurrentSubTransaction();
    2541 ECB             : 
    2542                 :         /*
    2543                 :          * We are here for one of four reasons: 1. Decoding an
    2544                 :          * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
    2545                 :          * prepared txn that was (partially) streamed. 4. Decoding a committed
    2546                 :          * txn.
    2547                 :          *
    2548                 :          * For 1, we allow truncation of txn data by removing the changes
    2549 EUB             :          * already streamed but still keeping other things like invalidations,
    2550                 :          * snapshot, and tuplecids. For 2 and 3, we tell
    2551                 :          * ReorderBufferTruncateTXN to do more elaborate truncation of txn
    2552                 :          * data as the entire transaction has been decoded except for commit.
    2553                 :          * For 4, as the entire txn has been decoded, we can fully clean up
    2554                 :          * the TXN reorder buffer.
    2555                 :          */
    2556 CBC        1684 :         if (streaming || rbtxn_prepared(txn))
    2557 ECB             :         {
    2558 CBC         678 :             ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
    2559 ECB             :             /* Reset the CheckXidAlive */
    2560 GIC         678 :             CheckXidAlive = InvalidTransactionId;
    2561                 :         }
    2562 ECB             :         else
    2563 GIC        1006 :             ReorderBufferCleanupTXN(rb, txn);
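The four cases listed in the comment above reduce to a decision on two flags. The hedged sketch below, using a hypothetical TxnFinish enum and finish_action() function, shows which cleanup path each combination of streaming and prepared selects. In the real code the truncate paths also reset CheckXidAlive.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum TxnFinish
{
    FINISH_TRUNCATE_STREAMED,   /* case 1: drop streamed changes, keep
                                 * invalidations, snapshot and tuplecids */
    FINISH_TRUNCATE_PREPARED,   /* cases 2 and 3: more elaborate
                                 * truncation, only the commit remains */
    FINISH_CLEANUP              /* case 4: fully decoded, free everything */
} TxnFinish;

static TxnFinish
finish_action(bool streaming, bool prepared)
{
    if (streaming || prepared)
        return prepared ? FINISH_TRUNCATE_PREPARED : FINISH_TRUNCATE_STREAMED;
    return FINISH_CLEANUP;
}
```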
    2564                 :     }
    2565              10 :     PG_CATCH();
    2566                 :     {
    2567              10 :         MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
    2568              10 :         ErrorData  *errdata = CopyErrorData();
    2569                 : 
    2570 ECB             :         /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
    2571 GIC          10 :         if (iterstate)
    2572              10 :             ReorderBufferIterTXNFinish(rb, iterstate);
    2573 ECB             : 
    2574 GIC          10 :         TeardownHistoricSnapshot(true);
    2575 ECB             : 
    2576                 :         /*
    2577                 :          * Force cache invalidation to happen outside of a valid transaction
    2578                 :          * to prevent catalog access as we just caught an error.
    2579                 :          */
    2580 GIC          10 :         AbortCurrentTransaction();
    2581                 : 
    2582                 :         /* make sure there's no cache pollution */
    2583              10 :         ReorderBufferExecuteInvalidations(txn->ninvalidations,
    2584                 :                                           txn->invalidations);
    2585                 : 
    2586              10 :         if (using_subtxn)
    2587               3 :             RollbackAndReleaseCurrentSubTransaction();
    2588                 : 
    2589                 :         /*
    2590                 :          * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
    2591                 :          * abort of the (sub)transaction we are streaming or preparing. We
    2592 ECB             :          * need to do the cleanup and return gracefully on this error, see
    2593                 :          * SetupCheckXidLive.
    2594                 :          *
    2595                 :          * This error code can be thrown by one of the callbacks we call
    2596                 :          * during decoding so we need to ensure that we return gracefully only
    2597                 :          * when we are sending the data in streaming mode and the streaming is
    2598                 :          * not finished yet or when we are sending the data out on a PREPARE
    2599                 :          * during a two-phase commit.
    2600                 :          */
    2601 CBC          10 :         if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
    2602 GIC           7 :             (stream_started || rbtxn_prepared(txn)))
    2603 ECB             :         {
    2604                 :             /* curtxn must be set for streaming or prepared transactions */
    2605 GIC           7 :             Assert(curtxn);
    2606                 : 
    2607 ECB             :             /* Cleanup the temporary error state. */
    2608 CBC           7 :             FlushErrorState();
    2609 GIC           7 :             FreeErrorData(errdata);
    2610 CBC           7 :             errdata = NULL;
    2611 GIC           7 :             curtxn->concurrent_abort = true;
    2612                 : 
    2613                 :             /* Reset the TXN so that it is allowed to stream remaining data. */
    2614               7 :             ReorderBufferResetTXN(rb, txn, snapshot_now,
    2615                 :                                   command_id, prev_lsn,
    2616 ECB             :                                   specinsert);
    2617                 :         }
    2618                 :         else
    2619                 :         {
    2620 GIC           3 :             ReorderBufferCleanupTXN(rb, txn);
    2621               3 :             MemoryContextSwitchTo(ecxt);
    2622 CBC           3 :             PG_RE_THROW();
    2623 ECB             :         }
    2624                 :     }
    2625 GIC        1691 :     PG_END_TRY();
    2626            1691 : }
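The two branch decisions at the end of the routine above can be isolated as pure predicates. The sketch below is a simplified model, not PostgreSQL code: `finish_action`, `swallow_error`, `FinishAction` and `MOCK_ERRCODE_TRANSACTION_ROLLBACK` are hypothetical names, and the error-code constant is a stand-in for the real packed SQLSTATE in `utils/errcodes.h`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for ERRCODE_TRANSACTION_ROLLBACK (not the real value). */
#define MOCK_ERRCODE_TRANSACTION_ROLLBACK 40000

typedef enum
{
    FINISH_TRUNCATE,            /* drop streamed changes, keep invalidations etc. */
    FINISH_CLEANUP              /* release the whole TXN */
} FinishAction;

/*
 * Mirrors the success path: streamed or prepared transactions are only
 * truncated, while a fully decoded committed transaction is cleaned up.
 */
static FinishAction
finish_action(bool streaming, bool prepared)
{
    return (streaming || prepared) ? FINISH_TRUNCATE : FINISH_CLEANUP;
}

/*
 * Mirrors the PG_CATCH test: the error is swallowed only for a concurrent
 * abort seen while streaming has started or while decoding a PREPARE;
 * every other error is re-thrown after cleanup.
 */
static bool
swallow_error(int sqlerrcode, bool stream_started, bool prepared)
{
    return sqlerrcode == MOCK_ERRCODE_TRANSACTION_ROLLBACK &&
        (stream_started || prepared);
}
```

Keeping these as side-effect-free predicates makes it easy to see that the same `rbtxn_prepared(txn)` test drives both the truncate-versus-cleanup choice and the graceful-return choice.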
    2627                 : 
    2628                 : /*
    2629                 :  * Perform the replay of a transaction and its non-aborted subtransactions.
    2630                 :  *
    2631                 :  * Subtransactions previously have to be processed by
    2632                 :  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
    2633                 :  * transaction with ReorderBufferAssignChild.
    2634                 :  *
    2635                 :  * This interface is called once a prepare or toplevel commit is read for both
    2636                 :  * streamed as well as non-streamed transactions.
    2637 ECB             :  */
    2638                 : static void
    2639 GIC        1102 : ReorderBufferReplay(ReorderBufferTXN *txn,
    2640                 :                     ReorderBuffer *rb, TransactionId xid,
    2641 ECB             :                     XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
    2642                 :                     TimestampTz commit_time,
    2643                 :                     RepOriginId origin_id, XLogRecPtr origin_lsn)
    2644                 : {
    2645                 :     Snapshot    snapshot_now;
    2646 CBC        1102 :     CommandId   command_id = FirstCommandId;
    2647 ECB             : 
    2648 GIC        1102 :     txn->final_lsn = commit_lsn;
    2649            1102 :     txn->end_lsn = end_lsn;
    2650 CBC        1102 :     txn->xact_time.commit_time = commit_time;
    2651 GIC        1102 :     txn->origin_id = origin_id;
    2652            1102 :     txn->origin_lsn = origin_lsn;
    2653                 : 
    2654                 :     /*
    2655                 :      * If the transaction was (partially) streamed, we need to commit it in a
    2656 ECB             :      * 'streamed' way. That is, we first stream the remaining part of the
    2657                 :      * transaction, and then invoke stream_commit message.
    2658                 :      *
    2659                 :      * Called after everything (origin ID, LSN, ...) is stored in the
    2660                 :      * transaction to avoid passing that information directly.
    2661                 :      */
    2662 CBC        1102 :     if (rbtxn_is_streamed(txn))
    2663                 :     {
    2664 GIC          61 :         ReorderBufferStreamCommit(rb, txn);
    2665              61 :         return;
    2666                 :     }
    2667                 : 
    2668                 :     /*
    2669                 :      * If this transaction has no snapshot, it didn't make any changes to the
    2670                 :      * database, so there's nothing to decode.  Note that
    2671                 :      * ReorderBufferCommitChild will have transferred any snapshots from
    2672                 :      * subtransactions if there were any.
    2673                 :      */
    2674            1041 :     if (txn->base_snapshot == NULL)
    2675 ECB             :     {
    2676 GIC           1 :         Assert(txn->ninvalidations == 0);
    2677                 : 
    2678                 :         /*
    2679                 :          * Removing this txn before a commit might result in the computation
    2680                 :          * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
    2681                 :          */
    2682 CBC           1 :         if (!rbtxn_prepared(txn))
    2683 GIC           1 :             ReorderBufferCleanupTXN(rb, txn);
    2684 CBC           1 :         return;
    2685 ECB             :     }
    2686                 : 
    2687 CBC        1040 :     snapshot_now = txn->base_snapshot;
    2688 ECB             : 
    2689                 :     /* Process and send the changes to output plugin. */
    2690 GIC        1040 :     ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
    2691                 :                             command_id, false);
    2692                 : }
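The dispatch in `ReorderBufferReplay()` reduces to three ordered checks. This is a minimal sketch assuming a hypothetical reduced `MockTXN` with only the fields the checks read; `ReplayPath` and `replay_path` are illustrative names, not part of PostgreSQL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
    REPLAY_STREAM_COMMIT,       /* ReorderBufferStreamCommit() path */
    REPLAY_NOTHING,             /* no base snapshot, prepared: keep the TXN */
    REPLAY_CLEANUP_ONLY,        /* no base snapshot, not prepared: free the TXN */
    REPLAY_PROCESS              /* ReorderBufferProcessTXN() path */
} ReplayPath;

/* Hypothetical reduced view of ReorderBufferTXN. */
typedef struct
{
    bool        is_streamed;
    bool        is_prepared;
    const void *base_snapshot;
} MockTXN;

static ReplayPath
replay_path(const MockTXN *txn)
{
    /* A (partially) streamed txn is always finished in streaming mode. */
    if (txn->is_streamed)
        return REPLAY_STREAM_COMMIT;

    /* No base snapshot means no changes; keep prepared txns for COMMIT PREPARED. */
    if (txn->base_snapshot == NULL)
        return txn->is_prepared ? REPLAY_NOTHING : REPLAY_CLEANUP_ONLY;

    return REPLAY_PROCESS;
}
```

The order matters: the streamed check comes first, so a streamed transaction is never cleaned up through the "no base snapshot" branch.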
    2693                 : 
    2694                 : /*
    2695                 :  * Commit a transaction.
    2696                 :  *
    2697                 :  * See comments for ReorderBufferReplay().
    2698 ECB             :  */
    2699                 : void
    2700 CBC        1066 : ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
    2701 ECB             :                     XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
    2702                 :                     TimestampTz commit_time,
    2703                 :                     RepOriginId origin_id, XLogRecPtr origin_lsn)
    2704                 : {
    2705                 :     ReorderBufferTXN *txn;
    2706                 : 
    2707 GIC        1066 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
    2708                 :                                 false);
    2709                 : 
    2710 ECB             :     /* unknown transaction, nothing to replay */
    2711 GIC        1066 :     if (txn == NULL)
    2712 CBC           1 :         return;
    2713                 : 
    2714 GIC        1065 :     ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
    2715                 :                         origin_id, origin_lsn);
    2716                 : }
    2717                 : 
    2718 ECB             : /*
    2719                 :  * Record the prepare information for a transaction.
    2720                 :  */
    2721                 : bool
    2722 GIC         124 : ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
    2723 ECB             :                                  XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
    2724                 :                                  TimestampTz prepare_time,
    2725                 :                                  RepOriginId origin_id, XLogRecPtr origin_lsn)
    2726                 : {
    2727                 :     ReorderBufferTXN *txn;
    2728                 : 
    2729 GIC         124 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
    2730                 : 
    2731                 :     /* unknown transaction, nothing to do */
    2732             124 :     if (txn == NULL)
    2733 UIC           0 :         return false;
    2734                 : 
    2735                 :     /*
    2736 ECB             :      * Remember the prepare information to be later used by commit prepared in
    2737                 :      * case we skip doing prepare.
    2738                 :      */
    2739 GIC         124 :     txn->final_lsn = prepare_lsn;
    2740             124 :     txn->end_lsn = end_lsn;
    2741             124 :     txn->xact_time.prepare_time = prepare_time;
    2742             124 :     txn->origin_id = origin_id;
    2743 CBC         124 :     txn->origin_lsn = origin_lsn;
    2744                 : 
    2745 GIC         124 :     return true;
    2746                 : }
    2747 ECB             : 
    2748                 : /* Remember that we have skipped prepare */
    2749                 : void
    2750 CBC          88 : ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
    2751                 : {
    2752                 :     ReorderBufferTXN *txn;
    2753                 : 
    2754 GIC          88 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
    2755                 : 
    2756                 :     /* unknown transaction, nothing to do */
    2757              88 :     if (txn == NULL)
    2758 LBC           0 :         return;
    2759                 : 
    2760 GIC          88 :     txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
    2761                 : }
    2762                 : 
    2763                 : /*
    2764                 :  * Prepare a two-phase transaction.
    2765 ECB             :  *
    2766                 :  * See comments for ReorderBufferReplay().
    2767                 :  */
    2768                 : void
    2769 GBC          36 : ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
    2770                 :                      char *gid)
    2771                 : {
    2772                 :     ReorderBufferTXN *txn;
    2773                 : 
    2774 GIC          36 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
    2775 ECB             :                                 false);
    2776                 : 
    2777                 :     /* unknown transaction, nothing to replay */
    2778 CBC          36 :     if (txn == NULL)
    2779 LBC           0 :         return;
    2780                 : 
    2781 CBC          36 :     txn->txn_flags |= RBTXN_PREPARE;
    2782 GIC          36 :     txn->gid = pstrdup(gid);
    2783                 : 
    2784                 :     /* The prepare info must have been updated in txn by now. */
    2785              36 :     Assert(txn->final_lsn != InvalidXLogRecPtr);
    2786 ECB             : 
    2787 GIC          36 :     ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
    2788              36 :                         txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
    2789                 : 
    2790 ECB             :     /*
    2791                 :      * We send the prepare for the concurrently aborted xacts so that later
    2792                 :      * when rollback prepared is decoded and sent, the downstream should be
    2793                 :      * able to roll back such a xact. See comments atop DecodePrepare.
    2794 EUB             :      *
    2795                 :      * Note, for the concurrent_abort + streaming case a stream_prepare was
    2796 ECB             :      * already sent within the ReorderBufferReplay call above.
    2797                 :      */
    2798 GIC          36 :     if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
    2799 UIC           0 :         rb->prepare(rb, txn, txn->final_lsn);
    2800                 : }
    2801                 : 
    2802                 : /*
    2803                 :  * This is used to handle COMMIT/ROLLBACK PREPARED.
    2804                 :  */
    2805 ECB             : void
    2806 GIC          38 : ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
    2807                 :                             XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
    2808                 :                             XLogRecPtr two_phase_at,
    2809                 :                             TimestampTz commit_time, RepOriginId origin_id,
    2810 ECB             :                             XLogRecPtr origin_lsn, char *gid, bool is_commit)
    2811                 : {
    2812                 :     ReorderBufferTXN *txn;
    2813                 :     XLogRecPtr  prepare_end_lsn;
    2814                 :     TimestampTz prepare_time;
    2815 EUB             : 
    2816 GIC          38 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
    2817 ECB             : 
    2818                 :     /* unknown transaction, nothing to do */
    2819 GIC          38 :     if (txn == NULL)
    2820 UIC           0 :         return;
    2821 ECB             : 
    2822                 :     /*
    2823                 :      * By this time the txn has the prepare record information, remember it to
    2824                 :      * be later used for rollback.
    2825                 :      */
    2826 GIC          38 :     prepare_end_lsn = txn->end_lsn;
    2827              38 :     prepare_time = txn->xact_time.prepare_time;
    2828                 : 
    2829                 :     /* add the gid in the txn */
    2830              38 :     txn->gid = pstrdup(gid);
    2831                 : 
    2832                 :     /*
    2833                 :      * It is possible that this transaction is not decoded at prepare time
    2834 ECB             :      * either because by that time we didn't have a consistent snapshot, or
    2835 EUB             :      * two_phase was not enabled, or it was decoded earlier but we have
    2836                 :      * restarted. We only need to send the prepare if it was not decoded
    2837                 :      * earlier. We don't need to decode the xact for aborts if it is not done
    2838                 :      * already.
    2839                 :      */
    2840 GIC          38 :     if ((txn->final_lsn < two_phase_at) && is_commit)
    2841                 :     {
    2842 CBC           1 :         txn->txn_flags |= RBTXN_PREPARE;
    2843                 : 
    2844                 :         /*
    2845                 :          * The prepare info must have been updated in txn even if we skip
    2846                 :          * prepare.
    2847                 :          */
    2848 GIC           1 :         Assert(txn->final_lsn != InvalidXLogRecPtr);
    2849                 : 
    2850                 :         /*
    2851                 :          * By this time the txn has the prepare record information and it is
    2852 ECB             :          * important to use that so that downstream gets the accurate
    2853                 :          * information. If, instead, we had passed commit information here
    2854                 :          * then downstream could behave as if it had already replayed commit
    2855                 :          * prepared after the restart.
    2856 EUB             :          */
    2857 GIC           1 :         ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
    2858               1 :                             txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
    2859                 :     }
    2860                 : 
    2861              38 :     txn->final_lsn = commit_lsn;
    2862 CBC          38 :     txn->end_lsn = end_lsn;
    2863              38 :     txn->xact_time.commit_time = commit_time;
    2864 GIC          38 :     txn->origin_id = origin_id;
    2865              38 :     txn->origin_lsn = origin_lsn;
    2866 ECB             : 
    2867 GIC          38 :     if (is_commit)
    2868              29 :         rb->commit_prepared(rb, txn, commit_lsn);
    2869                 :     else
    2870               9 :         rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
    2871                 : 
    2872                 :     /* cleanup: make sure there's no cache pollution */
    2873              38 :     ReorderBufferExecuteInvalidations(txn->ninvalidations,
    2874                 :                                       txn->invalidations);
    2875              38 :     ReorderBufferCleanupTXN(rb, txn);
    2876 ECB             : }
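The decision to replay a prepare late, at COMMIT PREPARED time, depends only on where the prepare record lies relative to `two_phase_at` and on whether this is a commit. The sketch below assumes a hypothetical `MockLSN` typedef in place of `XLogRecPtr` (which is a 64-bit unsigned position); `needs_late_prepare` is an illustrative name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t MockLSN;       /* stand-in for XLogRecPtr */

/*
 * True when COMMIT PREPARED must first replay the PREPARE: the prepare
 * record precedes two_phase_at, so it was not sent earlier. Rollbacks
 * never trigger a late decode.
 */
static bool
needs_late_prepare(MockLSN prepare_lsn, MockLSN two_phase_at, bool is_commit)
{
    return prepare_lsn < two_phase_at && is_commit;
}
```

Note that the real code reads `txn->final_lsn` here, which at that point still holds the prepare LSN; it is overwritten with `commit_lsn` only after the check.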
    2877                 : 
    2878                 : /*
    2879                 :  * Abort a transaction that possibly has previous changes. Needs to be first
    2880                 :  * called for subtransactions and then for the toplevel xid.
    2881                 :  *
    2882                 :  * NB: Transactions handled here have to have actively aborted (i.e. have
    2883                 :  * produced an abort record). Implicitly aborted transactions are handled via
    2884                 :  * ReorderBufferAbortOld(); transactions we're just not interested in, but
    2885                 :  * which have committed are handled in ReorderBufferForget().
    2886                 :  *
    2887                 :  * This function purges this transaction and its contents from memory and
    2888                 :  * disk.
    2889                 :  */
    2890                 : void
    2891 GNC         105 : ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
    2892                 :                    TimestampTz abort_time)
    2893                 : {
    2894 ECB             :     ReorderBufferTXN *txn;
    2895                 : 
    2896 GIC         105 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
    2897                 :                                 false);
    2898 ECB             : 
    2899                 :     /* unknown, nothing to remove */
    2900 CBC         105 :     if (txn == NULL)
    2901 LBC           0 :         return;
    2902 ECB             : 
    2903 GNC         105 :     txn->xact_time.abort_time = abort_time;
    2904                 : 
    2905                 :     /* For streamed transactions notify the remote node about the abort. */
    2906 CBC         105 :     if (rbtxn_is_streamed(txn))
    2907 ECB             :     {
    2908 GIC          29 :         rb->stream_abort(rb, txn, lsn);
    2909 ECB             : 
    2910                 :         /*
    2911                 :          * We might have decoded changes for this transaction that could load
    2912                 :          * the cache as per the current transaction's view (consider DDLs
    2913                 :          * happened in this transaction). We don't want the decoding of future
    2914                 :          * transactions to use those cache entries so execute invalidations.
    2915                 :          */
    2916 GIC          29 :         if (txn->ninvalidations > 0)
    2917 UIC           0 :             ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
    2918                 :                                                txn->invalidations);
    2919                 :     }
    2920                 : 
    2921                 :     /* cosmetic... */
    2922 GIC         105 :     txn->final_lsn = lsn;
    2923                 : 
    2924                 :     /* remove potential on-disk data, and deallocate */
    2925             105 :     ReorderBufferCleanupTXN(rb, txn);
    2926                 : }
    2927                 : 
    2928                 : /*
    2929                 :  * Abort all transactions that aren't actually running anymore because the
    2930 ECB             :  * server restarted.
    2931                 :  *
    2932                 :  * NB: These really have to be transactions that have aborted due to a server
    2933                 :  * crash/immediate restart, as we don't deal with invalidations here.
    2934                 :  */
    2935                 : void
    2936 GIC        1024 : ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
    2937                 : {
    2938                 :     dlist_mutable_iter it;
    2939 ECB             : 
    2940 EUB             :     /*
    2941                 :      * Iterate through all (potential) toplevel TXNs and abort all that are
    2942 ECB             :      * older than what possibly can be running. Once we've found the first
    2943                 :      * that is alive we stop, there might be some that acquired an xid earlier
    2944                 :      * but started writing later, but it's unlikely and they will be cleaned
    2945                 :      * up in a later call to this function.
    2946                 :      */
    2947 CBC        1031 :     dlist_foreach_modify(it, &rb->toplevel_by_lsn)
    2948                 :     {
    2949                 :         ReorderBufferTXN *txn;
    2950                 : 
    2951 GIC          41 :         txn = dlist_container(ReorderBufferTXN, node, it.cur);
    2952                 : 
    2953              41 :         if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
    2954                 :         {
    2955 CBC           7 :             elog(DEBUG2, "aborting old transaction %u", txn->xid);
    2956 EUB             : 
    2957                 :             /* Notify the remote node about the crash/immediate restart. */
    2958 GIC           7 :             if (rbtxn_is_streamed(txn))
    2959 UIC           0 :                 rb->stream_abort(rb, txn, InvalidXLogRecPtr);
    2960                 : 
    2961 ECB             :             /* remove potential on-disk data, and deallocate this tx */
    2962 GIC           7 :             ReorderBufferCleanupTXN(rb, txn);
    2963                 :         }
    2964 ECB             :         else
    2965 GIC          34 :             return;
    2966                 :     }
    2967                 : }
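`ReorderBufferAbortOld()` walks toplevel transactions in first-LSN order and stops at the first one that may still be running, using wraparound-aware xid comparison. The sketch models that loop over a plain array; `MockXid`, `xid_precedes` and `count_aborted` are hypothetical names, and `xid_precedes` follows the modulo-2^32 rule of `TransactionIdPrecedes()` for normal xids.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t MockXid;
#define MOCK_FIRST_NORMAL_XID 3 /* mirrors FirstNormalTransactionId */

/* Modulo-2^32 comparison in the style of TransactionIdPrecedes(). */
static bool
xid_precedes(MockXid a, MockXid b)
{
    if (a < MOCK_FIRST_NORMAL_XID || b < MOCK_FIRST_NORMAL_XID)
        return a < b;
    return (int32_t) (a - b) < 0;
}

/*
 * Given toplevel xids in first-LSN order, return how many would be
 * aborted: the loop stops at the first xid that is not older than
 * oldest_running, even if later entries are older.
 */
static size_t
count_aborted(const MockXid *xids, size_t n, MockXid oldest_running)
{
    size_t      i;

    for (i = 0; i < n; i++)
    {
        if (!xid_precedes(xids[i], oldest_running))
            break;
    }
    return i;
}
```

For example, with xids `{100, 101, 200, 102}` and `oldest_running = 150`, only the first two are aborted; `102` is left for a later call, as the function comment explains.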
    2968                 : 
    2969                 : /*
    2970                 :  * Forget the contents of a transaction if we aren't interested in its
    2971                 :  * contents. Needs to be first called for subtransactions and then for the
    2972                 :  * toplevel xid.
    2973                 :  *
    2974                 :  * This is significantly different to ReorderBufferAbort() because
    2975 ECB             :  * transactions that have committed need to be treated differently from aborted
    2976                 :  * ones since they may have modified the catalog.
    2977                 :  *
    2978                 :  * Note that this is only allowed to be called at the moment a transaction
    2979                 :  * commit has just been read, not earlier; otherwise later records referring
    2980                 :  * to this xid might re-create the transaction incompletely.
    2981                 :  */
    2982                 : void
    2983 GIC        2429 : ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
    2984                 : {
    2985                 :     ReorderBufferTXN *txn;
    2986 ECB             : 
    2987 GIC        2429 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
    2988                 :                                 false);
    2989                 : 
    2990 ECB             :     /* unknown, nothing to forget */
    2991 GIC        2429 :     if (txn == NULL)
    2992 CBC         561 :         return;
    2993                 : 
    2994                 :     /* this transaction mustn't be streamed */
    2995 GNC        1868 :     Assert(!rbtxn_is_streamed(txn));
    2996 ECB             : 
    2997 EUB             :     /* cosmetic... */
    2998 GIC        1868 :     txn->final_lsn = lsn;
    2999                 : 
    3000 ECB             :     /*
    3001                 :      * Process cache invalidation messages if there are any. Even if we're not
    3002                 :      * interested in the transaction's contents, it could have manipulated the
    3003                 :      * catalog and we need to update the caches according to that.
    3004                 :      */
    3005 GIC        1868 :     if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
    3006             541 :         ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
    3007                 :                                            txn->invalidations);
    3008                 :     else
    3009            1327 :         Assert(txn->ninvalidations == 0);
    3010                 : 
    3011                 :     /* remove potential on-disk data, and deallocate */
    3012            1868 :     ReorderBufferCleanupTXN(rb, txn);
    3013                 : }
    3014                 : 
    3015                 : /*
    3016                 :  * Invalidate cache for those transactions that need to be skipped just in case
    3017                 :  * catalogs were manipulated as part of the transaction.
    3018                 :  *
    3019                 :  * Note that this is a special-purpose function for prepared transactions where
    3020                 :  * we don't want to clean up the TXN even when we decide to skip it. See
    3021 ECB             :  * DecodePrepare.
    3022                 :  */
    3023                 : void
    3024 GIC          85 : ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
    3025 ECB             : {
    3026                 :     ReorderBufferTXN *txn;
    3027                 : 
    3028 GIC          85 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
    3029 ECB             :                                 false);
    3030                 : 
    3031                 :     /* unknown, nothing to do */
    3032 GIC          85 :     if (txn == NULL)
    3033 LBC           0 :         return;
    3034                 : 
    3035                 :     /*
    3036 ECB             :      * Process cache invalidation messages if there are any. Even if we're not
    3037                 :      * interested in the transaction's contents, it could have manipulated the
    3038                 :      * catalog and we need to update the caches according to that.
    3039                 :      */
    3040 GIC          85 :     if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
    3041              23 :         ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
    3042                 :                                            txn->invalidations);
    3043 ECB             :     else
    3044 CBC          62 :         Assert(txn->ninvalidations == 0);
    3045                 : }
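`ReorderBufferForget()` and `ReorderBufferInvalidate()` share the same guard: invalidations run only when the transaction has a base snapshot and carries messages; otherwise the code asserts that there are no messages. A minimal sketch of that invariant, with `should_invalidate` as a hypothetical helper name:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Returns true when the invalidations would be executed. Otherwise it
 * checks the invariant that a TXN with no base snapshot (or no messages)
 * carries zero invalidation messages.
 */
static bool
should_invalidate(const void *base_snapshot, unsigned ninvalidations)
{
    if (base_snapshot != NULL && ninvalidations > 0)
        return true;
    assert(ninvalidations == 0);
    return false;
}
```

Calling it with a NULL snapshot and a non-zero count would trip the assertion, just as the `Assert` does in an assert-enabled PostgreSQL build.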
    3046                 : 
    3047 ECB             : 
    3048                 : /*
    3049                 :  * Execute invalidations happening outside the context of a decoded
    3050                 :  * transaction. That currently happens either for xid-less commits
    3051                 :  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
    3052                 :  * transactions (via ReorderBufferForget()).
    3053                 :  */
    3054                 : void
    3055 GIC         566 : ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
    3056                 :                                    SharedInvalidationMessage *invalidations)
    3057                 : {
    3058             566 :     bool        use_subtxn = IsTransactionOrTransactionBlock();
    3059                 :     int         i;
    3060                 : 
    3061             566 :     if (use_subtxn)
    3062 CBC         397 :         BeginInternalSubTransaction("replay");
    3063                 : 
    3064                 :     /*
    3065                 :      * Force invalidations to happen outside of a valid transaction - that way
    3066 ECB             :      * entries will just be marked as invalid without accessing the catalog.
    3067                 :      * That's advantageous because we don't need to set up the full state
    3068                 :      * necessary for catalog access.
    3069                 :      */
    3070 CBC         566 :     if (use_subtxn)
    3071 GBC         397 :         AbortCurrentTransaction();
    3072                 : 
    3073 GIC       24954 :     for (i = 0; i < ninvalidations; i++)
    3074           24388 :         LocalExecuteInvalidationMessage(&invalidations[i]);
    3075                 : 
    3076             566 :     if (use_subtxn)
    3077             397 :         RollbackAndReleaseCurrentSubTransaction();
    3078 CBC         566 : }
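The ordering inside `ReorderBufferImmediateInvalidation()` is easiest to see as a trace. This sketch does not call any PostgreSQL API; it records hypothetical step names (`begin_subxact`, `abort_current`, `inval`, `rollback_subxact`) standing in for `BeginInternalSubTransaction()`, `AbortCurrentTransaction()`, `LocalExecuteInvalidationMessage()` and `RollbackAndReleaseCurrentSubTransaction()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Append one comma-separated step name to buf without overflowing it. */
static void
trace(char *buf, size_t cap, const char *step)
{
    if (buf[0] != '\0')
        strncat(buf, ",", cap - strlen(buf) - 1);
    strncat(buf, step, cap - strlen(buf) - 1);
}

/*
 * Record the order of operations: inside a transaction, open a
 * subtransaction and abort it so invalidations run outside a valid
 * transaction (entries are only marked invalid, with no catalog
 * access), then roll the subtransaction back.
 */
static void
immediate_invalidation_steps(bool in_xact, unsigned ninval,
                             char *buf, size_t cap)
{
    buf[0] = '\0';
    if (in_xact)
    {
        trace(buf, cap, "begin_subxact");
        trace(buf, cap, "abort_current");
    }
    for (unsigned i = 0; i < ninval; i++)
        trace(buf, cap, "inval");
    if (in_xact)
        trace(buf, cap, "rollback_subxact");
}
```

Outside any transaction (`in_xact == false`) the messages are simply executed in order, matching the `use_subtxn == false` path.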
    3079 ECB             : 
    3080                 : /*
    3081                 :  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
    3082                 :  * least once for every xid in XLogRecord->xl_xid (other places in records
    3083                 :  * may, but do not have to be passed through here).
    3084                 :  *
    3085                 :  * Reorderbuffer keeps some datastructures about transactions in LSN order,
    3086                 :  * for efficiency. To do that it has to know about when transactions are seen
    3087                 :  * first in the WAL. As many types of records are not actually interesting for
    3088                 :  * logical decoding, they do not necessarily pass through here.
    3089                 :  */
    3090                 : void
    3091 GIC     2465084 : ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
    3092                 : {
    3093 ECB             :     /* many records won't have an xid assigned, centralize check here */
    3094 GIC     2465084 :     if (xid != InvalidTransactionId)
    3095         2463336 :         ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
    3096 CBC     2465084 : }
    3097                 : 
    3098                 : /*
    3099 ECB             :  * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
    3100                 :  * because the previous snapshot doesn't describe the catalog correctly for
    3101                 :  * following rows.
    3102                 :  */
    3103                 : void
    3104 GIC        1002 : ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
    3105                 :                          XLogRecPtr lsn, Snapshot snap)
    3106                 : {
    3107            1002 :     ReorderBufferChange *change = ReorderBufferGetChange(rb);
    3108 ECB             : 
    3109 CBC        1002 :     change->data.snapshot = snap;
    3110 GIC        1002 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
    3111 ECB             : 
    3112 CBC        1002 :     ReorderBufferQueueChange(rb, xid, lsn, change, false);
    3113 GIC        1002 : }
    3114 ECB             : 
    3115                 : /*
    3116                 :  * Set up the transaction's base snapshot.
    3117                 :  *
    3118                 :  * If we know that xid is a subtransaction, set the base snapshot on the
    3119                 :  * top-level transaction instead.
    3120                 :  */
    3121                 : void
    3122 GIC        2607 : ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
    3123                 :                              XLogRecPtr lsn, Snapshot snap)
    3124                 : {
    3125                 :     ReorderBufferTXN *txn;
    3126                 :     bool        is_new;
    3127                 : 
    3128 GNC        2607 :     Assert(snap != NULL);
    3129 ECB             : 
    3130                 :     /*
    3131                 :      * Fetch the transaction to operate on.  If we know it's a subtransaction,
    3132                 :      * operate on its top-level transaction instead.
    3133                 :      */
    3134 CBC        2607 :     txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
    3135 GIC        2607 :     if (rbtxn_is_known_subxact(txn))
    3136             120 :         txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
    3137                 :                                     NULL, InvalidXLogRecPtr, false);
    3138            2607 :     Assert(txn->base_snapshot == NULL);
    3139                 : 
    3140            2607 :     txn->base_snapshot = snap;
    3141            2607 :     txn->base_snapshot_lsn = lsn;
    3142 CBC        2607 :     dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
    3143                 : 
    3144 GIC        2607 :     AssertTXNLsnOrder(rb);
    3145 CBC        2607 : }
    3146                 : 
    3147 ECB             : /*
    3148                 :  * Access the catalog with this CommandId at this point in the changestream.
    3149                 :  *
    3150                 :  * May only be called for command ids > 1
    3151                 :  */
    3152                 : void
    3153 GIC       22808 : ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
    3154                 :                              XLogRecPtr lsn, CommandId cid)
    3155                 : {
    3156           22808 :     ReorderBufferChange *change = ReorderBufferGetChange(rb);
    3157                 : 
    3158           22808 :     change->data.command_id = cid;
    3159           22808 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
    3160 ECB             : 
    3161 GIC       22808 :     ReorderBufferQueueChange(rb, xid, lsn, change, false);
    3162           22808 : }
    3163                 : 
    3164                 : /*
    3165                 :  * Update memory counters to account for the new or removed change.
    3166 ECB             :  *
    3167                 :  * We update two counters - in the reorder buffer, and in the transaction
    3168                 :  * containing the change. The reorder buffer counter allows us to quickly
    3169                 :  * decide if we reached the memory limit; the transaction counter allows
    3170                 :  * us to quickly pick the largest transaction for eviction.
    3171                 :  *
    3172                 :  * When streaming is enabled, we need to update the toplevel transaction
    3173                 :  * counters instead - we don't really care about subtransactions as we
    3174                 :  * can't stream them individually anyway, and we only pick toplevel
    3175                 :  * transactions for eviction. So only toplevel transactions matter.
    3176                 :  */
    3177                 : static void
    3178 CBC     3807299 : ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
    3179 ECB             :                                 ReorderBufferChange *change,
    3180                 :                                 bool addition, Size sz)
    3181                 : {
    3182                 :     ReorderBufferTXN *txn;
    3183                 :     ReorderBufferTXN *toptxn;
    3184                 : 
    3185 GIC     3807299 :     Assert(change->txn);
    3186                 : 
    3187                 :     /*
    3188                 :      * Ignore tuple CID changes, because those are not evicted when reaching
    3189                 :      * the memory limit. So we just don't count them, because that might easily
    3190                 :      * trigger a pointless attempt to spill.
    3191 ECB             :      */
    3192 GIC     3807299 :     if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
    3193           22692 :         return;
    3194 ECB             : 
    3195 GIC     3784607 :     txn = change->txn;
    3196 ECB             : 
    3197                 :     /*
    3198                 :      * Update the total size in top level as well. This is later used to
    3199                 :      * compute the decoding stats.
    3200                 :      */
    3201 GNC     3784607 :     toptxn = rbtxn_get_toptxn(txn);
    3202                 : 
    3203 GIC     3784607 :     if (addition)
    3204                 :     {
    3205         1892376 :         txn->size += sz;
    3206         1892376 :         rb->size += sz;
    3207                 : 
    3208                 :         /* Update the total size in the top transaction. */
    3209         1892376 :         toptxn->total_size += sz;
    3210                 :     }
    3211                 :     else
    3212                 :     {
    3213 CBC     1892231 :         Assert((rb->size >= sz) && (txn->size >= sz));
    3214 GIC     1892231 :         txn->size -= sz;
    3215         1892231 :         rb->size -= sz;
    3216                 : 
    3217                 :         /* Update the total size in the top transaction. */
    3218         1892231 :         toptxn->total_size -= sz;
    3219                 :     }
    3220 ECB             : 
    3221 GIC     3784607 :     Assert(txn->size <= rb->size);
    3222                 : }
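The two-level accounting done by ReorderBufferChangeMemoryUpdate() (per-transaction `size`, buffer-wide `size`, and the top-level `total_size`) can be illustrated with a minimal standalone sketch. The types `Txn` and `Buffer` and the helpers `get_toptxn()` and `memory_update()` are hypothetical simplifications, not PostgreSQL APIs, and the sketch omits the early return for tuple-CID changes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for ReorderBufferTXN. */
typedef struct Txn
{
    struct Txn *toptxn;     /* NULL for a top-level transaction */
    size_t      size;       /* bytes of changes held by this (sub)txn */
    size_t      total_size; /* top-level only: txn plus all subtxns */
} Txn;

/* Hypothetical, simplified stand-in for ReorderBuffer. */
typedef struct Buffer
{
    size_t      size;       /* bytes held across all transactions */
} Buffer;

/* Return the top-level transaction (the txn itself if it is top-level). */
static Txn *
get_toptxn(Txn *txn)
{
    return txn->toptxn ? txn->toptxn : txn;
}

/* Add or remove sz bytes for a change belonging to txn. */
static void
memory_update(Buffer *rb, Txn *txn, bool addition, size_t sz)
{
    Txn *top = get_toptxn(txn);

    if (addition)
    {
        txn->size += sz;
        rb->size += sz;
        top->total_size += sz;
    }
    else
    {
        /* never remove more than was accounted */
        assert(rb->size >= sz && txn->size >= sz);
        txn->size -= sz;
        rb->size -= sz;
        top->total_size -= sz;
    }
    assert(txn->size <= rb->size);
}
```

The buffer-wide counter answers "are we over the limit?" in O(1), while the per-transaction counters let an eviction pass compare transactions without re-summing their changes.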
    3223                 : 
    3224                 : /*
    3225                 :  * Add new (relfilelocator, tid) -> (cmin, cmax) mappings.
    3226                 :  *
    3227 ECB             :  * We do not include this change type in memory accounting, because we
    3228                 :  * keep CIDs in a separate list and do not evict them when reaching
    3229                 :  * the memory limit.
    3230                 :  */
    3231                 : void
    3232 GIC       22808 : ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
    3233                 :                              XLogRecPtr lsn, RelFileLocator locator,
    3234                 :                              ItemPointerData tid, CommandId cmin,
    3235                 :                              CommandId cmax, CommandId combocid)
    3236 ECB             : {
    3237 GIC       22808 :     ReorderBufferChange *change = ReorderBufferGetChange(rb);
    3238 ECB             :     ReorderBufferTXN *txn;
    3239                 : 
    3240 CBC       22808 :     txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
    3241 ECB             : 
    3242 GNC       22808 :     change->data.tuplecid.locator = locator;
    3243 GIC       22808 :     change->data.tuplecid.tid = tid;
    3244 CBC       22808 :     change->data.tuplecid.cmin = cmin;
    3245 GIC       22808 :     change->data.tuplecid.cmax = cmax;
    3246           22808 :     change->data.tuplecid.combocid = combocid;
    3247           22808 :     change->lsn = lsn;
    3248 CBC       22808 :     change->txn = txn;
    3249           22808 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
    3250 ECB             : 
    3251 GIC       22808 :     dlist_push_tail(&txn->tuplecids, &change->node);
    3252           22808 :     txn->ntuplecids++;
    3253 CBC       22808 : }
    3254                 : 
    3255                 : /*
    3256 ECB             :  * Accumulate the invalidations for executing them later.
    3257                 :  *
    3258                 :  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
    3259                 :  * accumulates all the invalidation messages in the toplevel transaction, if
    3260                 :  * available, otherwise in the current transaction, as well as in the form of
    3261                 :  * a change in the reorder buffer.  We need to record it in the form of a change
    3262                 :  * so that we can execute only the required invalidations instead of executing
    3263                 :  * all the invalidations on each CommandId increment.  We also need to
    3264                 :  * accumulate these in the txn buffer because in some cases where we skip
    3265                 :  * processing the transaction (see ReorderBufferForget), we need to execute
    3266                 :  * all the invalidations together.
    3267                 :  */
    3268                 : void
    3269 GIC        4559 : ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
    3270                 :                               XLogRecPtr lsn, Size nmsgs,
    3271                 :                               SharedInvalidationMessage *msgs)
    3272 ECB             : {
    3273                 :     ReorderBufferTXN *txn;
    3274                 :     MemoryContext oldcontext;
    3275                 :     ReorderBufferChange *change;
    3276                 : 
    3277 CBC        4559 :     txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
    3278 ECB             : 
    3279 CBC        4559 :     oldcontext = MemoryContextSwitchTo(rb->context);
    3280 ECB             : 
    3281                 :     /*
    3282                 :      * Collect all the invalidations under the top transaction, if available,
    3283                 :      * so that we can execute them all together.  See comments atop this
    3284                 :      * function.
    3285                 :      */
    3286 GNC        4559 :     txn = rbtxn_get_toptxn(txn);
    3287 ECB             : 
    3288 GIC        4559 :     Assert(nmsgs > 0);
    3289                 : 
    3290                 :     /* Accumulate invalidations. */
    3291            4559 :     if (txn->ninvalidations == 0)
    3292                 :     {
    3293             995 :         txn->ninvalidations = nmsgs;
    3294             995 :         txn->invalidations = (SharedInvalidationMessage *)
    3295             995 :             palloc(sizeof(SharedInvalidationMessage) * nmsgs);
    3296             995 :         memcpy(txn->invalidations, msgs,
    3297                 :                sizeof(SharedInvalidationMessage) * nmsgs);
    3298                 :     }
    3299                 :     else
    3300                 :     {
    3301            3564 :         txn->invalidations = (SharedInvalidationMessage *)
    3302            3564 :             repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
    3303 CBC        3564 :                      (txn->ninvalidations + nmsgs));
    3304                 : 
    3305 GIC        3564 :         memcpy(txn->invalidations + txn->ninvalidations, msgs,
    3306                 :                nmsgs * sizeof(SharedInvalidationMessage));
    3307            3564 :         txn->ninvalidations += nmsgs;
    3308                 :     }
    3309                 : 
    3310            4559 :     change = ReorderBufferGetChange(rb);
    3311 CBC        4559 :     change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
    3312 GIC        4559 :     change->data.inval.ninvalidations = nmsgs;
    3313 CBC        4559 :     change->data.inval.invalidations = (SharedInvalidationMessage *)
    3314 GIC        4559 :         palloc(sizeof(SharedInvalidationMessage) * nmsgs);
    3315            4559 :     memcpy(change->data.inval.invalidations, msgs,
    3316                 :            sizeof(SharedInvalidationMessage) * nmsgs);
    3317                 : 
    3318            4559 :     ReorderBufferQueueChange(rb, xid, lsn, change, false);
    3319                 : 
    3320 CBC        4559 :     MemoryContextSwitchTo(oldcontext);
    3321 GIC        4559 : }
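The accumulation step above (first allocation, then growth and append) follows a common grow-and-copy pattern. Below is a hedged standalone sketch of just that part: `Msg` is a hypothetical stand-in for SharedInvalidationMessage, malloc/realloc replace palloc/repalloc, and queuing the REORDER_BUFFER_CHANGE_INVALIDATION change is omitted. The original branches on `ninvalidations == 0` to choose between palloc and repalloc; in plain C, `realloc(NULL, n)` behaves like `malloc(n)`, so one call covers both cases.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct Msg { int id; } Msg;   /* hypothetical message type */

typedef struct InvalAccum
{
    size_t  n;      /* number of accumulated messages */
    Msg    *msgs;   /* NULL until the first append */
} InvalAccum;

/* Append nmsgs messages; returns 0 on success, -1 on allocation failure. */
static int
inval_accumulate(InvalAccum *acc, const Msg *msgs, size_t nmsgs)
{
    Msg *grown;

    assert(nmsgs > 0);
    grown = realloc(acc->msgs, sizeof(Msg) * (acc->n + nmsgs));
    if (grown == NULL)
        return -1;              /* acc is left unchanged */
    memcpy(grown + acc->n, msgs, sizeof(Msg) * nmsgs);
    acc->msgs = grown;
    acc->n += nmsgs;
    return 0;
}
```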
    3322 ECB             : 
    3323                 : /*
    3324                 :  * Apply all invalidations we know. Possibly we need only some of them at this point
    3325                 :  * in the changestream, but we don't know which ones.
    3326                 :  */
    3327                 : static void
    3328 CBC        3577 : ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
    3329 ECB             : {
    3330                 :     int         i;
    3331                 : 
    3332 GIC       37885 :     for (i = 0; i < nmsgs; i++)
    3333           34308 :         LocalExecuteInvalidationMessage(&msgs[i]);
    3334            3577 : }
    3335 ECB             : 
    3336                 : /*
    3337                 :  * Mark a transaction as containing catalog changes
    3338                 :  */
    3339                 : void
    3340 GIC       28337 : ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
    3341 ECB             :                                   XLogRecPtr lsn)
    3342                 : {
    3343                 :     ReorderBufferTXN *txn;
    3344                 : 
    3345 CBC       28337 :     txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
    3346 ECB             : 
    3347 GNC       28337 :     if (!rbtxn_has_catalog_changes(txn))
    3348                 :     {
    3349            1022 :         txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
    3350            1022 :         dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);
    3351                 :     }
    3352 ECB             : 
    3353                 :     /*
    3354                 :      * Mark the top-level transaction as having catalog changes too if one of its
    3355                 :      * children has, so that ReorderBufferBuildTupleCidHash can
    3356                 :      * conveniently check just the top-level transaction and decide whether to
    3357                 :      * build the hash table or not.
    3358                 :      */
    3359 GNC       28337 :     if (rbtxn_is_subtxn(txn))
    3360                 :     {
    3361             903 :         ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
    3362                 : 
    3363             903 :         if (!rbtxn_has_catalog_changes(toptxn))
    3364                 :         {
    3365              18 :             toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
    3366              18 :             dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
    3367                 :         }
    3368                 :     }
    3369           28337 : }
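ReorderBufferXidSetCatalogChanges() uses a "set the flag once, append to the list once" pattern and then repeats it for the parent of a subtransaction. The sketch below shows that pattern in isolation; the flag value, the `Txn` type, and the fixed-capacity `CatchangeList` (standing in for the `catchange_txns` dclist) are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

#define HAS_CATALOG_CHANGES 0x01   /* hypothetical flag bit */
#define CATCHANGE_CAP 16           /* fixed capacity keeps the sketch short */

typedef struct Txn
{
    struct Txn *toptxn;    /* NULL for a top-level transaction */
    unsigned    flags;
} Txn;

typedef struct CatchangeList
{
    Txn    *items[CATCHANGE_CAP];
    size_t  count;
} CatchangeList;

/* Set the flag and record txn in the list, but only the first time. */
static void
mark_once(CatchangeList *list, Txn *txn)
{
    if (!(txn->flags & HAS_CATALOG_CHANGES))
    {
        assert(list->count < CATCHANGE_CAP);
        txn->flags |= HAS_CATALOG_CHANGES;
        list->items[list->count++] = txn;
    }
}

static void
set_catalog_changes(CatchangeList *list, Txn *txn)
{
    mark_once(list, txn);
    if (txn->toptxn != NULL)       /* subtransaction: propagate to parent */
        mark_once(list, txn->toptxn);
}
```

Because the flag test guards the append, repeated calls for the same transaction leave the list unchanged, which is what lets the list count double as the number of flagged transactions.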
    3370                 : 
    3371                 : /*
    3372                 :  * Return palloc'ed array of the transactions that have changed catalogs.
    3373                 :  * The returned array is sorted in xidComparator order.
    3374                 :  *
    3375                 :  * The caller must free the returned array when done with it.
    3376                 :  */
    3377                 : TransactionId *
    3378             208 : ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
    3379                 : {
    3380                 :     dlist_iter  iter;
    3381             208 :     TransactionId *xids = NULL;
    3382             208 :     size_t      xcnt = 0;
    3383                 : 
    3384                 :     /* Quick return if the list is empty */
    3385             208 :     if (dclist_count(&rb->catchange_txns) == 0)
    3386             202 :         return NULL;
    3387                 : 
    3388                 :     /* Initialize XID array */
    3389               6 :     xids = (TransactionId *) palloc(sizeof(TransactionId) *
    3390               6 :                                     dclist_count(&rb->catchange_txns));
    3391              14 :     dclist_foreach(iter, &rb->catchange_txns)
    3392                 :     {
    3393               8 :         ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
    3394                 :                                                  catchange_node,
    3395                 :                                                  iter.cur);
    3396                 : 
    3397               8 :         Assert(rbtxn_has_catalog_changes(txn));
    3398                 : 
    3399               8 :         xids[xcnt++] = txn->xid;
    3400                 :     }
    3401                 : 
    3402               6 :     qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
    3403                 : 
    3404               6 :     Assert(xcnt == dclist_count(&rb->catchange_txns));
    3405               6 :     return xids;
    3406                 : }
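ReorderBufferGetCatalogChangesXacts() copies XIDs into a fresh array and sorts it with qsort() and xidComparator, which orders XIDs as plain unsigned integers (no wraparound logic). A minimal standalone sketch of the same collect-and-sort step follows; `xid_cmp()` and `sorted_xid_copy()` are hypothetical names, and malloc replaces palloc.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t TransactionId;

/* Plain unsigned comparison; avoids the overflow of "return a - b". */
static int
xid_cmp(const void *a, const void *b)
{
    TransactionId xa = *(const TransactionId *) a;
    TransactionId xb = *(const TransactionId *) b;

    if (xa < xb)
        return -1;
    if (xa > xb)
        return 1;
    return 0;
}

/* Return a malloc'ed, ascending copy of src; NULL if n == 0 (or on OOM). */
static TransactionId *
sorted_xid_copy(const TransactionId *src, size_t n)
{
    TransactionId *xids;

    if (n == 0)
        return NULL;                /* mirrors the early "quick return" */
    xids = malloc(sizeof(TransactionId) * n);
    if (xids == NULL)
        return NULL;
    for (size_t i = 0; i < n; i++)
        xids[i] = src[i];
    qsort(xids, n, sizeof(TransactionId), xid_cmp);
    return xids;                    /* caller frees */
}
```

Returning a sorted array lets a consumer test membership with bsearch() using the same comparator.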
    3407                 : 
    3408                 : /*
    3409                 :  * Query whether a transaction is already *known* to contain catalog
    3410                 :  * changes. This can be wrong until directly before the commit!
    3411 ECB             :  */
    3412                 : bool
    3413 GIC        3790 : ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
    3414                 : {
    3415 ECB             :     ReorderBufferTXN *txn;
    3416                 : 
    3417 CBC        3790 :     txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
    3418                 :                                 false);
    3419 GIC        3790 :     if (txn == NULL)
    3420             646 :         return false;
    3421                 : 
    3422            3144 :     return rbtxn_has_catalog_changes(txn);
    3423 ECB             : }
    3424                 : 
    3425                 : /*
    3426                 :  * ReorderBufferXidHasBaseSnapshot
    3427                 :  *      Have we already set the base snapshot for the given txn/subtxn?
    3428                 :  */
    3429                 : bool
    3430 CBC     1696415 : ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
    3431                 : {
    3432 ECB             :     ReorderBufferTXN *txn;
    3433                 : 
    3434 GIC     1696415 :     txn = ReorderBufferTXNByXid(rb, xid, false,
    3435                 :                                 NULL, InvalidXLogRecPtr, false);
    3436                 : 
    3437                 :     /* transaction isn't known yet, ergo no snapshot */
    3438         1696415 :     if (txn == NULL)
    3439               3 :         return false;
    3440                 : 
    3441                 :     /* a known subtxn? operate on top-level txn instead */
    3442 CBC     1696412 :     if (rbtxn_is_known_subxact(txn))
    3443 GIC      491936 :         txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
    3444 ECB             :                                     NULL, InvalidXLogRecPtr, false);
    3445                 : 
    3446 CBC     1696412 :     return txn->base_snapshot != NULL;
    3447                 : }
    3448 ECB             : 
    3449                 : 
    3450                 : /*
    3451                 :  * ---------------------------------------
    3452                 :  * Disk serialization support
    3453                 :  * ---------------------------------------
    3454                 :  */
    3455                 : 
    3456                 : /*
    3457                 :  * Ensure the IO buffer is >= sz.
    3458                 :  */
    3459                 : static void
    3460 GIC     3309901 : ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
    3461 ECB             : {
    3462 GIC     3309901 :     if (!rb->outbufsize)
    3463                 :     {
    3464 CBC          49 :         rb->outbuf = MemoryContextAlloc(rb->context, sz);
    3465              49 :         rb->outbufsize = sz;
    3466                 :     }
    3467 GIC     3309852 :     else if (rb->outbufsize < sz)
    3468 ECB             :     {
    3469 CBC         300 :         rb->outbuf = repalloc(rb->outbuf, sz);
    3470 GIC         300 :         rb->outbufsize = sz;
    3471                 :     }
    3472 CBC     3309901 : }
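ReorderBufferSerializeReserve() implements a grow-only scratch buffer: it allocates on first use, grows to exactly the requested size when too small, and never shrinks. The hit counts above (about 3.3 million calls, 49 initial allocations, 300 regrowths) show why this pays off. A hedged standalone sketch, with the hypothetical `OutBuf` type and `reserve()` helper, using realloc in place of MemoryContextAlloc/repalloc:

```c
#include <assert.h>
#include <stdlib.h>

typedef struct OutBuf
{
    char   *data;   /* NULL until first reservation */
    size_t  size;   /* current capacity in bytes */
} OutBuf;

/* Ensure buf can hold at least sz bytes; never shrinks. Returns 0 or -1. */
static int
reserve(OutBuf *buf, size_t sz)
{
    if (buf->size >= sz)
        return 0;                       /* already big enough: no allocation */

    char *p = realloc(buf->data, sz);   /* realloc(NULL, sz) acts like malloc */
    if (p == NULL)
        return -1;                      /* old buffer is still valid */
    buf->data = p;
    buf->size = sz;
    return 0;
}
```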
    3473 ECB             : 
    3474                 : /*
    3475                 :  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
    3476                 :  *
    3477                 :  * XXX With many subtransactions this might be quite slow, because we'll have
    3478                 :  * to walk through all of them. There are some options for how we could improve
    3479                 :  * that: (a) maintain some secondary structure with transactions sorted by
    3480                 :  * amount of changes, (b) not looking for the absolute largest transaction,
    3481                 :  * but e.g. for a transaction using at least some fraction of the memory limit,
    3482                 :  * and (c) evicting multiple transactions at once, e.g. to free a given portion
    3483                 :  * of the memory limit (e.g. 50%).
    3484                 :  */
    3485                 : static ReorderBufferTXN *
    3486 GIC        3698 : ReorderBufferLargestTXN(ReorderBuffer *rb)
    3487 ECB             : {
    3488                 :     HASH_SEQ_STATUS hash_seq;
    3489                 :     ReorderBufferTXNByIdEnt *ent;
    3490 GIC        3698 :     ReorderBufferTXN *largest = NULL;
    3491                 : 
    3492            3698 :     hash_seq_init(&hash_seq, rb->by_txn);
    3493            9509 :     while ((ent = hash_seq_search(&hash_seq)) != NULL)
    3494                 :     {
    3495            5811 :         ReorderBufferTXN *txn = ent->txn;
    3496 ECB             : 
    3497                 :         /* if the current transaction is larger, remember it */
    3498 GIC        5811 :         if ((!largest) || (txn->size > largest->size))
    3499            4756 :             largest = txn;
    3500 ECB             :     }
    3501                 : 
    3502 CBC        3698 :     Assert(largest);
    3503            3698 :     Assert(largest->size > 0);
    3504 GIC        3698 :     Assert(largest->size <= rb->size);
    3505 ECB             : 
    3506 GIC        3698 :     return largest;
    3507                 : }
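The selection in ReorderBufferLargestTXN() is a single linear pass that keeps the current maximum. Because the comparison is a strict `>`, the first transaction seen wins ties. The sketch below shows the same scan over a plain array instead of the `by_txn` hash table; the `Txn` type and `largest_txn()` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

typedef struct Txn
{
    unsigned xid;
    size_t   size;      /* bytes of changes held in memory */
} Txn;

/* Return the entry with the largest size (first one wins on ties), or NULL. */
static Txn *
largest_txn(Txn *txns, size_t n)
{
    Txn *largest = NULL;

    for (size_t i = 0; i < n; i++)
        if (largest == NULL || txns[i].size > largest->size)
            largest = &txns[i];
    return largest;
}
```

This is O(n) in the number of tracked (sub)transactions per eviction, which is the cost the XXX comment above proposes reducing.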
    3508                 : 
    3509                 : /*
    3510                 :  * Find the largest streamable toplevel transaction to evict (by streaming).
    3511                 :  *
    3512                 :  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
    3513 ECB             :  * should give us the same transaction (because we don't update memory accounting
    3514                 :  * for subtransactions with streaming, so it's always 0). But we can simply
    3515                 :  * iterate over the limited number of toplevel transactions that have a base
    3516                 :  * snapshot. There is no point in selecting a transaction that doesn't have a base
    3517                 :  * snapshot because we don't decode such transactions.  Also, we do not select
    3518                 :  * a transaction that doesn't have any streamable change.
    3519                 :  *
    3520                 :  * Note that we skip transactions that contain incomplete changes. There
    3521                 :  * is scope for optimization here such that we could select the largest
    3522                 :  * transaction which has incomplete changes.  But that would make the code and
    3523                 :  * design quite complex and that might not be worth the benefit.  If we plan to
    3524                 :  * stream the transactions that contain incomplete changes, then we need to
    3525                 :  * find a way to partially stream/truncate the transaction changes in-memory
    3526                 :  * and build a mechanism to partially truncate the spilled files.
    3527                 :  * Additionally, whenever we partially stream the transaction we need to
    3528                 :  * maintain the last streamed lsn and next time we need to restore from that
    3529                 :  * segment and the offset in WAL.  As we stream the changes from the top
    3530                 :  * transaction and restore them subtransaction-wise, we would even need to remember
    3531                 :  * the subxact from where we streamed the last change.
    3532                 :  */
    3533                 : static ReorderBufferTXN *
    3534 GNC         637 : ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
    3535                 : {
    3536                 :     dlist_iter  iter;
    3537 GIC         637 :     Size        largest_size = 0;
    3538             637 :     ReorderBufferTXN *largest = NULL;
    3539                 : 
    3540                 :     /* Find the largest top-level transaction having a base snapshot. */
    3541            1387 :     dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
    3542                 :     {
    3543                 :         ReorderBufferTXN *txn;
    3544 ECB             : 
    3545 GIC         750 :         txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
    3546 ECB             : 
    3547                 :         /* must not be a subtxn */
    3548 CBC         750 :         Assert(!rbtxn_is_known_subxact(txn));
    3549 ECB             :         /* base_snapshot must be set */
    3550 GIC         750 :         Assert(txn->base_snapshot != NULL);
    3551 ECB             : 
    3552 GIC         750 :         if ((largest == NULL || txn->total_size > largest_size) &&
    3553 GNC         750 :             (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
    3554             659 :             rbtxn_has_streamable_change(txn))
    3555 ECB             :         {
    3556 GIC         659 :             largest = txn;
    3557 CBC         659 :             largest_size = txn->total_size;
    3558                 :         }
    3559                 :     }
    3560                 : 
    3561 GIC         637 :     return largest;
    3562                 : }
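ReorderBufferLargestStreamableTopTXN() is the same max-scan with extra eligibility filters: non-zero `total_size`, no partial change, and at least one streamable change. The sketch below reproduces the filter over an array; the `TopTxn` type and its two booleans are hypothetical stand-ins for `rbtxn_has_partial_change()` and `rbtxn_has_streamable_change()`, and the array stands in for the `txns_by_base_snapshot_lsn` list.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct TopTxn
{
    unsigned xid;
    size_t   total_size;            /* txn plus all its subtransactions */
    bool     has_partial_change;
    bool     has_streamable_change;
} TopTxn;

/* Largest eligible top-level txn, or NULL if none qualifies. */
static TopTxn *
largest_streamable(TopTxn *txns, size_t n)
{
    TopTxn *largest = NULL;
    size_t  largest_size = 0;

    for (size_t i = 0; i < n; i++)
    {
        TopTxn *t = &txns[i];

        if ((largest == NULL || t->total_size > largest_size) &&
            t->total_size > 0 && !t->has_partial_change &&
            t->has_streamable_change)
        {
            largest = t;
            largest_size = t->total_size;
        }
    }
    return largest;     /* NULL: the caller falls back to spilling */
}
```

Note that an ineligible transaction can be larger than the one returned; in that case the caller streams a smaller transaction, or spills to disk if none is eligible.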
    3563                 : 
    3564                 : /*
    3565                 :  * Check whether the logical_decoding_work_mem limit was reached, and if so,
    3566                 :  * pick the largest (sub)transaction one at a time to evict and spill its changes to
    3567                 :  * disk or send them to the output plugin, until we are under the memory limit.
    3568                 :  *
    3569                 :  * If logical_replication_mode is set to "immediate", stream or serialize the
    3570                 :  * changes immediately.
    3571                 :  *
    3572                 :  * XXX At this point we select transactions until we are under the memory
    3573                 :  * limit, but we might also adopt a more elaborate eviction strategy - for example
    3574 ECB             :  * evicting enough transactions to free a certain fraction (e.g. 50%) of the memory
    3575                 :  * limit.
    3576                 :  */
    3577                 : static void
    3578 CBC     1713655 : ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
    3579                 : {
    3580 ECB             :     ReorderBufferTXN *txn;
    3581                 : 
    3582                 :     /*
    3583                 :      * Bail out if logical_replication_mode is buffered and we haven't exceeded
    3584                 :      * the memory limit.
    3585                 :      */
    3586 GNC     1713655 :     if (logical_replication_mode == LOGICAL_REP_MODE_BUFFERED &&
    3587         1713241 :         rb->size < logical_decoding_work_mem * 1024L)
    3588 GIC     1709364 :         return;
    3589                 : 
    3590 ECB             :     /*
    3591                 :      * If logical_replication_mode is immediate, loop until there's no change.
    3592                 :      * Otherwise, loop until we are under the memory limit. One might think
    3593                 :      * that just by evicting the largest (sub)transaction we will come under
    3594                 :      * the memory limit based on the assumption that the selected transaction is
    3595                 :      * at least as large as the most recent change (which caused us to go over
    3596                 :      * the memory limit). However, that is not true because a user can reduce
    3597                 :      * logical_decoding_work_mem to a smaller value before the most recent
    3598                 :      * change.
    3599                 :      */
    3600 GNC        8582 :     while (rb->size >= logical_decoding_work_mem * 1024L ||
    3601            4705 :            (logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE &&
    3602             828 :             rb->size > 0))
    3603                 :     {
    3604                 :         /*
    3605                 :          * Pick the largest transaction (or subtransaction) and evict it from
    3606                 :          * memory by streaming, if possible.  Otherwise, spill to disk.
    3607                 :          */
    3608 GIC        4928 :         if (ReorderBufferCanStartStreaming(rb) &&
    3609 GNC         637 :             (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
    3610                 :         {
    3611                 :             /* we know there has to be one, because the size is not zero */
    3612             593 :             Assert(txn && rbtxn_is_toptxn(txn));
    3613 GIC         593 :             Assert(txn->total_size > 0);
    3614             593 :             Assert(rb->size >= txn->total_size);
    3615                 : 
    3616             593 :             ReorderBufferStreamTXN(rb, txn);
    3617                 :         }
    3618                 :         else
    3619                 :         {
    3620                 :             /*
    3621                 :              * Pick the largest transaction (or subtransaction) and evict it
    3622                 :              * from memory by serializing it to disk.
    3623                 :              */
    3624            3698 :             txn = ReorderBufferLargestTXN(rb);
    3625                 : 
    3626                 :             /* we know there has to be one, because the size is not zero */
    3627            3698 :             Assert(txn);
    3628            3698 :             Assert(txn->size > 0);
    3629 CBC        3698 :             Assert(rb->size >= txn->size);
    3630                 : 
    3631 GIC        3698 :             ReorderBufferSerializeTXN(rb, txn);
    3632 ECB             :         }
    3633                 : 
    3634                 :         /*
    3635                 :          * After eviction, the transaction should have no entries in memory,
    3636                 :          * and should use 0 bytes for changes.
    3637                 :          */
    3638 GIC        4291 :         Assert(txn->size == 0);
    3639            4291 :         Assert(txn->nentries_mem == 0);
    3640 ECB             :     }
    3641                 : 
    3642                 :     /* We must be under the memory limit now. */
    3643 CBC        4291 :     Assert(rb->size < logical_decoding_work_mem * 1024L);
    3644                 : }
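The eviction loop above can be modeled in isolation. The following is a minimal sketch, not PostgreSQL code: `ToyTxn`, `ToyBuffer`, and the `toy_*` functions are hypothetical, the largest transaction is found by a plain linear scan, and "eviction" only zeroes the transaction's byte count where the real loop streams or serializes its changes. It also collapses the streaming branch (top-level transactions only, sized by `total_size`) and the spilling branch (any transaction, sized by `size`) into a single path.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for ReorderBufferTXN: only the memory accounting. */
typedef struct ToyTxn
{
    size_t      size;       /* bytes of changes held in memory */
    bool        evicted;    /* set once the txn was "streamed" or "spilled" */
} ToyTxn;

/* Hypothetical stand-in for ReorderBuffer. */
typedef struct ToyBuffer
{
    ToyTxn     *txns;
    int         ntxns;
    size_t      size;       /* sum of all txn sizes, like rb->size */
} ToyBuffer;

/* Linear scan for the transaction using the most memory (NULL if none). */
static ToyTxn *
toy_largest_txn(ToyBuffer *rb)
{
    ToyTxn     *largest = NULL;

    for (int i = 0; i < rb->ntxns; i++)
    {
        ToyTxn     *txn = &rb->txns[i];

        if (txn->size > 0 && (largest == NULL || txn->size > largest->size))
            largest = txn;
    }
    return largest;
}

/*
 * Evict the largest transaction until the buffer is below limit_bytes, or,
 * in "immediate" mode, until nothing is left in memory.  Returns the number
 * of evictions.  limit_bytes must be > 0, otherwise the loop could not end.
 */
static int
toy_check_memory_limit(ToyBuffer *rb, size_t limit_bytes, bool immediate)
{
    int         evictions = 0;

    assert(limit_bytes > 0);

    while (rb->size >= limit_bytes || (immediate && rb->size > 0))
    {
        ToyTxn     *txn = toy_largest_txn(rb);

        /* there has to be one, because the size is not zero */
        assert(txn != NULL);
        assert(rb->size >= txn->size);

        /* "evict": the real code streams or serializes the changes here */
        rb->size -= txn->size;
        txn->size = 0;
        txn->evicted = true;
        evictions++;
    }

    /* we must be under the memory limit now */
    assert(rb->size < limit_bytes);
    return evictions;
}
```

With transactions of 100, 300 and 50 bytes and a 200-byte limit, a single eviction (the 300-byte transaction) brings the buffer down to 150 bytes; in immediate mode every transaction is evicted.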
    3645 ECB             : 
    3646                 : /*
    3647                 :  * Spill data of a large transaction (and its subtransactions) to disk.
    3648                 :  */
    3649                 : static void
    3650 GIC        4003 : ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
    3651 ECB             : {
    3652                 :     dlist_iter  subtxn_i;
    3653                 :     dlist_mutable_iter change_i;
    3654 GIC        4003 :     int         fd = -1;
    3655            4003 :     XLogSegNo   curOpenSegNo = 0;
    3656 CBC        4003 :     Size        spilled = 0;
    3657 GIC        4003 :     Size        size = txn->size;
    3658                 : 
    3659            4003 :     elog(DEBUG2, "spill %u changes in XID %u to disk",
    3660                 :          (uint32) txn->nentries_mem, txn->xid);
    3661                 : 
    3662                 :     /* do the same to all child TXs */
    3663            4271 :     dlist_foreach(subtxn_i, &txn->subtxns)
    3664                 :     {
    3665                 :         ReorderBufferTXN *subtxn;
    3666                 : 
    3667             268 :         subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
    3668             268 :         ReorderBufferSerializeTXN(rb, subtxn);
    3669                 :     }
    3670                 : 
    3671                 :     /* serialize changestream */
    3672         1489026 :     dlist_foreach_modify(change_i, &txn->changes)
    3673 ECB             :     {
    3674                 :         ReorderBufferChange *change;
    3675                 : 
    3676 GIC     1485023 :         change = dlist_container(ReorderBufferChange, node, change_i.cur);
    3677                 : 
    3678                 :         /*
    3679                 :          * store in the segment to which it belongs by start LSN; don't split
    3680                 :          * a change over multiple segments, though
    3681 ECB             :          */
    3682 CBC     1485023 :         if (fd == -1 ||
    3683         1481271 :             !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
    3684                 :         {
    3685                 :             char        path[MAXPGPATH];
    3686                 : 
    3687 GIC        3756 :             if (fd != -1)
    3688               4 :                 CloseTransientFile(fd);
    3689                 : 
    3690            3756 :             XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
    3691                 : 
    3692                 :             /*
    3693                 :              * No need to care about TLIs here, only used during a single run,
    3694                 :              * so each LSN only maps to a specific WAL record.
    3695 ECB             :              */
    3696 CBC        3756 :             ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
    3697 ECB             :                                         curOpenSegNo);
    3698                 : 
    3699                 :             /* open segment, create it if necessary */
    3700 GIC        3756 :             fd = OpenTransientFile(path,
    3701                 :                                    O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
    3702                 : 
    3703 CBC        3756 :             if (fd < 0)
    3704 LBC           0 :                 ereport(ERROR,
    3705                 :                         (errcode_for_file_access(),
    3706                 :                          errmsg("could not open file \"%s\": %m", path)));
    3707 ECB             :         }
    3708                 : 
    3709 CBC     1485023 :         ReorderBufferSerializeChange(rb, txn, fd, change);
    3710 GIC     1485023 :         dlist_delete(&change->node);
    3711 CBC     1485023 :         ReorderBufferReturnChange(rb, change, true);
    3712                 : 
    3713 GIC     1485023 :         spilled++;
    3714                 :     }
    3715                 : 
    3716                 :     /* update the statistics iff we have spilled anything */
    3717            4003 :     if (spilled)
    3718                 :     {
    3719 CBC        3752 :         rb->spillCount += 1;
    3720 GIC        3752 :         rb->spillBytes += size;
    3721                 : 
    3722 ECB             :         /* don't consider already serialized transactions */
    3723 CBC        3752 :         rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
    3724 ECB             : 
    3725                 :         /* update the decoding stats */
    3726 CBC        3752 :         UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
    3727                 :     }
    3728                 : 
    3729 GIC        4003 :     Assert(spilled == txn->nentries_mem);
    3730            4003 :     Assert(dlist_is_empty(&txn->changes));
    3731            4003 :     txn->nentries_mem = 0;
    3732            4003 :     txn->txn_flags |= RBTXN_IS_SERIALIZED;
    3733 ECB             : 
    3734 CBC        4003 :     if (fd != -1)
    3735 GIC        3752 :         CloseTransientFile(fd);
    3736            4003 : }
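`ReorderBufferSerializeTXN()` keeps one spill file open at a time and reopens only when a change's LSN leaves the current WAL segment. A minimal sketch of that grouping follows, assuming only that `XLByteToSeg()`/`XLByteInSeg()` reduce to integer division of the LSN by `wal_segment_size`. `ToyLSN`, `ToySegNo`, and the `toy_*` functions are hypothetical, and no files are actually opened.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t ToyLSN;    /* stands in for XLogRecPtr */
typedef uint64_t ToySegNo;  /* stands in for XLogSegNo */

/* Segment number = LSN / segment size (the XLByteToSeg arithmetic). */
static ToySegNo
toy_lsn_to_seg(ToyLSN lsn, uint64_t seg_size)
{
    assert(seg_size > 0);
    return lsn / seg_size;
}

/*
 * Walk changes in order and count how many spill files would be opened,
 * recording each opened segment number in opened[] (if non-NULL).  A new
 * file is opened for the first change and whenever a change's LSN falls
 * outside the currently open segment.
 */
static int
toy_plan_spill_files(const ToyLSN *lsns, size_t n, uint64_t seg_size,
                     ToySegNo *opened)
{
    bool        file_open = false;
    ToySegNo    cur_seg = 0;
    int         nopened = 0;

    for (size_t i = 0; i < n; i++)
    {
        ToySegNo    seg = toy_lsn_to_seg(lsns[i], seg_size);

        if (!file_open || seg != cur_seg)
        {
            /* the real code closes the previous fd, then opens/creates a new one */
            cur_seg = seg;
            file_open = true;
            if (opened != NULL)
                opened[nopened] = seg;
            nopened++;
        }
    }
    return nopened;
}
```

For example, with 16 MB segments, changes at LSNs 100, 200, 16 MB, 16 MB + 84 and 32 MB fall into segments 0, 0, 1, 1 and 2, so three spill files would be opened.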
    3737                 : 
    3738 ECB             : /*
    3739                 :  * Serialize individual change to disk.
    3740                 :  */
    3741                 : static void
    3742 GIC     1485023 : ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
    3743                 :                              int fd, ReorderBufferChange *change)
    3744                 : {
    3745 ECB             :     ReorderBufferDiskChange *ondisk;
    3746 GIC     1485023 :     Size        sz = sizeof(ReorderBufferDiskChange);
    3747                 : 
    3748         1485023 :     ReorderBufferSerializeReserve(rb, sz);
    3749 ECB             : 
    3750 CBC     1485023 :     ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    3751         1485023 :     memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
    3752 ECB             : 
    3753 GIC     1485023 :     switch (change->action)
    3754 ECB             :     {
    3755                 :             /* fall through these, they're all similar enough */
    3756 GIC     1467750 :         case REORDER_BUFFER_CHANGE_INSERT:
    3757                 :         case REORDER_BUFFER_CHANGE_UPDATE:
    3758 ECB             :         case REORDER_BUFFER_CHANGE_DELETE:
    3759                 :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
    3760                 :             {
    3761                 :                 char       *data;
    3762                 :                 ReorderBufferTupleBuf *oldtup,
    3763                 :                            *newtup;
    3764 GIC     1467750 :                 Size        oldlen = 0;
    3765         1467750 :                 Size        newlen = 0;
    3766                 : 
    3767 CBC     1467750 :                 oldtup = change->data.tp.oldtuple;
    3768 GIC     1467750 :                 newtup = change->data.tp.newtuple;
    3769                 : 
    3770         1467750 :                 if (oldtup)
    3771 ECB             :                 {
    3772 GIC      160127 :                     sz += sizeof(HeapTupleData);
    3773          160127 :                     oldlen = oldtup->tuple.t_len;
    3774          160127 :                     sz += oldlen;
    3775                 :                 }
    3776                 : 
    3777 CBC     1467750 :                 if (newtup)
    3778 ECB             :                 {
    3779 GIC     1253921 :                     sz += sizeof(HeapTupleData);
    3780         1253921 :                     newlen = newtup->tuple.t_len;
    3781         1253921 :                     sz += newlen;
    3782 ECB             :                 }
    3783                 : 
    3784                 :                 /* make sure we have enough space */
    3785 CBC     1467750 :                 ReorderBufferSerializeReserve(rb, sz);
    3786                 : 
    3787 GIC     1467750 :                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
    3788                 :                 /* might have been reallocated above */
    3789         1467750 :                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    3790                 : 
    3791 CBC     1467750 :                 if (oldlen)
    3792                 :                 {
    3793 GIC      160127 :                     memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
    3794          160127 :                     data += sizeof(HeapTupleData);
    3795 ECB             : 
    3796 GIC      160127 :                     memcpy(data, oldtup->tuple.t_data, oldlen);
    3797          160127 :                     data += oldlen;
    3798 ECB             :                 }
    3799 EUB             : 
    3800 GIC     1467750 :                 if (newlen)
    3801                 :                 {
    3802         1253921 :                     memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
    3803         1253921 :                     data += sizeof(HeapTupleData);
    3804 ECB             : 
    3805 CBC     1253921 :                     memcpy(data, newtup->tuple.t_data, newlen);
    3806         1253921 :                     data += newlen;
    3807                 :                 }
    3808         1467750 :                 break;
    3809                 :             }
    3810 GIC          19 :         case REORDER_BUFFER_CHANGE_MESSAGE:
    3811                 :             {
    3812 ECB             :                 char       *data;
    3813 GIC          19 :                 Size        prefix_size = strlen(change->data.msg.prefix) + 1;
    3814 ECB             : 
    3815 CBC          19 :                 sz += prefix_size + change->data.msg.message_size +
    3816                 :                     sizeof(Size) + sizeof(Size);
    3817 GIC          19 :                 ReorderBufferSerializeReserve(rb, sz);
    3818 ECB             : 
    3819 GIC          19 :                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
    3820                 : 
    3821 ECB             :                 /* might have been reallocated above */
    3822 GIC          19 :                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    3823                 : 
    3824 ECB             :                 /* write the prefix including the size */
    3825 CBC          19 :                 memcpy(data, &prefix_size, sizeof(Size));
    3826              19 :                 data += sizeof(Size);
    3827              19 :                 memcpy(data, change->data.msg.prefix,
    3828                 :                        prefix_size);
    3829              19 :                 data += prefix_size;
    3830 ECB             : 
    3831                 :                 /* write the message including the size */
    3832 GIC          19 :                 memcpy(data, &change->data.msg.message_size, sizeof(Size));
    3833              19 :                 data += sizeof(Size);
    3834              19 :                 memcpy(data, change->data.msg.message,
    3835                 :                        change->data.msg.message_size);
    3836              19 :                 data += change->data.msg.message_size;
    3837 ECB             : 
    3838 GIC          19 :                 break;
    3839                 :             }
    3840             117 :         case REORDER_BUFFER_CHANGE_INVALIDATION:
    3841 ECB             :             {
    3842                 :                 char       *data;
    3843 CBC         117 :                 Size        inval_size = sizeof(SharedInvalidationMessage) *
    3844 GIC         117 :                 change->data.inval.ninvalidations;
    3845 ECB             : 
    3846 CBC         117 :                 sz += inval_size;
    3847                 : 
    3848             117 :                 ReorderBufferSerializeReserve(rb, sz);
    3849 GIC         117 :                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
    3850                 : 
    3851 ECB             :                 /* might have been reallocated above */
    3852 GIC         117 :                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    3853             117 :                 memcpy(data, change->data.inval.invalidations, inval_size);
    3854             117 :                 data += inval_size;
    3855                 : 
    3856             117 :                 break;
    3857                 :             }
    3858               2 :         case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
    3859 ECB             :             {
    3860                 :                 Snapshot    snap;
    3861                 :                 char       *data;
    3862                 : 
    3863 CBC           2 :                 snap = change->data.snapshot;
    3864                 : 
    3865               2 :                 sz += sizeof(SnapshotData) +
    3866 GIC           2 :                     sizeof(TransactionId) * snap->xcnt +
    3867 CBC           2 :                     sizeof(TransactionId) * snap->subxcnt;
    3868 ECB             : 
    3869                 :                 /* make sure we have enough space */
    3870 GIC           2 :                 ReorderBufferSerializeReserve(rb, sz);
    3871               2 :                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
    3872 ECB             :                 /* might have been reallocated above */
    3873 GIC           2 :                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    3874 ECB             : 
    3875 CBC           2 :                 memcpy(data, snap, sizeof(SnapshotData));
    3876               2 :                 data += sizeof(SnapshotData);
    3877                 : 
    3878 GIC           2 :                 if (snap->xcnt)
    3879                 :                 {
    3880 CBC           2 :                     memcpy(data, snap->xip,
    3881 GIC           2 :                            sizeof(TransactionId) * snap->xcnt);
    3882 CBC           2 :                     data += sizeof(TransactionId) * snap->xcnt;
    3883                 :                 }
    3884 ECB             : 
    3885 GIC           2 :                 if (snap->subxcnt)
    3886 ECB             :                 {
    3887 UIC           0 :                     memcpy(data, snap->subxip,
    3888 LBC           0 :                            sizeof(TransactionId) * snap->subxcnt);
    3889               0 :                     data += sizeof(TransactionId) * snap->subxcnt;
    3890                 :                 }
    3891 CBC           2 :                 break;
    3892 ECB             :             }
    3893 UIC           0 :         case REORDER_BUFFER_CHANGE_TRUNCATE:
    3894                 :             {
    3895 ECB             :                 Size        size;
    3896                 :                 char       *data;
    3897                 : 
    3898                 :                 /* account for the OIDs of truncated relations */
    3899 UIC           0 :                 size = sizeof(Oid) * change->data.truncate.nrelids;
    3900 LBC           0 :                 sz += size;
    3901 ECB             : 
    3902                 :                 /* make sure we have enough space */
    3903 LBC           0 :                 ReorderBufferSerializeReserve(rb, sz);
    3904                 : 
    3905               0 :                 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
    3906                 :                 /* might have been reallocated above */
    3907 UIC           0 :                 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    3908 ECB             : 
    3909 UIC           0 :                 memcpy(data, change->data.truncate.relids, size);
    3910 LBC           0 :                 data += size;
    3911                 : 
    3912               0 :                 break;
    3913                 :             }
    3914 CBC       17135 :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
    3915                 :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
    3916                 :         case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
    3917 ECB             :         case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
    3918                 :             /* ReorderBufferChange contains everything important */
    3919 GIC       17135 :             break;
    3920 ECB             :     }
    3921                 : 
    3922 CBC     1485023 :     ondisk->size = sz;
    3923                 : 
    3924         1485023 :     errno = 0;
    3925 GIC     1485023 :     pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
    3926         1485023 :     if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
    3927 ECB             :     {
    3928 LBC           0 :         int         save_errno = errno;
    3929 ECB             : 
    3930 UIC           0 :         CloseTransientFile(fd);
    3931 ECB             : 
    3932                 :         /* if write didn't set errno, assume problem is no disk space */
    3933 LBC           0 :         errno = save_errno ? save_errno : ENOSPC;
    3934 UIC           0 :         ereport(ERROR,
    3935 ECB             :                 (errcode_for_file_access(),
    3936                 :                  errmsg("could not write to data file for XID %u: %m",
    3937                 :                         txn->xid)));
    3938                 :     }
    3939 CBC     1485023 :     pgstat_report_wait_end();
    3940                 : 
    3941 ECB             :     /*
    3942                 :      * Keep the transaction's final_lsn up to date with each change we send to
    3943                 :      * disk, so that ReorderBufferRestoreCleanup works correctly.  (We used to
    3944                 :      * only do this on commit and abort records, but that doesn't work if a
    3945                 :      * system crash leaves a transaction without its abort record).
    3946                 :      *
    3947                 :      * Make sure not to move it backwards.
    3948                 :      */
    3949 CBC     1485023 :     if (txn->final_lsn < change->lsn)
    3950 GIC     1480548 :         txn->final_lsn = change->lsn;
    3951 ECB             : 
    3952 GIC     1485023 :     Assert(ondisk->change.action == change->action);
    3953 CBC     1485023 : }
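For a `REORDER_BUFFER_CHANGE_MESSAGE` change, the payload appended after the `ReorderBufferDiskChange` header is two length-prefixed byte strings. The sketch below writes that layout and reads it back. It assumes `Size` is `size_t` (PostgreSQL's typedef); the `toy_*` functions are hypothetical, and `toy_deserialize_message()` is this sketch's own inverse, not a copy of the restore code.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Payload layout for a MESSAGE change, as written above:
 *   [size_t prefix_size][prefix, NUL included][size_t message_size][message]
 */
static size_t
toy_message_payload_size(const char *prefix, size_t message_size)
{
    size_t      prefix_size = strlen(prefix) + 1;   /* count the NUL too */

    return prefix_size + message_size + sizeof(size_t) + sizeof(size_t);
}

/* Write the payload into buf (caller reserved enough space); return bytes written. */
static size_t
toy_serialize_message(char *buf, const char *prefix,
                      const char *message, size_t message_size)
{
    char       *data = buf;
    size_t      prefix_size = strlen(prefix) + 1;

    /* the prefix, preceded by its size */
    memcpy(data, &prefix_size, sizeof(size_t));
    data += sizeof(size_t);
    memcpy(data, prefix, prefix_size);
    data += prefix_size;

    /* the message, preceded by its size; it need not be NUL-terminated */
    memcpy(data, &message_size, sizeof(size_t));
    data += sizeof(size_t);
    memcpy(data, message, message_size);
    data += message_size;

    assert((size_t) (data - buf) == toy_message_payload_size(prefix, message_size));
    return (size_t) (data - buf);
}

/* Inverse of the above; the returned pointers point into buf. */
static void
toy_deserialize_message(const char *buf, const char **prefix,
                        const char **message, size_t *message_size)
{
    size_t      prefix_size;

    memcpy(&prefix_size, buf, sizeof(size_t));
    buf += sizeof(size_t);
    *prefix = buf;
    buf += prefix_size;

    memcpy(message_size, buf, sizeof(size_t));
    buf += sizeof(size_t);
    *message = buf;
}
```

Copying the length fields with `memcpy` rather than through a cast pointer avoids unaligned reads, since the prefix length shifts everything after it.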
    3954                 : 
    3955                 : /* Returns true if the output plugin supports streaming, false otherwise. */
    3956                 : static inline bool
    3957 GIC     2205003 : ReorderBufferCanStream(ReorderBuffer *rb)
    3958 ECB             : {
    3959 GIC     2205003 :     LogicalDecodingContext *ctx = rb->private_data;
    3960 ECB             : 
    3961 CBC     2205003 :     return ctx->streaming;
    3962 ECB             : }
    3963                 : 
    3964                 : /* Returns true if streaming can be started now, false otherwise. */
    3965                 : static inline bool
    3966 CBC      491348 : ReorderBufferCanStartStreaming(ReorderBuffer *rb)
    3967                 : {
    3968          491348 :     LogicalDecodingContext *ctx = rb->private_data;
    3969 GIC      491348 :     SnapBuild  *builder = ctx->snapshot_builder;
    3970 ECB             : 
    3971                 :     /* We can't start streaming unless a consistent state is reached. */
    3972 GIC      491348 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
    3973 LBC           0 :         return false;
    3974                 : 
    3975 ECB             :     /*
    3976                 :      * Even if streaming is enabled, we can't start streaming immediately
    3977                 :      * when we previously decoded this transaction and are now just
    3978                 :      * restarting.
    3979                 :      */
    3980 CBC      491348 :     if (ReorderBufferCanStream(rb) &&
    3981 GNC      488688 :         !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
    3982 GBC      158469 :         return true;
    3983 EUB             : 
    3984 GBC      332879 :     return false;
    3985                 : }
    3986 ECB             : 
    3987                 : /*
    3988 EUB             :  * Send data of a large transaction (and its subtransactions) to the
    3989                 :  * output plugin, but using the stream API.
    3990                 :  */
    3991                 : static void
    3992 GIC         660 : ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
    3993                 : {
    3994 EUB             :     Snapshot    snapshot_now;
    3995                 :     CommandId   command_id;
    3996                 :     Size        stream_bytes;
    3997                 :     bool        txn_is_streamed;
    3998                 : 
    3999                 :     /* We can never reach here for a subtransaction. */
    4000 GNC         660 :     Assert(rbtxn_is_toptxn(txn));
    4001                 : 
    4002 EUB             :     /*
    4003                 :      * We can't make any assumptions about base snapshot here, similar to what
    4004                 :      * ReorderBufferCommit() does. That relies on base_snapshot getting
    4005                 :      * transferred from subxact in ReorderBufferCommitChild(), but that was
    4006                 :      * not yet called as the transaction is in-progress.
    4007                 :      *
    4008                 :      * So just walk the subxacts and use the same logic here. But we only need
    4009 ECB             :      * to do that once, when the transaction is streamed for the first time.
    4010                 :      * After that we need to reuse the snapshot from the previous run.
    4011                 :      *
    4012                 :      * Unlike DecodeCommit which adds xids of all the subtransactions in
    4013                 :      * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here
    4014                 :      * but we do add them to subxip array instead via ReorderBufferCopySnap.
    4015                 :      * This allows the catalog changes made in subtransactions decoded till
    4016                 :      * now to be visible.
    4017                 :      */
    4018 GIC         660 :     if (txn->snapshot_now == NULL)
    4019 ECB             :     {
    4020                 :         dlist_iter  subxact_i;
    4021                 : 
    4022                 :         /* make sure this transaction is streamed for the first time */
    4023 GBC          66 :         Assert(!rbtxn_is_streamed(txn));
    4024                 : 
    4025 EUB             :         /* at the beginning we should have invalid command ID */
    4026 GIC          66 :         Assert(txn->command_id == InvalidCommandId);
    4027                 : 
    4028 GBC          70 :         dlist_foreach(subxact_i, &txn->subtxns)
    4029 EUB             :         {
    4030                 :             ReorderBufferTXN *subtxn;
    4031                 : 
    4032 GIC           4 :             subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
    4033               4 :             ReorderBufferTransferSnapToParent(txn, subtxn);
    4034 ECB             :         }
    4035                 : 
    4036                 :         /*
    4037                 :          * If this transaction has no snapshot, it didn't make any changes to
    4038                 :          * the database till now, so there's nothing to decode.
    4039                 :          */
    4040 GIC          66 :         if (txn->base_snapshot == NULL)
    4041                 :         {
    4042 UIC           0 :             Assert(txn->ninvalidations == 0);
    4043               0 :             return;
    4044 ECB             :         }
    4045                 : 
    4046 GIC          66 :         command_id = FirstCommandId;
    4047 CBC          66 :         snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
    4048 ECB             :                                              txn, command_id);
    4049                 :     }
    4050                 :     else
    4051                 :     {
    4052                 :         /* the transaction must have been already streamed */
    4053 GIC         594 :         Assert(rbtxn_is_streamed(txn));
    4054 ECB             : 
    4055                 :         /*
    4056                 :          * We already have a snapshot from the previous streaming run. We
    4057                 :          * assume new subxacts can't move the LSN backwards, and so can't beat
    4058                 :          * the LSN condition in the previous branch (so no need to walk
    4059                 :          * through subxacts again). In fact, we must not do that as we may be
    4060                 :          * using snapshot half-way through the subxact.
    4061                 :          */
    4062 GIC         594 :         command_id = txn->command_id;
    4063 ECB             : 
    4064                 :         /*
    4065                 :          * We can't use txn->snapshot_now directly because after the last
    4066                 :          * streaming run, we might have got some new sub-transactions. So we
    4067                 :          * need to add them to the snapshot.
    4068 EUB             :          */
    4069 GIC         594 :         snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
    4070                 :                                              txn, command_id);
    4071                 : 
    4072                 :         /* Free the previously copied snapshot. */
    4073             594 :         Assert(txn->snapshot_now->copied);
    4074             594 :         ReorderBufferFreeSnap(rb, txn->snapshot_now);
    4075 CBC         594 :         txn->snapshot_now = NULL;
    4076 ECB             :     }
    4077                 : 
    4078                 :     /*
    4079                 :      * Remember this information to be used later to update stats. We can't
    4080                 :      * update the stats here as an error while processing the changes would
    4081                 :      * lead to the accumulation of stats even though we haven't streamed all
    4082                 :      * the changes.
    4083                 :      */
    4084 GIC         660 :     txn_is_streamed = rbtxn_is_streamed(txn);
    4085             660 :     stream_bytes = txn->total_size;
    4086                 : 
    4087 ECB             :     /* Process and send the changes to output plugin. */
    4088 GIC         660 :     ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
    4089                 :                             command_id, true);
    4090                 : 
    4091             660 :     rb->streamCount += 1;
    4092             660 :     rb->streamBytes += stream_bytes;
    4093                 : 
    4094                 :     /* Don't consider already streamed transaction. */
    4095 CBC         660 :     rb->streamTxns += (txn_is_streamed) ? 0 : 1;
    4096                 : 
    4097                 :     /* update the decoding stats */
    4098 GIC         660 :     UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
    4099                 : 
    4100             660 :     Assert(dlist_is_empty(&txn->changes));
    4101             660 :     Assert(txn->nentries == 0);
    4102             660 :     Assert(txn->nentries_mem == 0);
    4103                 : }
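The statistics bookkeeping in `ReorderBufferStreamTXN()` reads `rbtxn_is_streamed(txn)` and `txn->total_size` before processing and updates the counters only afterwards. A minimal sketch with hypothetical types and functions, assuming (as the closing assertions suggest) that processing marks the transaction as streamed and frees its in-memory changes:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef struct ToyStreamTxn
{
    size_t      total_size;     /* like txn->total_size, incl. subxacts */
    bool        streamed;       /* like rbtxn_is_streamed(txn) */
} ToyStreamTxn;

typedef struct ToyStreamStats
{
    int64_t     streamTxns;     /* distinct transactions streamed */
    int64_t     streamCount;    /* number of streaming runs */
    int64_t     streamBytes;    /* bytes streamed, summed over runs */
} ToyStreamStats;

/* Hypothetical stand-in for ReorderBufferProcessTXN in streaming mode. */
static void
toy_process_txn(ToyStreamTxn *txn)
{
    txn->streamed = true;       /* ASSUMED: processing sets the flag */
    txn->total_size = 0;        /* and frees the changes it sent */
}

static void
toy_stream_txn(ToyStreamStats *stats, ToyStreamTxn *txn)
{
    /* capture before processing, which changes both values */
    bool        was_streamed = txn->streamed;
    size_t      stream_bytes = txn->total_size;

    toy_process_txn(txn);

    /* only now, after processing finished, update the counters */
    stats->streamCount += 1;
    stats->streamBytes += (int64_t) stream_bytes;
    /* count each transaction only the first time it is streamed */
    stats->streamTxns += was_streamed ? 0 : 1;

    assert(txn->total_size == 0);
}
```

Streaming transaction A twice and B once yields three streaming runs but only two streamed transactions, and the byte counter is the sum of the sizes captured at each run.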
    4104                 : 
    4105                 : /*
    4106                 :  * Size of a change in memory.
    4107                 :  */
    4108                 : static Size
    4109         3807299 : ReorderBufferChangeSize(ReorderBufferChange *change)
    4110                 : {
    4111         3807299 :     Size        sz = sizeof(ReorderBufferChange);
    4112                 : 
    4113 CBC     3807299 :     switch (change->action)
    4114                 :     {
    4115                 :             /* fall through these, they're all similar enough */
    4116 GIC     3688274 :         case REORDER_BUFFER_CHANGE_INSERT:
    4117                 :         case REORDER_BUFFER_CHANGE_UPDATE:
    4118 ECB             :         case REORDER_BUFFER_CHANGE_DELETE:
    4119                 :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
    4120                 :             {
    4121                 :                 ReorderBufferTupleBuf *oldtup,
    4122                 :                            *newtup;
    4123 CBC     3688274 :                 Size        oldlen = 0;
    4124 GIC     3688274 :                 Size        newlen = 0;
    4125                 : 
    4126         3688274 :                 oldtup = change->data.tp.oldtuple;
    4127 CBC     3688274 :                 newtup = change->data.tp.newtuple;
    4128 ECB             : 
    4129 GIC     3688274 :                 if (oldtup)
    4130                 :                 {
    4131          421844 :                     sz += sizeof(HeapTupleData);
    4132          421844 :                     oldlen = oldtup->tuple.t_len;
    4133          421844 :                     sz += oldlen;
    4134                 :                 }
    4135 ECB             : 
    4136 GIC     3688274 :                 if (newtup)
    4137 EUB             :                 {
    4138 GBC     3129918 :                     sz += sizeof(HeapTupleData);
    4139 GIC     3129918 :                     newlen = newtup->tuple.t_len;
    4140         3129918 :                     sz += newlen;
    4141 ECB             :                 }
    4142                 : 
    4143 GIC     3688274 :                 break;
    4144                 :             }
    4145              78 :         case REORDER_BUFFER_CHANGE_MESSAGE:
    4146                 :             {
    4147              78 :                 Size        prefix_size = strlen(change->data.msg.prefix) + 1;
    4148 ECB             : 
    4149 GIC          78 :                 sz += prefix_size + change->data.msg.message_size +
    4150                 :                     sizeof(Size) + sizeof(Size);
    4151                 : 
    4152              78 :                 break;
    4153                 :             }
    4154            9115 :         case REORDER_BUFFER_CHANGE_INVALIDATION:
    4155                 :             {
    4156            9115 :                 sz += sizeof(SharedInvalidationMessage) *
    4157 CBC        9115 :                     change->data.inval.ninvalidations;
    4158 GIC        9115 :                 break;
    4159                 :             }
    4160            2002 :         case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
    4161                 :             {
    4162                 :                 Snapshot    snap;
    4163                 : 
    4164 CBC        2002 :                 snap = change->data.snapshot;
    4165                 : 
    4166 GIC        2002 :                 sz += sizeof(SnapshotData) +
    4167            2002 :                     sizeof(TransactionId) * snap->xcnt +
    4168 CBC        2002 :                     sizeof(TransactionId) * snap->subxcnt;
    4169 ECB             : 
    4170 CBC        2002 :                 break;
    4171                 :             }
    4172 GIC          74 :         case REORDER_BUFFER_CHANGE_TRUNCATE:
    4173                 :             {
    4174              74 :                 sz += sizeof(Oid) * change->data.truncate.nrelids;
    4175                 : 
    4176              74 :                 break;
    4177                 :             }
    4178          107756 :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
    4179 ECB             :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
    4180                 :         case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
    4181                 :         case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
    4182                 :             /* ReorderBufferChange contains everything important */
    4183 CBC      107756 :             break;
    4184                 :     }
    4185                 : 
    4186         3807299 :     return sz;
    4187 ECB             : }
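ReorderBufferChangeSize() charges each change for the base struct plus the variable-length payload that hangs off it. The sketch below mirrors that arithmetic for two cases, tuple changes and logical messages. It assumes the running total starts at the size of the base change struct. `DemoTuple`, `DemoChange` and the helper names are hypothetical stand-ins, not PostgreSQL's ReorderBufferChange or HeapTupleData.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for HeapTupleData: header plus a data pointer. */
typedef struct DemoTuple
{
    unsigned int t_len;         /* length of the tuple data in bytes */
    void       *t_data;         /* points at the tuple data */
} DemoTuple;

/* Hypothetical stand-in for ReorderBufferChange (tuple part only). */
typedef struct DemoChange
{
    int         action;
    DemoTuple  *oldtuple;       /* NULL if no old tuple was logged */
    DemoTuple  *newtuple;       /* NULL e.g. for a DELETE */
} DemoChange;

/* Tuple change: base struct, plus header and data for each tuple present. */
static size_t
demo_tuple_change_size(const DemoChange *change)
{
    size_t      sz = sizeof(DemoChange);

    if (change->oldtuple)
        sz += sizeof(DemoTuple) + change->oldtuple->t_len;
    if (change->newtuple)
        sz += sizeof(DemoTuple) + change->newtuple->t_len;
    return sz;
}

/*
 * Logical message: the prefix including its NUL terminator, the payload,
 * and two size_t length words (the "sizeof(Size) + sizeof(Size)" term).
 */
static size_t
demo_message_change_size(const char *prefix, size_t message_size)
{
    return sizeof(DemoChange) + strlen(prefix) + 1 + message_size +
        sizeof(size_t) + sizeof(size_t);
}
```

An UPDATE that logged both tuples is charged for two tuple headers. An INSERT carrying only a new tuple is charged for one.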
    4188                 : 
    4189                 : 
    4190                 : /*
    4191                 :  * Restore a number of changes spilled to disk back into memory.
    4192                 :  */
    4193                 : static Size
    4194 GIC         102 : ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
    4195 ECB             :                             TXNEntryFile *file, XLogSegNo *segno)
    4196                 : {
    4197 CBC         102 :     Size        restored = 0;
    4198                 :     XLogSegNo   last_segno;
    4199                 :     dlist_mutable_iter cleanup_iter;
    4200 GIC         102 :     File       *fd = &file->vfd;
    4201                 : 
    4202             102 :     Assert(txn->first_lsn != InvalidXLogRecPtr);
    4203             102 :     Assert(txn->final_lsn != InvalidXLogRecPtr);
    4204 ECB             : 
    4205                 :     /* free current entries, so we have memory for more */
    4206 CBC      175059 :     dlist_foreach_modify(cleanup_iter, &txn->changes)
    4207                 :     {
    4208          174957 :         ReorderBufferChange *cleanup =
    4209 GIC      174957 :         dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
    4210                 : 
    4211 CBC      174957 :         dlist_delete(&cleanup->node);
    4212 GIC      174957 :         ReorderBufferReturnChange(rb, cleanup, true);
    4213                 :     }
    4214             102 :     txn->nentries_mem = 0;
    4215             102 :     Assert(dlist_is_empty(&txn->changes));
    4216                 : 
    4217             102 :     XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
    4218 ECB             : 
    4219 CBC      178616 :     while (restored < max_changes_in_memory && *segno <= last_segno)
    4220                 :     {
    4221 ECB             :         int         readBytes;
    4222                 :         ReorderBufferDiskChange *ondisk;
    4223                 : 
    4224 CBC      178514 :         CHECK_FOR_INTERRUPTS();
    4225                 : 
    4226          178514 :         if (*fd == -1)
    4227 ECB             :         {
    4228                 :             char        path[MAXPGPATH];
    4229                 : 
    4230                 :             /* first time in */
    4231 CBC          38 :             if (*segno == 0)
    4232 GIC          37 :                 XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
    4233 ECB             : 
    4234 CBC          38 :             Assert(*segno != 0 || dlist_is_empty(&txn->changes));
    4235 ECB             : 
    4236                 :             /*
    4237                 :              * No need to care about TLIs here, only used during a single run,
    4238                 :              * so each LSN only maps to a specific WAL record.
    4239                 :              */
    4240 CBC          38 :             ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
    4241                 :                                         *segno);
    4242 ECB             : 
    4243 GIC          38 :             *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
    4244 ECB             : 
    4245                 :             /* No harm in resetting the offset even in case of failure */
    4246 GIC          38 :             file->curOffset = 0;
    4247 ECB             : 
    4248 GIC          38 :             if (*fd < 0 && errno == ENOENT)
    4249 ECB             :             {
    4250 UIC           0 :                 *fd = -1;
    4251 LBC           0 :                 (*segno)++;
    4252               0 :                 continue;
    4253 ECB             :             }
    4254 GIC          38 :             else if (*fd < 0)
    4255 LBC           0 :                 ereport(ERROR,
    4256                 :                         (errcode_for_file_access(),
    4257                 :                          errmsg("could not open file \"%s\": %m",
    4258                 :                                 path)));
    4259 ECB             :         }
    4260                 : 
    4261                 :         /*
    4262                 :          * Read the statically sized part of a change which has information
    4263                 :          * about the total size. If we couldn't read a record, we're at the
    4264                 :          * end of this file.
    4265                 :          */
    4266 GIC      178514 :         ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
    4267 CBC      178514 :         readBytes = FileRead(file->vfd, rb->outbuf,
    4268                 :                              sizeof(ReorderBufferDiskChange),
    4269 ECB             :                              file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
    4270                 : 
    4271                 :         /* eof */
    4272 GIC      178514 :         if (readBytes == 0)
    4273 ECB             :         {
    4274 GIC          38 :             FileClose(*fd);
    4275              38 :             *fd = -1;
    4276              38 :             (*segno)++;
    4277              38 :             continue;
    4278 ECB             :         }
    4279 GIC      178476 :         else if (readBytes < 0)
    4280 UIC           0 :             ereport(ERROR,
    4281 ECB             :                     (errcode_for_file_access(),
    4282                 :                      errmsg("could not read from reorderbuffer spill file: %m")));
    4283 GIC      178476 :         else if (readBytes != sizeof(ReorderBufferDiskChange))
    4284 UIC           0 :             ereport(ERROR,
    4285                 :                     (errcode_for_file_access(),
    4286                 :                      errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
    4287                 :                             readBytes,
    4288                 :                             (uint32) sizeof(ReorderBufferDiskChange))));
    4289 ECB             : 
    4290 GIC      178476 :         file->curOffset += readBytes;
    4291                 : 
    4292 CBC      178476 :         ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    4293                 : 
    4294 GIC      178476 :         ReorderBufferSerializeReserve(rb,
    4295 CBC      178476 :                                       sizeof(ReorderBufferDiskChange) + ondisk->size);
    4296 GIC      178476 :         ondisk = (ReorderBufferDiskChange *) rb->outbuf;
    4297 ECB             : 
    4298 CBC      178476 :         readBytes = FileRead(file->vfd,
    4299 GIC      178476 :                              rb->outbuf + sizeof(ReorderBufferDiskChange),
    4300          178476 :                              ondisk->size - sizeof(ReorderBufferDiskChange),
    4301 ECB             :                              file->curOffset,
    4302                 :                              WAIT_EVENT_REORDER_BUFFER_READ);
    4303                 : 
    4304 CBC      178476 :         if (readBytes < 0)
    4305 UIC           0 :             ereport(ERROR,
    4306 ECB             :                     (errcode_for_file_access(),
    4307                 :                      errmsg("could not read from reorderbuffer spill file: %m")));
    4308 GIC      178476 :         else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
    4309 LBC           0 :             ereport(ERROR,
    4310 ECB             :                     (errcode_for_file_access(),
    4311                 :                      errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
    4312                 :                             readBytes,
    4313                 :                             (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
    4314                 : 
    4315 GIC      178476 :         file->curOffset += readBytes;
    4316                 : 
    4317                 :         /*
    4318                 :          * ok, read a full change from disk, now restore it into proper
    4319 ECB             :          * in-memory format
    4320                 :          */
    4321 CBC      178476 :         ReorderBufferRestoreChange(rb, txn, rb->outbuf);
    4322 GIC      178476 :         restored++;
    4323                 :     }
    4324                 : 
    4325             102 :     return restored;
    4326 ECB             : }
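ReorderBufferRestoreChanges() reads each spilled change in two steps. First it reads a fixed-size header whose `size` field gives the total record length, header included. Then it reads the remainder. A zero-byte read at the header position means the end of the current file, and any short read is an error. Below is a minimal sketch of that framing using POSIX `pread()`. The `DemoDiskHeader` layout and both helper names are assumptions for illustration, not the real ReorderBufferDiskChange format.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical on-disk header: total record size, header included. */
typedef struct DemoDiskHeader
{
    size_t      size;
} DemoDiskHeader;

/*
 * Read one record at *offset into buf (capacity cap), advancing *offset.
 * Returns 1 on success, 0 at clean EOF, -1 on error or short read.
 */
static int
demo_read_record(int fd, off_t *offset, char *buf, size_t cap)
{
    DemoDiskHeader hdr;
    ssize_t     n = pread(fd, &hdr, sizeof(hdr), *offset);

    if (n == 0)
        return 0;               /* end of this file */
    if (n != (ssize_t) sizeof(hdr) || hdr.size < sizeof(hdr) || hdr.size > cap)
        return -1;
    *offset += n;

    /* keep the header at the start of buf, then read the variable part */
    memcpy(buf, &hdr, sizeof(hdr));
    n = pread(fd, buf + sizeof(hdr), hdr.size - sizeof(hdr), *offset);
    if (n < 0 || (size_t) n != hdr.size - sizeof(hdr))
        return -1;
    *offset += n;
    return 1;
}

/* Append one record (header + payload) at the current file position. */
static int
demo_write_record(int fd, const char *payload, size_t len)
{
    DemoDiskHeader hdr = {sizeof(DemoDiskHeader) + len};

    if (write(fd, &hdr, sizeof(hdr)) != (ssize_t) sizeof(hdr))
        return -1;
    if (write(fd, payload, len) != (ssize_t) len)
        return -1;
    return 0;
}
```

Like the original, the sketch grows nothing until it knows the record size. The original additionally enlarges `rb->outbuf` with ReorderBufferSerializeReserve() before the second read.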
    4327                 : 
    4328                 : /*
    4329                 :  * Convert change from its on-disk format to in-memory format and queue it onto
    4330                 :  * the TXN's ->changes list.
    4331                 :  *
    4332                 :  * Note: although "data" is declared char*, at entry it points to a
    4333                 :  * maxalign'd buffer, making it safe in most of this function to assume
    4334                 :  * that the pointed-to data is suitably aligned for direct access.
    4335                 :  */
    4336                 : static void
    4337 GIC      178476 : ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
    4338 ECB             :                            char *data)
    4339                 : {
    4340                 :     ReorderBufferDiskChange *ondisk;
    4341                 :     ReorderBufferChange *change;
    4342                 : 
    4343 CBC      178476 :     ondisk = (ReorderBufferDiskChange *) data;
    4344                 : 
    4345 GBC      178476 :     change = ReorderBufferGetChange(rb);
    4346 EUB             : 
    4347                 :     /* copy static part */
    4348 GIC      178476 :     memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
    4349 ECB             : 
    4350 GBC      178476 :     data += sizeof(ReorderBufferDiskChange);
    4351                 : 
    4352                 :     /* restore individual stuff */
    4353 GIC      178476 :     switch (change->action)
    4354                 :     {
    4355                 :             /* fall through these, they're all similar enough */
    4356          176581 :         case REORDER_BUFFER_CHANGE_INSERT:
    4357                 :         case REORDER_BUFFER_CHANGE_UPDATE:
    4358                 :         case REORDER_BUFFER_CHANGE_DELETE:
    4359                 :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
    4360          176581 :             if (change->data.tp.oldtuple)
    4361 ECB             :             {
    4362 CBC        5006 :                 uint32      tuplelen = ((HeapTuple) data)->t_len;
    4363                 : 
    4364 GIC        5006 :                 change->data.tp.oldtuple =
    4365            5006 :                     ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
    4366                 : 
    4367 ECB             :                 /* restore ->tuple */
    4368 GIC        5006 :                 memcpy(&change->data.tp.oldtuple->tuple, data,
    4369 ECB             :                        sizeof(HeapTupleData));
    4370 CBC        5006 :                 data += sizeof(HeapTupleData);
    4371 ECB             : 
    4372                 :                 /* reset t_data pointer into the new tuplebuf */
    4373 GIC        5006 :                 change->data.tp.oldtuple->tuple.t_data =
    4374 CBC        5006 :                     ReorderBufferTupleBufData(change->data.tp.oldtuple);
    4375 EUB             : 
    4376                 :                 /* restore tuple data itself */
    4377 GIC        5006 :                 memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
    4378 CBC        5006 :                 data += tuplelen;
    4379 EUB             :             }
    4380                 : 
    4381 GIC      176581 :             if (change->data.tp.newtuple)
    4382                 :             {
    4383                 :                 /* here, data might not be suitably aligned! */
    4384                 :                 uint32      tuplelen;
    4385 ECB             : 
    4386 GIC      166361 :                 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
    4387 ECB             :                        sizeof(uint32));
    4388                 : 
    4389 CBC      166361 :                 change->data.tp.newtuple =
    4390          166361 :                     ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
    4391 ECB             : 
    4392                 :                 /* restore ->tuple */
    4393 CBC      166361 :                 memcpy(&change->data.tp.newtuple->tuple, data,
    4394 ECB             :                        sizeof(HeapTupleData));
    4395 CBC      166361 :                 data += sizeof(HeapTupleData);
    4396                 : 
    4397                 :                 /* reset t_data pointer into the new tuplebuf */
    4398 GIC      166361 :                 change->data.tp.newtuple->tuple.t_data =
    4399 CBC      166361 :                     ReorderBufferTupleBufData(change->data.tp.newtuple);
    4400 EUB             : 
    4401                 :                 /* restore tuple data itself */
    4402 GIC      166361 :                 memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
    4403 CBC      166361 :                 data += tuplelen;
    4404 EUB             :             }
    4405                 : 
    4406 GIC      176581 :             break;
    4407               1 :         case REORDER_BUFFER_CHANGE_MESSAGE:
    4408                 :             {
    4409                 :                 Size        prefix_size;
    4410 ECB             : 
    4411                 :                 /* read prefix */
    4412 GIC           1 :                 memcpy(&prefix_size, data, sizeof(Size));
    4413               1 :                 data += sizeof(Size);
    4414               1 :                 change->data.msg.prefix = MemoryContextAlloc(rb->context,
    4415                 :                                                              prefix_size);
    4416 CBC           1 :                 memcpy(change->data.msg.prefix, data, prefix_size);
    4417               1 :                 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
    4418 GIC           1 :                 data += prefix_size;
    4419                 : 
    4420 ECB             :                 /* read the message */
    4421 GIC           1 :                 memcpy(&change->data.msg.message_size, data, sizeof(Size));
    4422               1 :                 data += sizeof(Size);
    4423               1 :                 change->data.msg.message = MemoryContextAlloc(rb->context,
    4424                 :                                                               change->data.msg.message_size);
    4425               1 :                 memcpy(change->data.msg.message, data,
    4426                 :                        change->data.msg.message_size);
    4427               1 :                 data += change->data.msg.message_size;
    4428                 : 
    4429               1 :                 break;
    4430                 :             }
    4431              19 :         case REORDER_BUFFER_CHANGE_INVALIDATION:
    4432 ECB             :             {
    4433 GIC          19 :                 Size        inval_size = sizeof(SharedInvalidationMessage) *
    4434              19 :                 change->data.inval.ninvalidations;
    4435                 : 
    4436              19 :                 change->data.inval.invalidations =
    4437              19 :                     MemoryContextAlloc(rb->context, inval_size);
    4438 ECB             : 
    4439                 :                 /* read the message */
    4440 CBC          19 :                 memcpy(change->data.inval.invalidations, data, inval_size);
    4441                 : 
    4442 GIC          19 :                 break;
    4443 ECB             :             }
    4444 GIC           2 :         case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
    4445 ECB             :             {
    4446                 :                 Snapshot    oldsnap;
    4447                 :                 Snapshot    newsnap;
    4448                 :                 Size        size;
    4449                 : 
    4450 GIC           2 :                 oldsnap = (Snapshot) data;
    4451 ECB             : 
    4452 GIC           2 :                 size = sizeof(SnapshotData) +
    4453               2 :                     sizeof(TransactionId) * oldsnap->xcnt +
    4454               2 :                     sizeof(TransactionId) * oldsnap->subxcnt;
    4455 ECB             : 
    4456 GIC           2 :                 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
    4457 ECB             : 
    4458 GIC           2 :                 newsnap = change->data.snapshot;
    4459 ECB             : 
    4460 CBC           2 :                 memcpy(newsnap, data, size);
    4461 GIC           2 :                 newsnap->xip = (TransactionId *)
    4462                 :                     (((char *) newsnap) + sizeof(SnapshotData));
    4463 CBC           2 :                 newsnap->subxip = newsnap->xip + newsnap->xcnt;
    4464 GIC           2 :                 newsnap->copied = true;
    4465 CBC           2 :                 break;
    4466                 :             }
    4467                 :             /* the relation OIDs are stored right after the base struct */
    4468 LBC           0 :         case REORDER_BUFFER_CHANGE_TRUNCATE:
    4469 ECB             :             {
    4470                 :                 Oid        *relids;
    4471                 : 
    4472 LBC           0 :                 relids = ReorderBufferGetRelids(rb,
    4473               0 :                                                 change->data.truncate.nrelids);
    4474 UIC           0 :                 memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
    4475               0 :                 change->data.truncate.relids = relids;
    4476 ECB             : 
    4477 UIC           0 :                 break;
    4478                 :             }
    4479 GIC        1873 :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
    4480                 :         case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
    4481 ECB             :         case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
    4482                 :         case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
    4483 GIC        1873 :             break;
    4484 ECB             :     }
    4485                 : 
    4486 GIC      178476 :     dlist_push_tail(&txn->changes, &change->node);
    4487          178476 :     txn->nentries_mem++;
    4488 ECB             : 
    4489                 :     /*
    4490                 :      * Update memory accounting for the restored change.  We need to do this
    4491                 :      * although we don't check the memory limit when restoring the changes in
    4492                 :      * this code path (we only do that when initially queueing the changes after
    4493                 :      * decoding), because we will release the changes later, and that will
    4494                 :      * update the accounting too (subtracting the size from the counters). And
    4495                 :      * we don't want to underflow there.
    4496                 :      */
    4497 CBC      178476 :     ReorderBufferChangeMemoryUpdate(rb, change, true,
    4498 ECB             :                                     ReorderBufferChangeSize(change));
    4499 GIC      178476 : }
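In the snapshot case, ReorderBufferRestoreChange() copies one flat block, the struct followed by its `xip` and `subxip` arrays. It then re-points the array fields into the new allocation, because pointers saved on disk refer to the memory of the process that wrote them. The sketch below shows that fix-up with a hypothetical `DemoSnapshot` type, not PostgreSQL's SnapshotData. It copies the header out with memcpy() first, so the source image need not be aligned.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t DemoXid;

/* Hypothetical flat snapshot: both arrays are stored right after the struct. */
typedef struct DemoSnapshot
{
    uint32_t    xcnt;
    uint32_t    subxcnt;
    DemoXid    *xip;            /* xcnt entries */
    DemoXid    *subxip;         /* subxcnt entries */
} DemoSnapshot;

static size_t
demo_snapshot_size(const DemoSnapshot *snap)
{
    return sizeof(DemoSnapshot) + sizeof(DemoXid) * (snap->xcnt + snap->subxcnt);
}

/*
 * Copy a flattened snapshot image into a fresh allocation and fix up the
 * embedded pointers so they refer to the copy, not to the image's origin.
 */
static DemoSnapshot *
demo_restore_snapshot(const char *image)
{
    DemoSnapshot hdr;
    DemoSnapshot *snap;
    size_t      size;

    memcpy(&hdr, image, sizeof(hdr));   /* no alignment assumption on image */
    size = demo_snapshot_size(&hdr);
    snap = malloc(size);
    if (snap == NULL)
        return NULL;
    memcpy(snap, image, size);
    snap->xip = (DemoXid *) ((char *) snap + sizeof(DemoSnapshot));
    snap->subxip = snap->xip + snap->xcnt;
    return snap;
}
```

The `subxip` array starts immediately after the `xcnt` entries of `xip`, matching the `newsnap->xip + newsnap->xcnt` expression above.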
    4500                 : 
    4501 ECB             : /*
    4502                 :  * Remove all on-disk stored for the passed in transaction.
    4503                 :  */
    4504                 : static void
    4505 GIC         298 : ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
    4506                 : {
    4507 ECB             :     XLogSegNo   first;
    4508                 :     XLogSegNo   cur;
    4509                 :     XLogSegNo   last;
    4510                 : 
    4511 CBC         298 :     Assert(txn->first_lsn != InvalidXLogRecPtr);
    4512             298 :     Assert(txn->final_lsn != InvalidXLogRecPtr);
    4513 ECB             : 
    4514 GIC         298 :     XLByteToSeg(txn->first_lsn, first, wal_segment_size);
    4515             298 :     XLByteToSeg(txn->final_lsn, last, wal_segment_size);
    4516 ECB             : 
    4517                 :     /* iterate over all possible filenames, and delete them */
    4518 CBC         600 :     for (cur = first; cur <= last; cur++)
    4519                 :     {
    4520 ECB             :         char        path[MAXPGPATH];
    4521                 : 
    4522 CBC         302 :         ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
    4523 GIC         302 :         if (unlink(path) != 0 && errno != ENOENT)
    4524 LBC           0 :             ereport(ERROR,
    4525                 :                     (errcode_for_file_access(),
    4526 ECB             :                      errmsg("could not remove file \"%s\": %m", path)));
    4527                 :     }
    4528 CBC         298 : }
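ReorderBufferRestoreCleanup() does not track which spill files actually exist. Instead, it tries to unlink every candidate name for the segments from `first_lsn` through `final_lsn` and tolerates ENOENT. The segment arithmetic behind XLByteToSeg() is an integer division of the LSN by the WAL segment size. The helpers below sketch that calculation. They are hypothetical, not the PostgreSQL macros.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t DemoLSN;
typedef uint64_t DemoSegNo;

/* Same arithmetic as XLByteToSeg(): which segment contains this LSN. */
static DemoSegNo
demo_lsn_to_seg(DemoLSN lsn, uint64_t seg_size)
{
    return lsn / seg_size;
}

/* How many spill-file names a cleanup loop from first_lsn to final_lsn visits. */
static uint64_t
demo_candidate_files(DemoLSN first_lsn, DemoLSN final_lsn, uint64_t seg_size)
{
    DemoSegNo   first = demo_lsn_to_seg(first_lsn, seg_size);
    DemoSegNo   last = demo_lsn_to_seg(final_lsn, seg_size);

    return last - first + 1;    /* inclusive range, as in "cur <= last" */
}
```

With 16 MB segments, a transaction spanning LSNs 0x1000010 through 0x3000000 touches segments 1 to 3, so three names are tried even if some were never written.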
    4529 ECB             : 
    4530                 : /*
    4531                 :  * Remove any leftover serialized reorder buffers from a slot directory after a
    4532                 :  * prior crash or decoding session exit.
    4533                 :  */
    4534                 : static void
    4535 CBC        1534 : ReorderBufferCleanupSerializedTXNs(const char *slotname)
    4536                 : {
    4537 ECB             :     DIR        *spill_dir;
    4538                 :     struct dirent *spill_de;
    4539                 :     struct stat statbuf;
    4540                 :     char        path[MAXPGPATH * 2 + 12];
    4541                 : 
    4542 GIC        1534 :     sprintf(path, "pg_replslot/%s", slotname);
    4543                 : 
    4544                 :     /* we're only handling directories here, skip if it's not ours */
    4545 CBC        1534 :     if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
    4546 UIC           0 :         return;
    4547 ECB             : 
    4548 CBC        1534 :     spill_dir = AllocateDir(path);
    4549            7670 :     while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
    4550                 :     {
    4551 ECB             :         /* only look at names that can be ours */
    4552 GIC        4602 :         if (strncmp(spill_de->d_name, "xid", 3) == 0)
    4553 ECB             :         {
    4554 UIC           0 :             snprintf(path, sizeof(path),
    4555 ECB             :                      "pg_replslot/%s/%s", slotname,
    4556 LBC           0 :                      spill_de->d_name);
    4557                 : 
    4558               0 :             if (unlink(path) != 0)
    4559               0 :                 ereport(ERROR,
    4560 ECB             :                         (errcode_for_file_access(),
    4561                 :                          errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
    4562                 :                                 path, slotname)));
    4563 EUB             :         }
    4564                 :     }
    4565 GIC        1534 :     FreeDir(spill_dir);
    4566                 : }
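ReorderBufferCleanupSerializedTXNs() scans one slot directory and unlinks every entry whose name starts with `xid`. A plain POSIX version of that loop uses opendir()/readdir() instead of PostgreSQL's AllocateDir()/ReadDirExtended(). In the sketch below, `demo_remove_spill_files` is a hypothetical name. Unlike the original, which raises an ERROR on a failed unlink, it returns -1 on failure and otherwise returns a count of removed files.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <dirent.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/*
 * Remove every entry named "xid*" in dir. Returns the number removed,
 * or -1 if the directory cannot be opened or an unlink fails.
 */
static int
demo_remove_spill_files(const char *dir)
{
    DIR        *d = opendir(dir);
    struct dirent *de;
    char        path[4096];
    int         removed = 0;

    if (d == NULL)
        return -1;
    while ((de = readdir(d)) != NULL)
    {
        /* only look at names that can be spill files */
        if (strncmp(de->d_name, "xid", 3) != 0)
            continue;
        snprintf(path, sizeof(path), "%s/%s", dir, de->d_name);
        if (unlink(path) != 0)
        {
            closedir(d);
            return -1;
        }
        removed++;
    }
    closedir(d);
    return removed;
}
```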
    4567 EUB             : 
    4568                 : /*
    4569                 :  * Given a replication slot, transaction ID and segment number, write the
    4570                 :  * path of the corresponding spill file into 'path', a caller-owned buffer
    4571                 :  * of at least MAXPGPATH bytes.
    4572                 :  */
    4573                 : static void
    4574 CBC        4096 : ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
    4575                 :                             XLogSegNo segno)
    4576                 : {
    4577                 :     XLogRecPtr  recptr;
    4578 ECB             : 
    4579 GIC        4096 :     XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
    4580                 : 
    4581 CBC        4096 :     snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
    4582            4096 :              NameStr(slot->data.name),
    4583 GIC        4096 :              xid, LSN_FORMAT_ARGS(recptr));
    4584            4096 : }
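ReorderBufferSerializedPath() names each spill file after three things: the slot, the transaction ID, and the starting LSN of the WAL segment. The LSN is printed as the two 32-bit halves that LSN_FORMAT_ARGS() produces. The sketch below reproduces that naming scheme in standalone C. The function name and its explicit `seg_size` parameter are assumptions for illustration.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Build "pg_replslot/<slot>/xid-<xid>-lsn-<hi>-<lo>.spill", where <hi> and
 * <lo> are the upper and lower 32 bits of the segment's start LSN in hex.
 */
static int
demo_spill_path(char *path, size_t pathlen, const char *slotname,
                uint32_t xid, uint64_t segno, uint64_t seg_size)
{
    uint64_t    recptr = segno * seg_size;  /* offset 0 within the segment */

    return snprintf(path, pathlen,
                    "pg_replslot/%s/xid-%" PRIu32 "-lsn-%" PRIX32 "-%" PRIX32 ".spill",
                    slotname, xid,
                    (uint32_t) (recptr >> 32), (uint32_t) recptr);
}
```

Segment 3 with 16 MB segments starts at LSN 0x3000000, so the high half prints as `0`.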
    4585                 : 
    4586                 : /*
    4587                 :  * Delete all data spilled to disk after we've restarted/crashed. It will be
    4588                 :  * recreated when the respective slots are reused.
    4589                 :  */
    4590                 : void
    4591            1176 : StartupReorderBuffer(void)
    4592 ECB             : {
    4593                 :     DIR        *logical_dir;
    4594                 :     struct dirent *logical_de;
    4595                 : 
    4596 GIC        1176 :     logical_dir = AllocateDir("pg_replslot");
    4597            3562 :     while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
    4598                 :     {
    4599            2386 :         if (strcmp(logical_de->d_name, ".") == 0 ||
    4600 CBC        1210 :             strcmp(logical_de->d_name, "..") == 0)
    4601 GIC        2352 :             continue;
    4602                 : 
    4603                 :         /* if it cannot be a slot, skip the directory */
    4604              34 :         if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
    4605 UIC           0 :             continue;
    4606 ECB             : 
    4607                 :         /*
    4608                 :          * ok, has to be a surviving logical slot, iterate and delete
    4609                 :          * everything starting with xid-*
    4610                 :          */
    4611 GIC          34 :         ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
    4612                 :     }
    4613 CBC        1176 :     FreeDir(logical_dir);
    4614 GIC        1176 : }
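StartupReorderBuffer() skips `.`, `..`, and any directory entry that ReplicationSlotValidateName() rejects before cleaning it. That validation accepts only non-empty names shorter than NAMEDATALEN that contain nothing but lowercase letters, digits, and underscores. The sketch below encodes that rule, assuming the default NAMEDATALEN of 64. It is a hypothetical helper, not the real function, which also reports the reason at a caller-chosen log level. Under this rule `.` and `..` would fail anyway. The explicit strcmp() checks presumably just skip them without emitting a DEBUG2 message.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define DEMO_NAMEDATALEN 64     /* assumed default NAMEDATALEN */

/* Hypothetical check: could this directory entry name be a replication slot? */
static bool
demo_could_be_slot(const char *name)
{
    size_t      len = strlen(name);

    if (len == 0 || len >= DEMO_NAMEDATALEN)
        return false;
    for (const char *cp = name; *cp; cp++)
    {
        if (!((*cp >= 'a' && *cp <= 'z') ||
              (*cp >= '0' && *cp <= '9') ||
              *cp == '_'))
            return false;
    }
    return true;
}
```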
    4615                 : 
    4616                 : /* ---------------------------------------
    4617 ECB             :  * toast reassembly support
    4618                 :  * ---------------------------------------
    4619 EUB             :  */
    4620                 : 
    4621                 : /*
    4622                 :  * Initialize per-tuple toast reconstruction support.
    4623 ECB             :  */
    4624                 : static void
    4625 GIC          33 : ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
    4626                 : {
    4627                 :     HASHCTL     hash_ctl;
    4628                 : 
    4629              33 :     Assert(txn->toast_hash == NULL);
    4630 ECB             : 
    4631 GIC          33 :     hash_ctl.keysize = sizeof(Oid);
    4632              33 :     hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
    4633              33 :     hash_ctl.hcxt = rb->context;
    4634              33 :     txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
    4635                 :                                   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
    4636              33 : }
    4637 ECB             : 
    4638                 : /*
    4639                 :  * Per toast-chunk handling for toast reconstruction
    4640                 :  *
    4641 EUB             :  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
    4642                 :  * toasted Datum comes along.
    4643 ECB             :  */
    4644                 : static void
    4645 GIC        1720 : ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
    4646                 :                               Relation relation, ReorderBufferChange *change)
    4647 ECB             : {
    4648                 :     ReorderBufferToastEnt *ent;
    4649 EUB             :     ReorderBufferTupleBuf *newtup;
    4650                 :     bool        found;
    4651                 :     int32       chunksize;
    4652                 :     bool        isnull;
    4653                 :     Pointer     chunk;
    4654 GBC        1720 :     TupleDesc   desc = RelationGetDescr(relation);
    4655                 :     Oid         chunk_id;
    4656                 :     int32       chunk_seq;
    4657                 : 
    4658 GIC        1720 :     if (txn->toast_hash == NULL)
    4659              33 :         ReorderBufferToastInitHash(rb, txn);
    4660 ECB             : 
    4661 GIC        1720 :     Assert(IsToastRelation(relation));
    4662                 : 
    4663            1720 :     newtup = change->data.tp.newtuple;
    4664            1720 :     chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
    4665            1720 :     Assert(!isnull);
    4666            1720 :     chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
    4667            1720 :     Assert(!isnull);
    4668                 : 
    4669 ECB             :     ent = (ReorderBufferToastEnt *)
    4670 GNC        1720 :         hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
    4671 ECB             : 
    4672 GIC        1720 :     if (!found)
    4673 ECB             :     {
    4674 CBC          47 :         Assert(ent->chunk_id == chunk_id);
    4675              47 :         ent->num_chunks = 0;
    4676              47 :         ent->last_chunk_seq = 0;
    4677 GIC          47 :         ent->size = 0;
    4678              47 :         ent->reconstructed = NULL;
    4679              47 :         dlist_init(&ent->chunks);
    4680                 : 
    4681              47 :         if (chunk_seq != 0)
    4682 UIC           0 :             elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
    4683 ECB             :                  chunk_seq, chunk_id);
    4684                 :     }
    4685 GIC        1673 :     else if (found && chunk_seq != ent->last_chunk_seq + 1)
    4686 UIC           0 :         elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
    4687                 :              chunk_seq, chunk_id, ent->last_chunk_seq + 1);
    4688 ECB             : 
    4689 CBC        1720 :     chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
    4690 GIC        1720 :     Assert(!isnull);
    4691 ECB             : 
    4692                 :     /* calculate size so we can allocate the right size at once later */
    4693 CBC        1720 :     if (!VARATT_IS_EXTENDED(chunk))
    4694 GIC        1720 :         chunksize = VARSIZE(chunk) - VARHDRSZ;
    4695 UIC           0 :     else if (VARATT_IS_SHORT(chunk))
    4696 ECB             :         /* could happen due to heap_form_tuple doing its thing */
    4697 UBC           0 :         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
    4698                 :     else
    4699 UIC           0 :         elog(ERROR, "unexpected type of toast chunk");
    4700                 : 
    4701 GIC        1720 :     ent->size += chunksize;
    4702            1720 :     ent->last_chunk_seq = chunk_seq;
    4703 CBC        1720 :     ent->num_chunks++;
    4704 GIC        1720 :     dlist_push_tail(&ent->chunks, &change->node);
    4705 CBC        1720 : }
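ReorderBufferToastAppendChunk() keys chunks by chunk_id and requires strictly consecutive sequence numbers starting at 0. It also accumulates the payload size so the reassembly step in ReorderBufferToastReplace() can allocate the result once and then memcpy() the chunks in order. The standalone sketch below shows that bookkeeping and the stitching loop under several assumptions. It uses plain byte buffers instead of varlenas (so there are no headers and no compression flag), a fixed-size array instead of the dlist, and a boolean result where the real code calls elog(ERROR). `ToastEntSketch` and the `*_sketch` functions are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define MAX_CHUNKS_SKETCH 16    /* hypothetical bound; the real code uses a dlist */

/* Hypothetical, simplified stand-in for ReorderBufferToastEnt. */
typedef struct ToastEntSketch
{
    bool        initialized;    /* plays the role of hash_search()'s "found" */
    int         num_chunks;
    int         last_chunk_seq;
    size_t      size;           /* total payload bytes, headers excluded */
    const char *chunks[MAX_CHUNKS_SKETCH];
    size_t      lens[MAX_CHUNKS_SKETCH];
} ToastEntSketch;

/* Returns false where the real code raises elog(ERROR). */
static bool
toast_append_chunk_sketch(ToastEntSketch *ent, int seq,
                          const char *payload, size_t len)
{
    if (!ent->initialized)
    {
        ent->initialized = true;
        ent->num_chunks = 0;
        ent->last_chunk_seq = 0;
        ent->size = 0;
        if (seq != 0)
            return false;       /* the first chunk must have seq 0 */
    }
    else if (seq != ent->last_chunk_seq + 1)
        return false;           /* chunks must arrive strictly in order */

    if (ent->num_chunks == MAX_CHUNKS_SKETCH)
        return false;           /* limitation of this sketch only */

    ent->chunks[ent->num_chunks] = payload;
    ent->lens[ent->num_chunks] = len;
    ent->size += len;           /* lets reassembly allocate once */
    ent->last_chunk_seq = seq;
    ent->num_chunks++;
    return true;
}

/* Stitch the chunks into one malloc'd buffer; the caller frees it. */
static char *
toast_reconstruct_sketch(const ToastEntSketch *ent)
{
    char   *buf = malloc(ent->size + 1);    /* +1 only for a NUL terminator */
    size_t  data_done = 0;

    if (buf == NULL)
        return NULL;
    for (int i = 0; i < ent->num_chunks; i++)
    {
        memcpy(buf + data_done, ent->chunks[i], ent->lens[i]);
        data_done += ent->lens[i];
    }
    assert(data_done == ent->size);         /* mirrors the extsize check */
    buf[data_done] = '\0';
    return buf;
}
```

A rejected chunk leaves the entry unchanged in this sketch. The real code never has to consider this because the error aborts decoding.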
    4706 ECB             : 
    4707                 : /*
    4708                 :  * Rejigger change->newtuple to point to in-memory toast tuples instead of
    4709                 :  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
    4710                 :  *
    4711                 :  * We cannot replace unchanged toast tuples though, so those will still point
    4712                 :  * to on-disk toast data.
    4713                 :  *
    4714                 :  * While updating the existing change with detoasted tuple data, we need to
    4715                 :  * update the memory accounting info, because the change size will differ.
    4716                 :  * Otherwise the accounting may get out of sync, triggering serialization
    4717                 :  * at unexpected times.
    4718                 :  *
    4719                 :  * We simply subtract the size of the change before rejiggering the tuple, and
    4720                 :  * then add the new size. This makes it look like the change was removed
    4721                 :  * and then added back, except it only tweaks the accounting info.
    4722                 :  *
    4723                 :  * In particular it can't trigger serialization, which would be pointless
    4724                 :  * anyway as it happens during commit processing right before handing
    4725                 :  * the change to the output plugin.
    4726                 :  */
    4727                 : static void
    4728 CBC      333687 : ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
    4729                 :                           Relation relation, ReorderBufferChange *change)
    4730                 : {
    4731                 :     TupleDesc   desc;
    4732                 :     int         natt;
    4733                 :     Datum      *attrs;
    4734                 :     bool       *isnull;
    4735                 :     bool       *free;
    4736                 :     HeapTuple   tmphtup;
    4737 ECB             :     Relation    toast_rel;
    4738                 :     TupleDesc   toast_desc;
    4739                 :     MemoryContext oldcontext;
    4740                 :     ReorderBufferTupleBuf *newtup;
    4741                 :     Size        old_size;
    4742                 : 
    4743                 :     /* no toast tuples changed */
    4744 GIC      333687 :     if (txn->toast_hash == NULL)
    4745          333442 :         return;
    4746 ECB             : 
    4747                 :     /*
    4748                 :      * We're going to modify the size of the change. So, to make sure the
    4749                 :      * accounting is correct we record the current change size and then after
    4750                 :      * re-computing the change we'll subtract the recorded size and then
    4751                 :      * re-add the new change size at the end. We don't immediately subtract
    4752                 :      * the old size because if there is any error before we add the new size,
    4753                 :      * we will release the changes and that will update the accounting info
    4754                 :      * (subtracting the size from the counters). And we don't want to
    4755                 :      * underflow there.
    4756                 :      */
    4757 CBC         245 :     old_size = ReorderBufferChangeSize(change);
    4758 ECB             : 
    4759 CBC         245 :     oldcontext = MemoryContextSwitchTo(rb->context);
    4760                 : 
    4761                 :     /* we should only have toast tuples in an INSERT or UPDATE */
    4762             245 :     Assert(change->data.tp.newtuple);
    4763                 : 
    4764             245 :     desc = RelationGetDescr(relation);
    4765                 : 
    4766             245 :     toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
    4767             245 :     if (!RelationIsValid(toast_rel))
    4768 LBC           0 :         elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
    4769 ECB             :              relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
    4770                 : 
    4771 CBC         245 :     toast_desc = RelationGetDescr(toast_rel);
    4772                 : 
    4773 ECB             :     /* should we allocate from stack instead? */
    4774 GBC         245 :     attrs = palloc0(sizeof(Datum) * desc->natts);
    4775 GIC         245 :     isnull = palloc0(sizeof(bool) * desc->natts);
    4776             245 :     free = palloc0(sizeof(bool) * desc->natts);
    4777 ECB             : 
    4778 GBC         245 :     newtup = change->data.tp.newtuple;
    4779                 : 
    4780 GIC         245 :     heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
    4781 ECB             : 
    4782 CBC         755 :     for (natt = 0; natt < desc->natts; natt++)
    4783                 :     {
    4784 GIC         510 :         Form_pg_attribute attr = TupleDescAttr(desc, natt);
    4785 ECB             :         ReorderBufferToastEnt *ent;
    4786                 :         struct varlena *varlena;
    4787 EUB             : 
    4788                 :         /* va_rawsize is the size of the original datum -- including header */
    4789                 :         struct varatt_external toast_pointer;
    4790                 :         struct varatt_indirect redirect_pointer;
    4791 GBC         510 :         struct varlena *new_datum = NULL;
    4792                 :         struct varlena *reconstructed;
    4793 ECB             :         dlist_iter  it;
    4794 CBC         510 :         Size        data_done = 0;
    4795 ECB             : 
    4796                 :         /* system columns aren't toasted */
    4797 CBC         510 :         if (attr->attnum < 0)
    4798 GIC         463 :             continue;
    4799                 : 
    4800             510 :         if (attr->attisdropped)
    4801 UIC           0 :             continue;
    4802                 : 
    4803                 :         /* not a varlena datatype */
    4804 GIC         510 :         if (attr->attlen != -1)
    4805             241 :             continue;
    4806                 : 
    4807                 :         /* no data */
    4808             269 :         if (isnull[natt])
    4809              12 :             continue;
    4810                 : 
    4811                 :         /* ok, we know we have a toast datum */
    4812             257 :         varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
    4813                 : 
    4814                 :         /* no need to do anything if the tuple isn't external */
    4815             257 :         if (!VARATT_IS_EXTERNAL(varlena))
    4816             202 :             continue;
    4817                 : 
    4818              55 :         VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
    4819                 : 
    4820 ECB             :         /*
    4821                 :          * Check whether the toast tuple changed, replace if so.
    4822                 :          */
    4823                 :         ent = (ReorderBufferToastEnt *)
    4824 GIC          55 :             hash_search(txn->toast_hash,
    4825                 :                         &toast_pointer.va_valueid,
    4826                 :                         HASH_FIND,
    4827                 :                         NULL);
    4828              55 :         if (ent == NULL)
    4829               8 :             continue;
    4830                 : 
    4831                 :         new_datum =
    4832              47 :             (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
    4833                 : 
    4834              47 :         free[natt] = true;
    4835                 : 
    4836 CBC          47 :         reconstructed = palloc0(toast_pointer.va_rawsize);
    4837 ECB             : 
    4838 GIC          47 :         ent->reconstructed = reconstructed;
    4839                 : 
    4840                 :         /* stitch toast tuple back together from its parts */
    4841            1767 :         dlist_foreach(it, &ent->chunks)
    4842                 :         {
    4843                 :             bool        isnull;
    4844                 :             ReorderBufferChange *cchange;
    4845                 :             ReorderBufferTupleBuf *ctup;
    4846                 :             Pointer     chunk;
    4847                 : 
    4848            1720 :             cchange = dlist_container(ReorderBufferChange, node, it.cur);
    4849 CBC        1720 :             ctup = cchange->data.tp.newtuple;
    4850 GIC        1720 :             chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
    4851 ECB             : 
    4852 GIC        1720 :             Assert(!isnull);
    4853            1720 :             Assert(!VARATT_IS_EXTERNAL(chunk));
    4854 CBC        1720 :             Assert(!VARATT_IS_SHORT(chunk));
    4855                 : 
    4856            1720 :             memcpy(VARDATA(reconstructed) + data_done,
    4857 GIC        1720 :                    VARDATA(chunk),
    4858 CBC        1720 :                    VARSIZE(chunk) - VARHDRSZ);
    4859            1720 :             data_done += VARSIZE(chunk) - VARHDRSZ;
    4860 EUB             :         }
    4861 GIC          47 :         Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
    4862                 : 
    4863 ECB             :         /* make sure it's marked as compressed or not */
    4864 GIC          47 :         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
    4865               5 :             SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
    4866 ECB             :         else
    4867 CBC          42 :             SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
    4868 ECB             : 
    4869 GIC          47 :         memset(&redirect_pointer, 0, sizeof(redirect_pointer));
    4870 CBC          47 :         redirect_pointer.pointer = reconstructed;
    4871                 : 
    4872              47 :         SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
    4873 GIC          47 :         memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
    4874 ECB             :                sizeof(redirect_pointer));
    4875                 : 
    4876 CBC          47 :         attrs[natt] = PointerGetDatum(new_datum);
    4877                 :     }
    4878                 : 
    4879                 :     /*
    4880                 :      * Build tuple in separate memory & copy tuple back into the tuplebuf
    4881                 :      * passed to the output plugin. We can't directly heap_fill_tuple() into
    4882                 :      * the tuplebuf because attrs[] will point back into the current content.
    4883 ECB             :      */
    4884 GIC         245 :     tmphtup = heap_form_tuple(desc, attrs, isnull);
    4885             245 :     Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
    4886 CBC         245 :     Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
    4887                 : 
    4888 GIC         245 :     memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
    4889 CBC         245 :     newtup->tuple.t_len = tmphtup->t_len;
    4890 ECB             : 
    4891                 :     /*
    4892                 :      * free resources we won't need any further; more persistent stuff will be
    4893 EUB             :      * freed in ReorderBufferToastReset().
    4894                 :      */
    4895 GIC         245 :     RelationClose(toast_rel);
    4896 CBC         245 :     pfree(tmphtup);
    4897             755 :     for (natt = 0; natt < desc->natts; natt++)
    4898                 :     {
    4899 GIC         510 :         if (free[natt])
    4900 CBC          47 :             pfree(DatumGetPointer(attrs[natt]));
    4901 ECB             :     }
    4902 GIC         245 :     pfree(attrs);
    4903             245 :     pfree(free);
    4904 CBC         245 :     pfree(isnull);
    4905                 : 
    4906 GIC         245 :     MemoryContextSwitchTo(oldcontext);
    4907 ECB             : 
    4908                 :     /* subtract the old change size */
    4909 GIC         245 :     ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
    4910 ECB             :     /* now add the change back, with the correct size */
    4911 GIC         245 :     ReorderBufferChangeMemoryUpdate(rb, change, true,
    4912                 :                                     ReorderBufferChangeSize(change));
    4913                 : }
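ReorderBufferToastReplace() handles memory accounting in a specific order. It records the change's size before rewriting the change. Only after the rewrite succeeds does it call ReorderBufferChangeMemoryUpdate() twice: once to subtract the recorded size and once to add the recomputed size. If the size were subtracted up front, an error in between would release the change and subtract its size again, which could underflow the counters. The sketch below reproduces only that ordering. It uses a single counter and hypothetical `AccountSketch`/`ChangeSketch` types. The real function maintains more state than this.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical accounting state: one counter instead of the real ones. */
typedef struct AccountSketch
{
    size_t      total;
} AccountSketch;

typedef struct ChangeSketch
{
    size_t      size;           /* what ReorderBufferChangeSize() would report */
} ChangeSketch;

static void
memory_update_sketch(AccountSketch *acct, bool addition, size_t sz)
{
    if (addition)
        acct->total += sz;
    else
    {
        assert(acct->total >= sz);  /* an underflow here means double-subtraction */
        acct->total -= sz;
    }
}

/*
 * Rewrite a change so it occupies 'new_size' bytes. The old size is only
 * recorded up front and is subtracted after the rewrite has completed.
 */
static void
rewrite_change_sketch(AccountSketch *acct, ChangeSketch *change, size_t new_size)
{
    size_t      old_size = change->size;    /* record, don't subtract yet */

    change->size = new_size;                /* the "rejiggering" step */

    memory_update_sketch(acct, false, old_size);
    memory_update_sketch(acct, true, change->size);
}
```

For example, with a total of 100 bytes that includes a 40-byte change, rewriting that change to 140 bytes leaves the total at 200. The net effect matches removing the change and re-adding it, but nothing is actually removed.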
    4914                 : 
    4915                 : /*
    4916 ECB             :  * Free all resources allocated for toast reconstruction.
    4917                 :  */
    4918                 : static void
    4919 GIC      336731 : ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
    4920 ECB             : {
    4921                 :     HASH_SEQ_STATUS hstat;
    4922                 :     ReorderBufferToastEnt *ent;
    4923                 : 
    4924 CBC      336731 :     if (txn->toast_hash == NULL)
    4925 GIC      336698 :         return;
    4926 ECB             : 
    4927                 :     /* sequentially walk over the hash and free everything */
    4928 CBC          33 :     hash_seq_init(&hstat, txn->toast_hash);
    4929 GIC          80 :     while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
    4930 ECB             :     {
    4931                 :         dlist_mutable_iter it;
    4932                 : 
    4933 CBC          47 :         if (ent->reconstructed != NULL)
    4934 GIC          47 :             pfree(ent->reconstructed);
    4935                 : 
    4936            1767 :         dlist_foreach_modify(it, &ent->chunks)
    4937                 :         {
    4938            1720 :             ReorderBufferChange *change =
    4939            1720 :             dlist_container(ReorderBufferChange, node, it.cur);
    4940 ECB             : 
    4941 CBC        1720 :             dlist_delete(&change->node);
    4942            1720 :             ReorderBufferReturnChange(rb, change, true);
    4943                 :         }
    4944 ECB             :     }
    4945                 : 
    4946 CBC          33 :     hash_destroy(txn->toast_hash);
    4947 GIC          33 :     txn->toast_hash = NULL;
    4948 ECB             : }
    4949                 : 
    4950                 : 
    4951                 : /* ---------------------------------------
    4952                 :  * Visibility support for logical decoding
    4953                 :  *
    4954                 :  *
    4955                 :  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
    4956                 :  * always rely on stored cmin/cmax values because of two scenarios:
    4957                 :  *
    4958                 :  * * A tuple got changed multiple times during a single transaction and thus
    4959                 :  *   has got a combo CID. Combo CIDs are only valid for the duration of a
    4960                 :  *   single transaction.
    4961                 :  * * A tuple with a cmin but no cmax (and thus no combo CID) got
    4962                 :  *   deleted/updated in a transaction other than the one that created it,
    4963                 :  *   while the creating transaction is the one we are looking at right now.
    4964                 :  *   As only one of cmin, cmax or combo CID is actually stored in the heap,
    4965                 :  *   we no longer have access to the value we need.
    4966                 :  *
    4967                 :  * To resolve those problems, we have a per-transaction hash of (cmin,
    4968                 :  * cmax) tuples keyed by (relfilelocator, ctid) which contains the actual
    4969                 :  * (cmin, cmax) values. That also takes care of combo CIDs by simply
    4970                 :  * not caring about them at all. As we have the real cmin/cmax values,
    4971                 :  * combo CIDs aren't interesting.
    4972                 :  *
    4973                 :  * As we only care about catalog tuples here, the overhead of this
    4974                 :  * hashtable should be acceptable.
    4975                 :  *
    4976                 :  * Heap rewrites complicate this a bit; see rewriteheap.c for
    4977                 :  * details.
    4978                 :  * -------------------------------------------------------------------------
    4979                 :  */
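The comment above describes a lookup table from a tuple's physical identity, (relfilelocator, ctid), to its real (cmin, cmax). A minimal sketch of that idea follows. It uses a linear array instead of the dynahash table in tuplecid_data and flattens the key into five `uint32_t` fields. `TupleCidKeySketch`, `INVALID_CID_SKETCH` and the functions are hypothetical. The key is zeroed before its fields are filled, mirroring the "be careful about padding" steps in this file. As those comments imply, keys are hashed and compared as raw bytes, so stray padding bytes would break lookups.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t CommandIdSketch;                   /* stand-in for CommandId */
#define INVALID_CID_SKETCH ((CommandIdSketch) 0xFFFFFFFFu)  /* like InvalidCommandId */

/*
 * Hypothetical key loosely modeled on (relfilelocator, ctid). All fields are
 * uint32_t, so this struct has no padding; keys with mixed-width fields can,
 * which is why they are memset before use with bytewise comparison.
 */
typedef struct TupleCidKeySketch
{
    uint32_t    dbOid;
    uint32_t    spcOid;
    uint32_t    relNumber;
    uint32_t    block;
    uint32_t    offset;
} TupleCidKeySketch;

typedef struct TupleCidEntSketch
{
    TupleCidKeySketch key;
    CommandIdSketch cmin;
    CommandIdSketch cmax;
} TupleCidEntSketch;

static void
init_key_sketch(TupleCidKeySketch *k, uint32_t db, uint32_t spc,
                uint32_t rel, uint32_t block, uint32_t offset)
{
    memset(k, 0, sizeof(*k));   /* zero everything, padding included */
    k->dbOid = db;
    k->spcOid = spc;
    k->relNumber = rel;
    k->block = block;
    k->offset = offset;
}

/* Linear-scan lookup comparing keys as raw bytes. */
static bool
lookup_cmin_cmax_sketch(const TupleCidEntSketch *ents, int n,
                        const TupleCidKeySketch *key,
                        CommandIdSketch *cmin, CommandIdSketch *cmax)
{
    for (int i = 0; i < n; i++)
    {
        if (memcmp(&ents[i].key, key, sizeof(*key)) == 0)
        {
            *cmin = ents[i].cmin;
            *cmax = ents[i].cmax;
            return true;
        }
    }
    return false;               /* caller treats the CIDs as unresolved */
}
```

Because the stored values are the real cmin and cmax, a combo CID never needs to be decoded. This is the "not caring about them at all" point above.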
    4980                 : 
    4981                 : /* struct for sorting mapping files by LSN efficiently */
    4982                 : typedef struct RewriteMappingFile
    4983                 : {
    4984                 :     XLogRecPtr  lsn;
    4985                 :     char        fname[MAXPGPATH];
    4986                 : } RewriteMappingFile;
    4987                 : 
    4988                 : #ifdef NOT_USED
    4989                 : static void
    4990                 : DisplayMapping(HTAB *tuplecid_data)
    4991                 : {
    4992                 :     HASH_SEQ_STATUS hstat;
    4993                 :     ReorderBufferTupleCidEnt *ent;
    4994                 : 
    4995                 :     hash_seq_init(&hstat, tuplecid_data);
    4996                 :     while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
    4997                 :     {
    4998                 :         elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
    4999                 :              ent->key.rlocator.dbOid,
    5000                 :              ent->key.rlocator.spcOid,
    5001                 :              ent->key.rlocator.relNumber,
    5002                 :              ItemPointerGetBlockNumber(&ent->key.tid),
    5003                 :              ItemPointerGetOffsetNumber(&ent->key.tid),
    5004                 :              ent->cmin,
    5005                 :              ent->cmax
    5006                 :             );
    5007                 :     }
    5008                 : }
    5009                 : #endif
    5010                 : 
    5011                 : /*
    5012                 :  * Apply a single mapping file to tuplecid_data.
    5013                 :  *
    5014                 :  * The mapping file has to have been verified to be a) committed b) for our
    5015                 :  * transaction c) applied in LSN order.
    5016                 :  */
    5017                 : static void
    5018 GIC          22 : ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
    5019                 : {
    5020 ECB             :     char        path[MAXPGPATH];
    5021                 :     int         fd;
    5022                 :     int         readBytes;
    5023                 :     LogicalRewriteMappingData map;
    5024                 : 
    5025 CBC          22 :     sprintf(path, "pg_logical/mappings/%s", fname);
    5026              22 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
    5027 GIC          22 :     if (fd < 0)
    5028 LBC           0 :         ereport(ERROR,
    5029                 :                 (errcode_for_file_access(),
    5030 ECB             :                  errmsg("could not open file \"%s\": %m", path)));
    5031                 : 
    5032                 :     while (true)
    5033 CBC         119 :     {
    5034 ECB             :         ReorderBufferTupleCidKey key;
    5035                 :         ReorderBufferTupleCidEnt *ent;
    5036                 :         ReorderBufferTupleCidEnt *new_ent;
    5037                 :         bool        found;
    5038                 : 
    5039                 :         /* be careful about padding */
    5040 GIC         141 :         memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
    5041                 : 
    5042                 :         /* read all mappings till the end of the file */
    5043             141 :         pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
    5044             141 :         readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
    5045             141 :         pgstat_report_wait_end();
    5046                 : 
    5047             141 :         if (readBytes < 0)
    5048 UIC           0 :             ereport(ERROR,
    5049                 :                     (errcode_for_file_access(),
    5050                 :                      errmsg("could not read file \"%s\": %m",
    5051                 :                             path)));
    5052 GIC         141 :         else if (readBytes == 0)    /* EOF */
    5053              22 :             break;
    5054             119 :         else if (readBytes != sizeof(LogicalRewriteMappingData))
    5055 UIC           0 :             ereport(ERROR,
    5056                 :                     (errcode_for_file_access(),
    5057                 :                      errmsg("could not read from file \"%s\": read %d instead of %d bytes",
    5058                 :                             path, readBytes,
    5059                 :                             (int32) sizeof(LogicalRewriteMappingData))));
    5060                 : 
    5061 GNC         119 :         key.rlocator = map.old_locator;
    5062 GIC         119 :         ItemPointerCopy(&map.old_tid,
    5063                 :                         &key.tid);
    5064                 : 
    5065                 : 
    5066                 :         ent = (ReorderBufferTupleCidEnt *)
    5067 GNC         119 :             hash_search(tuplecid_data, &key, HASH_FIND, NULL);
    5068                 : 
    5069                 :         /* no existing mapping, no need to update */
    5070 GIC         119 :         if (!ent)
    5071 UIC           0 :             continue;
    5072                 : 
    5073 GNC         119 :         key.rlocator = map.new_locator;
    5074 GIC         119 :         ItemPointerCopy(&map.new_tid,
    5075                 :                         &key.tid);
    5076                 : 
    5077                 :         new_ent = (ReorderBufferTupleCidEnt *)
    5078 GNC         119 :             hash_search(tuplecid_data, &key, HASH_ENTER, &found);
    5079                 : 
    5080 GIC         119 :         if (found)
    5081                 :         {
    5082                 :             /*
    5083                 :              * Make sure the existing mapping makes sense. We sometimes update
    5084                 :              * old records that did not yet have a cmax (e.g. pg_class' own
    5085                 :              * entry while rewriting it) during rewrites, so allow that.
    5086                 :              */
    5087               6 :             Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
    5088               6 :             Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
    5089                 :         }
    5090                 :         else
    5091                 :         {
    5092                 :             /* update mapping */
    5093             113 :             new_ent->cmin = ent->cmin;
    5094             113 :             new_ent->cmax = ent->cmax;
    5095             113 :             new_ent->combocid = ent->combocid;
    5096                 :         }
    5097                 :     }
    5098                 : 
    5099              22 :     if (CloseTransientFile(fd) != 0)
    5100 UIC           0 :         ereport(ERROR,
    5101                 :                 (errcode_for_file_access(),
    5102                 :                  errmsg("could not close file \"%s\": %m", path)));
    5103 GIC          22 : }
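ApplyLogicalMappingFile() reads fixed-size records until read() returns 0 and treats a negative return or a short read as an error. For each record whose old tuple identity is already present in tuplecid_data, it enters the new identity with the same cmin/cmax, unless an entry for the new identity already exists. The sketch below mirrors that loop. It assumes a hypothetical record type that identifies a tuple by a single `uint32_t` instead of the real relfilelocator + TID pair, and a small array in place of the hash table. It returns -1 where the real code calls ereport(ERROR) and skips the consistency Asserts and the combocid copy.

```c
#include <assert.h>
#include <stdint.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical fixed-size record; one uint32_t identifies a tuple here. */
typedef struct MapRecSketch
{
    uint32_t    old_key;
    uint32_t    new_key;
} MapRecSketch;

typedef struct CidEntSketch
{
    uint32_t    key;
    uint32_t    cmin;
    uint32_t    cmax;
} CidEntSketch;

#define CID_TABLE_MAX_SKETCH 64

typedef struct CidTableSketch
{
    int         n;
    CidEntSketch ents[CID_TABLE_MAX_SKETCH];
} CidTableSketch;

static CidEntSketch *
cid_find_sketch(CidTableSketch *t, uint32_t key)
{
    for (int i = 0; i < t->n; i++)
        if (t->ents[i].key == key)
            return &t->ents[i];
    return NULL;
}

/*
 * Read records until EOF; for each one whose old tuple is known, give the new
 * tuple the same (cmin, cmax) unless it already has an entry.
 */
static int
apply_mapping_fd_sketch(CidTableSketch *t, int fd)
{
    for (;;)
    {
        MapRecSketch map;
        ssize_t     readBytes = read(fd, &map, sizeof(map));
        CidEntSketch *ent;

        if (readBytes < 0)
            return -1;                  /* read error */
        if (readBytes == 0)
            return 0;                   /* EOF: all records applied */
        if ((size_t) readBytes != sizeof(map))
            return -1;                  /* truncated record */

        ent = cid_find_sketch(t, map.old_key);
        if (ent == NULL)
            continue;                   /* no existing mapping, nothing to carry */
        if (cid_find_sketch(t, map.new_key) != NULL)
            continue;                   /* keep the entry already present */
        if (t->n == CID_TABLE_MAX_SKETCH)
            return -1;                  /* limitation of this sketch only */

        t->ents[t->n].key = map.new_key;
        t->ents[t->n].cmin = ent->cmin;
        t->ents[t->n].cmax = ent->cmax;
        t->n++;
    }
}
```

Applying the files in LSN order matters because a later rewrite can map a tuple identity that an earlier file created.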
    5104 ECB             : 
    5105                 : 
    5106                 : /*
    5107                 :  * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
    5108                 :  */
    5109                 : static bool
    5110 GIC         290 : TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
    5111 ECB             : {
    5112 CBC         290 :     return bsearch(&xid, xip, num,
    5113             290 :                    sizeof(TransactionId), xidComparator) != NULL;
    5114 EUB             : }
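TransactionIdInArray() is a thin wrapper around bsearch(3). It is only correct if the array was sorted with a comparator that orders elements the same way as the one passed to bsearch(). A minimal sketch, assuming a plain numeric three-way comparator as a stand-in for xidComparator and a hypothetical `XidSketch` type in place of TransactionId:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t XidSketch;     /* stand-in for TransactionId */

/* Three-way comparison; subtracting unsigned values would wrap around. */
static int
xid_cmp_sketch(const void *a, const void *b)
{
    XidSketch   xa = *(const XidSketch *) a;
    XidSketch   xb = *(const XidSketch *) b;

    return (xa > xb) - (xa < xb);
}

/* The array must already be sorted with xid_cmp_sketch(). */
static bool
xid_in_sorted_array_sketch(XidSketch xid, const XidSketch *xip, size_t num)
{
    return bsearch(&xid, xip, num, sizeof(XidSketch), xid_cmp_sketch) != NULL;
}
```

Sorting once with qsort() and then probing with bsearch() costs O(log n) per lookup. This matters here because UpdateLogicalMappings() performs one probe for every mapping file it inspects.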
    5115                 : 
    5116                 : /*
    5117                 :  * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
    5118                 :  */
    5119 ECB             : static int
    5120 GIC          32 : file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
    5121                 : {
    5122              32 :     RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p);
    5123              32 :     RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p);
    5124                 : 
    5125              32 :     if (a->lsn < b->lsn)
    5126 CBC           9 :         return -1;
    5127 GIC          23 :     else if (a->lsn > b->lsn)
    5128              23 :         return 1;
    5129 LBC           0 :     return 0;
    5130 ECB             : }
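file_sort_by_lsn() spells out the three cases instead of returning `a->lsn - b->lsn`. XLogRecPtr is a 64-bit value, and the difference does not fit in the `int` a comparator returns. The same comparator shape is shown below with qsort() over an array, using a hypothetical `MappingFileSketch` struct instead of a PostgreSQL List of RewriteMappingFile.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for RewriteMappingFile. */
typedef struct MappingFileSketch
{
    uint64_t    lsn;            /* XLogRecPtr is 64 bits wide */
    const char *fname;
} MappingFileSketch;

/* Explicit branches: a 64-bit difference cannot be returned as an int. */
static int
mapping_file_cmp_sketch(const void *a_p, const void *b_p)
{
    const MappingFileSketch *a = a_p;
    const MappingFileSketch *b = b_p;

    if (a->lsn < b->lsn)
        return -1;
    else if (a->lsn > b->lsn)
        return 1;
    return 0;
}
```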
    5131                 : 
    5132                 : /*
    5133                 :  * Apply any existing logical remapping files that are targeted at our
    5134 EUB             :  * transaction for relid.
    5135                 :  */
    5136                 : static void
    5137 GIC           5 : UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
    5138 ECB             : {
    5139                 :     DIR        *mapping_dir;
    5140                 :     struct dirent *mapping_de;
    5141 GBC           5 :     List       *files = NIL;
    5142                 :     ListCell   *file;
    5143 GIC           5 :     Oid         dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
    5144                 : 
    5145               5 :     mapping_dir = AllocateDir("pg_logical/mappings");
    5146             460 :     while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
    5147 ECB             :     {
    5148                 :         Oid         f_dboid;
    5149                 :         Oid         f_relid;
    5150                 :         TransactionId f_mapped_xid;
    5151                 :         TransactionId f_create_xid;
    5152                 :         XLogRecPtr  f_lsn;
    5153                 :         uint32      f_hi,
    5154                 :                     f_lo;
    5155                 :         RewriteMappingFile *f;
    5156                 : 
    5157 GBC         455 :         if (strcmp(mapping_de->d_name, ".") == 0 ||
    5158 GIC         450 :             strcmp(mapping_de->d_name, "..") == 0)
    5159 CBC         433 :             continue;
    5160 ECB             : 
    5161                 :         /* Ignore files that aren't ours */
    5162 GIC         445 :         if (strncmp(mapping_de->d_name, "map-", 4) != 0)
    5163 UIC           0 :             continue;
    5164 ECB             : 
    5165 GIC         445 :         if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
    5166 ECB             :                    &f_dboid, &f_relid, &f_hi, &f_lo,
    5167                 :                    &f_mapped_xid, &f_create_xid) != 6)
    5168 UIC           0 :             elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
    5169                 : 
    5170 GIC         445 :         f_lsn = ((uint64) f_hi) << 32 | f_lo;
    5171                 : 
    5172                 :         /* mapping for another database */
    5173 CBC         445 :         if (f_dboid != dboid)
    5174 LBC           0 :             continue;
    5175                 : 
    5176                 :         /* mapping for another relation */
    5177 GIC         445 :         if (f_relid != relid)
    5178              45 :             continue;
    5179 ECB             : 
    5180                 :         /* did the creating transaction abort? */
    5181 CBC         400 :         if (!TransactionIdDidCommit(f_create_xid))
    5182 GIC         110 :             continue;
    5183                 : 
    5184                 :         /* not for our transaction */
    5185 CBC         290 :         if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
    5186 GBC         268 :             continue;
    5187                 : 
    5188                 :         /* ok, relevant, queue for apply */
    5189 CBC          22 :         f = palloc(sizeof(RewriteMappingFile));
    5190 GIC          22 :         f->lsn = f_lsn;
    5191              22 :         strcpy(f->fname, mapping_de->d_name);
    5192              22 :         files = lappend(files, f);
    5193                 :     }
    5194               5 :     FreeDir(mapping_dir);
    5195                 : 
    5196 ECB             :     /* sort files so we apply them in LSN order */
    5197 GIC           5 :     list_sort(files, file_sort_by_lsn);
    5198 ECB             : 
    5199 CBC          27 :     foreach(file, files)
    5200                 :     {
    5201 GIC          22 :         RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file);
    5202                 : 
    5203              22 :         elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
    5204                 :              snapshot->subxip[0]);
    5205              22 :         ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
    5206 CBC          22 :         pfree(f);
    5207                 :     }
    5208               5 : }
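UpdateLogicalMappings() skips directory entries that do not start with "map-" and parses the rest with sscanf() and LOGICAL_REWRITE_FORMAT. It rebuilds the 64-bit LSN from two 32-bit halves, then filters by database, relation, committed creator and snapshot membership before sorting the survivors by LSN. The sketch below covers only the parsing and LSN reconstruction. The format string `MAP_FORMAT_SKETCH` is an assumption about LOGICAL_REWRITE_FORMAT and should be checked against rewriteheap.h. The sample filename in the usage note is invented.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Assumed to match LOGICAL_REWRITE_FORMAT in rewriteheap.h; verify before reuse. */
#define MAP_FORMAT_SKETCH "map-%x-%x-%X_%X-%x-%x"

typedef struct MapNameSketch
{
    unsigned int dboid;
    unsigned int relid;
    uint64_t    lsn;            /* rebuilt from the two 32-bit halves */
    unsigned int mapped_xid;
    unsigned int create_xid;
} MapNameSketch;

static bool
parse_map_name_sketch(const char *name, MapNameSketch *out)
{
    unsigned int hi,
                lo;

    /* ignore files that aren't mapping files */
    if (strncmp(name, "map-", 4) != 0)
        return false;

    /* all six fields must parse; the real code raises an error otherwise */
    if (sscanf(name, MAP_FORMAT_SKETCH, &out->dboid, &out->relid, &hi, &lo,
               &out->mapped_xid, &out->create_xid) != 6)
        return false;

    out->lsn = ((uint64_t) hi) << 32 | lo;
    return true;
}
```

For instance, "map-4000-4e8-1_2A000028-2e5-2e6" parses to database 0x4000, relation 0x4e8 and LSN 0x12A000028.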
    5209 ECB             : 
    5210                 : /*
    5211                 :  * Look up the cmin/cmax of a tuple during logical decoding, where we can't rely on
    5212                 :  * combo CIDs.
    5213                 :  */
    5214                 : bool
    5215 GBC         601 : ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
    5216                 :                               Snapshot snapshot,
    5217                 :                               HeapTuple htup, Buffer buffer,
    5218                 :                               CommandId *cmin, CommandId *cmax)
    5219                 : {
    5220                 :     ReorderBufferTupleCidKey key;
    5221                 :     ReorderBufferTupleCidEnt *ent;
    5222                 :     ForkNumber  forkno;
    5223 ECB             :     BlockNumber blockno;
    5224 GIC         601 :     bool        updated_mapping = false;
    5225                 : 
    5226                 :     /*
    5227 ECB             :      * Return unresolved if tuplecid_data is not valid.  That's because when
    5228                 :      * streaming in-progress transactions we may run into tuples with the CID
    5229                 :      * before actually decoding them.  Think e.g. about INSERT followed by
    5230                 :      * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
    5231                 :      * INSERT.  So in such cases, we assume the CID is from the future
    5232                 :      * command.
    5233                 :      */
    5234 GIC         601 :     if (tuplecid_data == NULL)
    5235               9 :         return false;
    5236                 : 
    5237                 :     /* be careful about padding */
    5238             592 :     memset(&key, 0, sizeof(key));
    5239                 : 
    5240             592 :     Assert(!BufferIsLocal(buffer));
    5241                 : 
    5242                 :     /*
    5243                 :      * get relfilelocator from the buffer, no convenient way to access it
    5244                 :      * other than that.
    5245 ECB             :      */
    5246 GNC         592 :     BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
    5247                 : 
    5248 ECB             :     /* tuples can only be in the main fork */
    5249 GBC         592 :     Assert(forkno == MAIN_FORKNUM);
    5250 GIC         592 :     Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
    5251 ECB             : 
    5252 GIC         592 :     ItemPointerCopy(&htup->t_self,
    5253                 :                     &key.tid);
    5254 EUB             : 
    5255 GIC         597 : restart:
    5256 ECB             :     ent = (ReorderBufferTupleCidEnt *)
    5257 GNC         597 :         hash_search(tuplecid_data, &key, HASH_FIND, NULL);
    5258                 : 
    5259                 :     /*
    5260 ECB             :      * failed to find a mapping, check whether the table was rewritten and
    5261                 :      * apply mapping if so, but only do that once - there can be no new
    5262                 :      * mappings while we are in here since we have to hold a lock on the
    5263                 :      * relation.
    5264                 :      */
    5265 CBC         597 :     if (ent == NULL && !updated_mapping)
    5266                 :     {
    5267 GIC           5 :         UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
    5268 ECB             :         /* now check but don't update for a mapping again */
    5269 CBC           5 :         updated_mapping = true;
    5270 GIC           5 :         goto restart;
    5271                 :     }
    5272 CBC         592 :     else if (ent == NULL)
    5273 LBC           0 :         return false;
    5274 ECB             : 
    5275 CBC         592 :     if (cmin)
    5276 GIC         592 :         *cmin = ent->cmin;
    5277 CBC         592 :     if (cmax)
    5278 GIC         592 :         *cmax = ent->cmax;
    5279             592 :     return true;
    5280 ECB             : }
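ResolveCminCmaxDuringDecoding follows a "look up, refresh once, retry" pattern. It returns false when `tuplecid_data` is NULL. It zeroes the key with `memset` because, as the padding comment suggests, the key is hashed and compared as raw bytes. On a miss it calls UpdateLogicalMappings at most once before jumping back to `restart`. The following is a minimal sketch of that control flow under stated assumptions: the dynahash `HTAB` is replaced by a small array scanned with `memcmp`, the `goto` becomes a loop, and the refresh step is a caller-supplied callback. `TupleKey`, `TupleTable`, `resolve_cmin_cmax`, `table_add` and `apply_pending` are all hypothetical names, not PostgreSQL APIs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t CmdId;

/* Hypothetical key standing in for (relfilelocator, tid). */
typedef struct TupleKey
{
    uint32_t rel;
    uint32_t block;
    uint16_t offset;   /* typically followed by padding bytes */
} TupleKey;

typedef struct TupleEnt
{
    TupleKey key;
    CmdId    cmin;
    CmdId    cmax;
} TupleEnt;

typedef struct TupleTable
{
    TupleEnt ents[16];
    size_t   n;
} TupleTable;

/* Insert an entry, zeroing it first so padding bytes compare equal. */
static void
table_add(TupleTable *t, uint32_t rel, uint32_t block, uint16_t offset,
          CmdId cmin, CmdId cmax)
{
    TupleEnt *e;

    assert(t->n < sizeof(t->ents) / sizeof(t->ents[0]));
    e = &t->ents[t->n++];
    memset(e, 0, sizeof(*e));
    e->key.rel = rel;
    e->key.block = block;
    e->key.offset = offset;
    e->cmin = cmin;
    e->cmax = cmax;
}

/* Byte-wise key match, which is why every key must be zeroed first. */
static TupleEnt *
table_find(TupleTable *t, const TupleKey *key)
{
    for (size_t i = 0; i < t->n; i++)
        if (memcmp(&t->ents[i].key, key, sizeof(TupleKey)) == 0)
            return &t->ents[i];
    return NULL;
}

/* Callback playing the role of UpdateLogicalMappings(). */
typedef void (*refresh_fn) (TupleTable *t, void *arg);

/* Example refresh: add the pending entry passed in arg. */
static void
apply_pending(TupleTable *t, void *arg)
{
    const TupleEnt *p = arg;

    table_add(t, p->key.rel, p->key.block, p->key.offset, p->cmin, p->cmax);
}

static bool
resolve_cmin_cmax(TupleTable *t, uint32_t rel, uint32_t block,
                  uint16_t offset, refresh_fn refresh, void *arg,
                  CmdId *cmin, CmdId *cmax)
{
    TupleKey  key;
    TupleEnt *ent;
    bool      refreshed = false;

    if (t == NULL)
        return false;            /* no CID data: treat as a future command */

    memset(&key, 0, sizeof(key));   /* be careful about padding */
    key.rel = rel;
    key.block = block;
    key.offset = offset;

    for (;;)
    {
        ent = table_find(t, &key);
        if (ent != NULL || refreshed)
            break;
        refresh(t, arg);         /* refresh at most once, then retry */
        refreshed = true;
    }
    if (ent == NULL)
        return false;
    if (cmin)
        *cmin = ent->cmin;
    if (cmax)
        *cmax = ent->cmax;
    return true;
}
```

Refreshing at most once mirrors the reasoning in the source comment. Because a lock on the relation is held, no new mappings can appear during the lookup, so a second refresh could not turn a miss into a hit.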
        

Generated by: LCOV version v1.16-55-g56c0a2a