Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * reorderbuffer.c
4 : : * PostgreSQL logical replay/reorder buffer management
5 : : *
6 : : *
7 : : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/replication/logical/reorderbuffer.c
12 : : *
13 : : * NOTES
14 : : * This module gets handed individual pieces of transactions in the order
15 : : * they are written to the WAL and is responsible for reassembling them into
16 : : * toplevel-transaction-sized pieces. When a transaction is completely
17 : : * reassembled - signaled by reading the transaction commit record - it
18 : : * will then call the output plugin (cf. ReorderBufferCommit()) with the
19 : : * individual changes. The output plugins rely on snapshots built by
20 : : * snapbuild.c which hands them to us.
21 : : *
22 : : * Transactions and subtransactions/savepoints in postgres are not
23 : : * immediately linked to each other from outside the performing
24 : : * backend. They are linked together only at commit/abort (or by special
25 : : * xact_assignment records), which means that we will have to splice together
26 : : * a toplevel transaction from its subtransactions. To do that efficiently we
27 : : * build a binary heap indexed by the smallest current lsn of the individual
28 : : * subtransactions' changestreams. As the individual streams are inherently
29 : : * ordered by LSN - since that is where we build them from - the transaction
30 : : * can easily be reassembled by always using the subtransaction with the
31 : : * smallest current LSN from the heap.
32 : : *
33 : : * In order to cope with large transactions - which can be several times as
34 : : * big as the available memory - this module supports spooling the contents
35 : : * of large transactions to disk. When the transaction is replayed the
36 : : * contents of individual (sub-)transactions will be read from disk in
37 : : * chunks.
38 : : *
39 : : * This module also has to deal with reassembling toast records from the
40 : : * individual chunks stored in WAL. When a new (or initial) version of a
41 : : * tuple is stored in WAL it will always be preceded by the toast chunks
42 : : * emitted for the columns stored out of line. Within a single toplevel
43 : : * transaction there will be no other data carrying records between a row's
44 : : * toast chunks and the row data itself. See ReorderBufferToast* for
45 : : * details.
46 : : *
47 : : * ReorderBuffer uses two special memory context types - SlabContext for
48 : : * allocations of fixed-length structures (changes and transactions), and
49 : : * GenerationContext for the variable-length transaction data (allocated
50 : : * and freed in groups with similar lifespans).
51 : : *
52 : : * To limit the amount of memory used by decoded changes, we track memory
53 : : * used at the reorder buffer level (i.e. total amount of memory), and for
54 : : * each transaction. When the total amount of used memory exceeds the
55 : : * limit, the transaction consuming the most memory is then serialized to
56 : : * disk.
57 : : *
58 : : * Only decoded changes are evicted from memory (spilled to disk), not the
59 : : * transaction records. The number of toplevel transactions is limited,
60 : : * but a transaction with many subtransactions may still consume significant
61 : : * amounts of memory. However, the transaction records are fairly small and
62 : : * are not included in the memory limit.
63 : : *
64 : : * The current eviction algorithm is very simple - the transaction is
65 : : * picked merely by size, while it might be useful to also consider age
66 : : * (LSN) of the changes for example. With the new Generational memory
67 : : * allocator, evicting the oldest changes would make it more likely the
68 : : * memory gets actually freed.
69 : : *
70 : : * We use a max-heap with transaction size as the key to efficiently find
71 : : * the largest transaction. We update the max-heap whenever the memory
72 : : * counter is updated; however transactions with size 0 are not stored in
73 : : * the heap, because they have no changes to evict.
74 : : *
75 : : * We still rely on max_changes_in_memory when loading serialized changes
76 : : * back into memory. At that point we can't use the memory limit directly
77 : : * as we load the subxacts independently. One option to deal with this
78 : : * would be to count the subxacts, and allow each to allocate 1/N of the
79 : : * memory limit. That however does not seem very appealing, because with
80 : : * many subtransactions it may easily cause thrashing (short cycles of
81 : : * deserializing and applying very few changes). We probably should give
82 : : * a bit more memory to the oldest subtransactions, because it's likely
83 : : * they are the source for the next sequence of changes.
84 : : *
85 : : * -------------------------------------------------------------------------
86 : : */
87 : : #include "postgres.h"
88 : :
89 : : #include <unistd.h>
90 : : #include <sys/stat.h>
91 : :
92 : : #include "access/detoast.h"
93 : : #include "access/heapam.h"
94 : : #include "access/rewriteheap.h"
95 : : #include "access/transam.h"
96 : : #include "access/xact.h"
97 : : #include "access/xlog_internal.h"
98 : : #include "catalog/catalog.h"
99 : : #include "common/int.h"
100 : : #include "lib/binaryheap.h"
101 : : #include "miscadmin.h"
102 : : #include "pgstat.h"
103 : : #include "replication/logical.h"
104 : : #include "replication/reorderbuffer.h"
105 : : #include "replication/slot.h"
106 : : #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
107 : : #include "storage/bufmgr.h"
108 : : #include "storage/fd.h"
109 : : #include "storage/sinval.h"
110 : : #include "utils/builtins.h"
111 : : #include "utils/memutils.h"
112 : : #include "utils/rel.h"
113 : : #include "utils/relfilenumbermap.h"
114 : :
115 : : /* entry for a hash table we use to map from xid to our transaction state */
116 : : typedef struct ReorderBufferTXNByIdEnt
117 : : {
118 : : TransactionId xid;
119 : : ReorderBufferTXN *txn;
120 : : } ReorderBufferTXNByIdEnt;
121 : :
122 : : /* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */
123 : : typedef struct ReorderBufferTupleCidKey
124 : : {
125 : : RelFileLocator rlocator;
126 : : ItemPointerData tid;
127 : : } ReorderBufferTupleCidKey;
128 : :
129 : : typedef struct ReorderBufferTupleCidEnt
130 : : {
131 : : ReorderBufferTupleCidKey key;
132 : : CommandId cmin;
133 : : CommandId cmax;
134 : : CommandId combocid; /* just for debugging */
135 : : } ReorderBufferTupleCidEnt;
136 : :
137 : : /* Virtual file descriptor with file offset tracking */
138 : : typedef struct TXNEntryFile
139 : : {
140 : : File vfd; /* -1 when the file is closed */
141 : : off_t curOffset; /* offset for next write or read. Reset to 0
142 : : * when vfd is opened. */
143 : : } TXNEntryFile;
144 : :
145 : : /* k-way in-order change iteration support structures */
146 : : typedef struct ReorderBufferIterTXNEntry
147 : : {
148 : : XLogRecPtr lsn;
149 : : ReorderBufferChange *change;
150 : : ReorderBufferTXN *txn;
151 : : TXNEntryFile file;
152 : : XLogSegNo segno;
153 : : } ReorderBufferIterTXNEntry;
154 : :
155 : : typedef struct ReorderBufferIterTXNState
156 : : {
157 : : binaryheap *heap;
158 : : Size nr_txns;
159 : : dlist_head old_change;
160 : : ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
161 : : } ReorderBufferIterTXNState;
162 : :
163 : : /* toast datastructures */
164 : : typedef struct ReorderBufferToastEnt
165 : : {
166 : : Oid chunk_id; /* toast_table.chunk_id */
167 : : int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
168 : : * have seen */
169 : : Size num_chunks; /* number of chunks we've already seen */
170 : : Size size; /* combined size of chunks seen */
171 : : dlist_head chunks; /* linked list of chunks */
172 : : struct varlena *reconstructed; /* reconstructed varlena now pointed to in
173 : : * main tup */
174 : : } ReorderBufferToastEnt;
175 : :
176 : : /* Disk serialization support datastructures */
177 : : typedef struct ReorderBufferDiskChange
178 : : {
179 : : Size size;
180 : : ReorderBufferChange change;
181 : : /* data follows */
182 : : } ReorderBufferDiskChange;
183 : :
/*
 * Classification of ReorderBufferChange action codes.  Note that the
 * argument may be evaluated more than once, so it must be free of side
 * effects.
 */

/* speculative insertion */
#define IsSpecInsert(action) \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)

/* confirmation or abort of a speculative insertion */
#define IsSpecConfirmOrAbort(action) \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM || \
	 (action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)

/* plain insert, update, or speculative insert */
#define IsInsertOrUpdate(action) \
	((action) == REORDER_BUFFER_CHANGE_INSERT || \
	 (action) == REORDER_BUFFER_CHANGE_UPDATE || \
	 (action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)
199 : :
200 : : /*
201 : : * Maximum number of changes kept in memory, per transaction. After that,
202 : : * changes are spooled to disk.
203 : : *
204 : : * The current value should be sufficient to decode the entire transaction
205 : : * without hitting disk in OLTP workloads, while starting to spool to disk in
206 : : * other workloads reasonably fast.
207 : : *
208 : : * At some point in the future it probably makes sense to have a more elaborate
209 : : * resource management here, but it's not entirely clear what that would look
210 : : * like.
211 : : */
212 : : int logical_decoding_work_mem;
213 : : static const Size max_changes_in_memory = 4096; /* XXX for restore only */
214 : :
215 : : /* GUC variable */
216 : : int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED;
217 : :
218 : : /* ---------------------------------------
219 : : * primary reorderbuffer support routines
220 : : * ---------------------------------------
221 : : */
222 : : static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
223 : : static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
224 : : static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
225 : : TransactionId xid, bool create, bool *is_new,
226 : : XLogRecPtr lsn, bool create_as_top);
227 : : static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
228 : : ReorderBufferTXN *subtxn);
229 : :
230 : : static void AssertTXNLsnOrder(ReorderBuffer *rb);
231 : :
232 : : /* ---------------------------------------
233 : : * support functions for lsn-order iterating over the ->changes of a
234 : : * transaction and its subtransactions
235 : : *
236 : : * used for iteration over the k-way heap merge of a transaction and its
237 : : * subtransactions
238 : : * ---------------------------------------
239 : : */
240 : : static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
241 : : ReorderBufferIterTXNState *volatile *iter_state);
242 : : static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
243 : : static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
244 : : ReorderBufferIterTXNState *state);
245 : : static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);
246 : :
247 : : /*
248 : : * ---------------------------------------
249 : : * Disk serialization support functions
250 : : * ---------------------------------------
251 : : */
252 : : static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb);
253 : : static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
254 : : static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
255 : : int fd, ReorderBufferChange *change);
256 : : static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
257 : : TXNEntryFile *file, XLogSegNo *segno);
258 : : static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
259 : : char *data);
260 : : static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
261 : : static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
262 : : bool txn_prepared);
263 : : static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
264 : : static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
265 : : TransactionId xid, XLogSegNo segno);
266 : : static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg);
267 : :
268 : : static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
269 : : static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
270 : : ReorderBufferTXN *txn, CommandId cid);
271 : :
272 : : /*
273 : : * ---------------------------------------
274 : : * Streaming support functions
275 : : * ---------------------------------------
276 : : */
277 : : static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
278 : : static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
279 : : static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
280 : : static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
281 : :
282 : : /* ---------------------------------------
283 : : * toast reassembly support
284 : : * ---------------------------------------
285 : : */
286 : : static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
287 : : static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
288 : : static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
289 : : Relation relation, ReorderBufferChange *change);
290 : : static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
291 : : Relation relation, ReorderBufferChange *change);
292 : :
293 : : /*
294 : : * ---------------------------------------
295 : : * memory accounting
296 : : * ---------------------------------------
297 : : */
298 : : static Size ReorderBufferChangeSize(ReorderBufferChange *change);
299 : : static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
300 : : ReorderBufferChange *change,
301 : : ReorderBufferTXN *txn,
302 : : bool addition, Size sz);
303 : :
304 : : /*
305 : : * Allocate a new ReorderBuffer and clean out any old serialized state from
306 : : * prior ReorderBuffer instances for the same slot.
307 : : */
308 : : ReorderBuffer *
3695 rhaas@postgresql.org 309 :CBC 951 : ReorderBufferAllocate(void)
310 : : {
311 : : ReorderBuffer *buffer;
312 : : HASHCTL hash_ctl;
313 : : MemoryContext new_ctx;
314 : :
2231 alvherre@alvh.no-ip. 315 [ - + ]: 951 : Assert(MyReplicationSlot != NULL);
316 : :
317 : : /* allocate memory in own context, to have better accountability */
3695 rhaas@postgresql.org 318 : 951 : new_ctx = AllocSetContextCreate(CurrentMemoryContext,
319 : : "ReorderBuffer",
320 : : ALLOCSET_DEFAULT_SIZES);
321 : :
322 : : buffer =
323 : 951 : (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
324 : :
325 : 951 : memset(&hash_ctl, 0, sizeof(hash_ctl));
326 : :
327 : 951 : buffer->context = new_ctx;
328 : :
2603 andres@anarazel.de 329 : 951 : buffer->change_context = SlabContextCreate(new_ctx,
330 : : "Change",
331 : : SLAB_DEFAULT_BLOCK_SIZE,
332 : : sizeof(ReorderBufferChange));
333 : :
334 : 951 : buffer->txn_context = SlabContextCreate(new_ctx,
335 : : "TXN",
336 : : SLAB_DEFAULT_BLOCK_SIZE,
337 : : sizeof(ReorderBufferTXN));
338 : :
339 : : /*
340 : : * XXX the allocation sizes used below pre-date generation context's block
341 : : * growing code. These values should likely be benchmarked and set to
342 : : * more suitable values.
343 : : */
2334 simon@2ndQuadrant.co 344 : 951 : buffer->tup_context = GenerationContextCreate(new_ctx,
345 : : "Tuples",
346 : : SLAB_LARGE_BLOCK_SIZE,
347 : : SLAB_LARGE_BLOCK_SIZE,
348 : : SLAB_LARGE_BLOCK_SIZE);
349 : :
3695 rhaas@postgresql.org 350 : 951 : hash_ctl.keysize = sizeof(TransactionId);
351 : 951 : hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
352 : 951 : hash_ctl.hcxt = buffer->context;
353 : :
354 : 951 : buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
355 : : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
356 : :
357 : 951 : buffer->by_txn_last_xid = InvalidTransactionId;
358 : 951 : buffer->by_txn_last_txn = NULL;
359 : :
360 : 951 : buffer->outbuf = NULL;
361 : 951 : buffer->outbufsize = 0;
1611 akapila@postgresql.o 362 : 951 : buffer->size = 0;
363 : :
364 : : /* txn_heap is ordered by transaction size */
3 msawada@postgresql.o 365 :GNC 951 : buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL);
366 : :
1284 akapila@postgresql.o 367 :CBC 951 : buffer->spillTxns = 0;
368 : 951 : buffer->spillCount = 0;
369 : 951 : buffer->spillBytes = 0;
1263 370 : 951 : buffer->streamTxns = 0;
371 : 951 : buffer->streamCount = 0;
372 : 951 : buffer->streamBytes = 0;
1094 373 : 951 : buffer->totalTxns = 0;
374 : 951 : buffer->totalBytes = 0;
375 : :
3695 rhaas@postgresql.org 376 : 951 : buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
377 : :
378 : 951 : dlist_init(&buffer->toplevel_by_lsn);
2119 alvherre@alvh.no-ip. 379 : 951 : dlist_init(&buffer->txns_by_base_snapshot_lsn);
529 drowley@postgresql.o 380 : 951 : dclist_init(&buffer->catchange_txns);
381 : :
382 : : /*
383 : : * Ensure there's no stale data from prior uses of this slot, in case some
384 : : * prior exit avoided calling ReorderBufferFree. Failure to do this can
385 : : * produce duplicated txns, and it's very cheap if there's nothing there.
386 : : */
2231 alvherre@alvh.no-ip. 387 : 951 : ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
388 : :
3695 rhaas@postgresql.org 389 : 951 : return buffer;
390 : : }
391 : :
392 : : /*
393 : : * Free a ReorderBuffer
394 : : */
395 : : void
396 : 764 : ReorderBufferFree(ReorderBuffer *rb)
397 : : {
398 : 764 : MemoryContext context = rb->context;
399 : :
400 : : /*
401 : : * We free separately allocated data by entirely scrapping reorderbuffer's
402 : : * memory context.
403 : : */
404 : 764 : MemoryContextDelete(context);
405 : :
406 : : /* Free disk space used by unconsumed reorder buffers */
2231 alvherre@alvh.no-ip. 407 : 764 : ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
3695 rhaas@postgresql.org 408 : 764 : }
409 : :
410 : : /*
411 : : * Get an unused, possibly preallocated, ReorderBufferTXN.
412 : : */
413 : : static ReorderBufferTXN *
414 : 3580 : ReorderBufferGetTXN(ReorderBuffer *rb)
415 : : {
416 : : ReorderBufferTXN *txn;
417 : :
418 : : txn = (ReorderBufferTXN *)
2603 andres@anarazel.de 419 : 3580 : MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
420 : :
3695 rhaas@postgresql.org 421 : 3580 : memset(txn, 0, sizeof(ReorderBufferTXN));
422 : :
423 : 3580 : dlist_init(&txn->changes);
424 : 3580 : dlist_init(&txn->tuplecids);
425 : 3580 : dlist_init(&txn->subtxns);
426 : :
427 : : /* InvalidCommandId is not zero, so set it explicitly */
1345 akapila@postgresql.o 428 : 3580 : txn->command_id = InvalidCommandId;
1244 429 : 3580 : txn->output_plugin_private = NULL;
430 : :
3695 rhaas@postgresql.org 431 : 3580 : return txn;
432 : : }
433 : :
434 : : /*
435 : : * Free a ReorderBufferTXN.
436 : : */
437 : : static void
438 : 3530 : ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
439 : : {
440 : : /* clean the lookup cache if we were cached (quite likely) */
441 [ + + ]: 3530 : if (rb->by_txn_last_xid == txn->xid)
442 : : {
443 : 3345 : rb->by_txn_last_xid = InvalidTransactionId;
444 : 3345 : rb->by_txn_last_txn = NULL;
445 : : }
446 : :
447 : : /* free data that's contained */
448 : :
1196 akapila@postgresql.o 449 [ + + ]: 3530 : if (txn->gid != NULL)
450 : : {
451 : 38 : pfree(txn->gid);
452 : 38 : txn->gid = NULL;
453 : : }
454 : :
3695 rhaas@postgresql.org 455 [ + + ]: 3530 : if (txn->tuplecid_hash != NULL)
456 : : {
457 : 501 : hash_destroy(txn->tuplecid_hash);
458 : 501 : txn->tuplecid_hash = NULL;
459 : : }
460 : :
461 [ + + ]: 3530 : if (txn->invalidations)
462 : : {
463 : 1056 : pfree(txn->invalidations);
464 : 1056 : txn->invalidations = NULL;
465 : : }
466 : :
467 : : /* Reset the toast hash */
1034 akapila@postgresql.o 468 : 3530 : ReorderBufferToastReset(rb, txn);
469 : :
2603 andres@anarazel.de 470 : 3530 : pfree(txn);
3695 rhaas@postgresql.org 471 : 3530 : }
472 : :
473 : : /*
474 : : * Get a fresh ReorderBufferChange.
475 : : */
476 : : ReorderBufferChange *
477 : 1731554 : ReorderBufferGetChange(ReorderBuffer *rb)
478 : : {
479 : : ReorderBufferChange *change;
480 : :
481 : : change = (ReorderBufferChange *)
2603 andres@anarazel.de 482 : 1731554 : MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
483 : :
3695 rhaas@postgresql.org 484 : 1731554 : memset(change, 0, sizeof(ReorderBufferChange));
485 : 1731554 : return change;
486 : : }
487 : :
488 : : /*
489 : : * Free a ReorderBufferChange and update memory accounting, if requested.
490 : : */
491 : : void
1345 akapila@postgresql.o 492 : 1731326 : ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
493 : : bool upd_mem)
494 : : {
495 : : /* update memory accounting info */
496 [ + + ]: 1731326 : if (upd_mem)
11 msawada@postgresql.o 497 :GNC 195993 : ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
498 : : ReorderBufferChangeSize(change));
499 : :
500 : : /* free contained data */
3691 tgl@sss.pgh.pa.us 501 [ + + + + :CBC 1731326 : switch (change->action)
+ + - ]
502 : : {
503 : 1660693 : case REORDER_BUFFER_CHANGE_INSERT:
504 : : case REORDER_BUFFER_CHANGE_UPDATE:
505 : : case REORDER_BUFFER_CHANGE_DELETE:
506 : : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
507 [ + + ]: 1660693 : if (change->data.tp.newtuple)
508 : : {
76 msawada@postgresql.o 509 :GNC 1446860 : ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
3691 tgl@sss.pgh.pa.us 510 :CBC 1446860 : change->data.tp.newtuple = NULL;
511 : : }
512 : :
513 [ + + ]: 1660693 : if (change->data.tp.oldtuple)
514 : : {
76 msawada@postgresql.o 515 :GNC 146124 : ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
3691 tgl@sss.pgh.pa.us 516 :CBC 146124 : change->data.tp.oldtuple = NULL;
517 : : }
3695 rhaas@postgresql.org 518 : 1660693 : break;
2930 simon@2ndQuadrant.co 519 : 39 : case REORDER_BUFFER_CHANGE_MESSAGE:
520 [ + - ]: 39 : if (change->data.msg.prefix != NULL)
521 : 39 : pfree(change->data.msg.prefix);
522 : 39 : change->data.msg.prefix = NULL;
523 [ + - ]: 39 : if (change->data.msg.message != NULL)
524 : 39 : pfree(change->data.msg.message);
525 : 39 : change->data.msg.message = NULL;
526 : 39 : break;
1277 akapila@postgresql.o 527 : 4707 : case REORDER_BUFFER_CHANGE_INVALIDATION:
528 [ + - ]: 4707 : if (change->data.inval.invalidations)
529 : 4707 : pfree(change->data.inval.invalidations);
530 : 4707 : change->data.inval.invalidations = NULL;
531 : 4707 : break;
3695 rhaas@postgresql.org 532 : 1089 : case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
3691 tgl@sss.pgh.pa.us 533 [ + - ]: 1089 : if (change->data.snapshot)
534 : : {
535 : 1089 : ReorderBufferFreeSnap(rb, change->data.snapshot);
536 : 1089 : change->data.snapshot = NULL;
537 : : }
3695 rhaas@postgresql.org 538 : 1089 : break;
539 : : /* no data in addition to the struct itself */
2050 tomas.vondra@postgre 540 : 65 : case REORDER_BUFFER_CHANGE_TRUNCATE:
541 [ + - ]: 65 : if (change->data.truncate.relids != NULL)
542 : : {
543 : 65 : ReorderBufferReturnRelids(rb, change->data.truncate.relids);
544 : 65 : change->data.truncate.relids = NULL;
545 : : }
546 : 65 : break;
3264 andres@anarazel.de 547 : 64733 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
548 : : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
549 : : case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
550 : : case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
3695 rhaas@postgresql.org 551 : 64733 : break;
552 : : }
553 : :
2603 andres@anarazel.de 554 : 1731326 : pfree(change);
3695 rhaas@postgresql.org 555 : 1731326 : }
556 : :
557 : : /*
558 : : * Get a fresh HeapTuple fitting a tuple of size tuple_len (excluding header
559 : : * overhead).
560 : : */
561 : : HeapTuple
2962 andres@anarazel.de 562 : 1593078 : ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
563 : : {
564 : : HeapTuple tuple;
565 : : Size alloc_len;
566 : :
567 : 1593078 : alloc_len = tuple_len + SizeofHeapTupleHeader;
568 : :
76 msawada@postgresql.o 569 :GNC 1593078 : tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
570 : : HEAPTUPLESIZE + alloc_len);
571 : 1593078 : tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
572 : :
3695 rhaas@postgresql.org 573 :CBC 1593078 : return tuple;
574 : : }
575 : :
576 : : /*
577 : : * Free a HeapTuple returned by ReorderBufferGetTupleBuf().
578 : : */
579 : : void
76 msawada@postgresql.o 580 :GNC 1592984 : ReorderBufferReturnTupleBuf(HeapTuple tuple)
581 : : {
2334 simon@2ndQuadrant.co 582 :CBC 1592984 : pfree(tuple);
3695 rhaas@postgresql.org 583 : 1592984 : }
584 : :
585 : : /*
586 : : * Get an array for relids of truncated relations.
587 : : *
588 : : * We use the global memory context (for the whole reorder buffer), because
589 : : * none of the existing ones seems like a good match (some are SLAB, so we
590 : : * can't use those, and tup_context is meant for tuple data, not relids). We
591 : : * could add yet another context, but it seems like an overkill - TRUNCATE is
592 : : * not particularly common operation, so it does not seem worth it.
593 : : */
594 : : Oid *
2050 tomas.vondra@postgre 595 : 69 : ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
596 : : {
597 : : Oid *relids;
598 : : Size alloc_len;
599 : :
600 : 69 : alloc_len = sizeof(Oid) * nrelids;
601 : :
602 : 69 : relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
603 : :
604 : 69 : return relids;
605 : : }
606 : :
607 : : /*
608 : : * Free an array of relids.
609 : : */
610 : : void
611 : 65 : ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
612 : : {
613 : 65 : pfree(relids);
614 : 65 : }
615 : :
616 : : /*
617 : : * Return the ReorderBufferTXN from the given buffer, specified by Xid.
618 : : * If create is true, and a transaction doesn't already exist, create it
619 : : * (with the given LSN, and as top transaction if that's specified);
620 : : * when this happens, is_new is set to true.
621 : : */
622 : : static ReorderBufferTXN *
3695 rhaas@postgresql.org 623 : 5767037 : ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
624 : : bool *is_new, XLogRecPtr lsn, bool create_as_top)
625 : : {
626 : : ReorderBufferTXN *txn;
627 : : ReorderBufferTXNByIdEnt *ent;
628 : : bool found;
629 : :
630 [ - + ]: 5767037 : Assert(TransactionIdIsValid(xid));
631 : :
632 : : /*
633 : : * Check the one-entry lookup cache first
634 : : */
635 [ + + ]: 5767037 : if (TransactionIdIsValid(rb->by_txn_last_xid) &&
636 [ + + ]: 5763659 : rb->by_txn_last_xid == xid)
637 : : {
638 : 4774047 : txn = rb->by_txn_last_txn;
639 : :
640 [ + + ]: 4774047 : if (txn != NULL)
641 : : {
642 : : /* found it, and it's valid */
643 [ + + ]: 4774035 : if (is_new)
644 : 2805 : *is_new = false;
645 : 4774035 : return txn;
646 : : }
647 : :
648 : : /*
649 : : * cached as non-existent, and asked not to create? Then nothing else
650 : : * to do.
651 : : */
652 [ + + ]: 12 : if (!create)
653 : 9 : return NULL;
654 : : /* otherwise fall through to create it */
655 : : }
656 : :
657 : : /*
658 : : * If the cache wasn't hit or it yielded a "does-not-exist" and we want to
659 : : * create an entry.
660 : : */
661 : :
662 : : /* search the lookup table */
663 : : ent = (ReorderBufferTXNByIdEnt *)
664 : 992993 : hash_search(rb->by_txn,
665 : : &xid,
666 : : create ? HASH_ENTER : HASH_FIND,
667 : : &found);
668 [ + + ]: 992993 : if (found)
669 : 988130 : txn = ent->txn;
670 [ + + ]: 4863 : else if (create)
671 : : {
672 : : /* initialize the new entry, if creation was requested */
673 [ - + ]: 3580 : Assert(ent != NULL);
2119 alvherre@alvh.no-ip. 674 [ - + ]: 3580 : Assert(lsn != InvalidXLogRecPtr);
675 : :
3695 rhaas@postgresql.org 676 : 3580 : ent->txn = ReorderBufferGetTXN(rb);
677 : 3580 : ent->txn->xid = xid;
678 : 3580 : txn = ent->txn;
679 : 3580 : txn->first_lsn = lsn;
680 : 3580 : txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
681 : :
682 [ + + ]: 3580 : if (create_as_top)
683 : : {
684 : 2903 : dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
685 : 2903 : AssertTXNLsnOrder(rb);
686 : : }
687 : : }
688 : : else
689 : 1283 : txn = NULL; /* not found and not asked to create */
690 : :
691 : : /* update cache */
692 : 992993 : rb->by_txn_last_xid = xid;
693 : 992993 : rb->by_txn_last_txn = txn;
694 : :
695 [ + + ]: 992993 : if (is_new)
696 : 1787 : *is_new = !found;
697 : :
2940 andres@anarazel.de 698 [ + + - + ]: 992993 : Assert(!create || txn != NULL);
3695 rhaas@postgresql.org 699 : 992993 : return txn;
700 : : }
701 : :
702 : : /*
703 : : * Record the partial change for the streaming of in-progress transactions. We
704 : : * can stream only complete changes so if we have a partial change like toast
705 : : * table insert or speculative insert then we mark such a 'txn' so that it
706 : : * can't be streamed. We also ensure that if the changes in such a 'txn' can
707 : : * be streamed and are above logical_decoding_work_mem threshold then we stream
708 : : * them as soon as we have a complete change.
709 : : */
710 : : static void
1345 akapila@postgresql.o 711 : 1535468 : ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
712 : : ReorderBufferChange *change,
713 : : bool toast_insert)
714 : : {
715 : : ReorderBufferTXN *toptxn;
716 : :
717 : : /*
718 : : * The partial changes need to be processed only while streaming
719 : : * in-progress transactions.
720 : : */
721 [ + + ]: 1535468 : if (!ReorderBufferCanStream(rb))
722 : 1235844 : return;
723 : :
724 : : /* Get the top transaction. */
394 725 [ + + ]: 299624 : toptxn = rbtxn_get_toptxn(txn);
726 : :
727 : : /*
728 : : * Indicate a partial change for toast inserts. The change will be
729 : : * considered as complete once we get the insert or update on the main
730 : : * table and we are sure that the pending toast chunks are not required
731 : : * anymore.
732 : : *
733 : : * If we allow streaming when there are pending toast chunks then such
734 : : * chunks won't be released till the insert (multi_insert) is complete and
735 : : * we expect the txn to have streamed all changes after streaming. This
736 : : * restriction is mainly to ensure the correctness of streamed
737 : : * transactions and it doesn't seem worth uplifting such a restriction
738 : : * just to allow this case because anyway we will stream the transaction
739 : : * once such an insert is complete.
740 : : */
1345 741 [ + + ]: 299624 : if (toast_insert)
1053 742 : 1459 : toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
743 [ + + ]: 298165 : else if (rbtxn_has_partial_change(toptxn) &&
744 [ - + - - : 33 : IsInsertOrUpdate(change->action) &&
- - ]
745 [ + + ]: 33 : change->data.tp.clear_toast_afterwards)
746 : 23 : toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
747 : :
748 : : /*
749 : : * Indicate a partial change for speculative inserts. The change will be
750 : : * considered as complete once we get the speculative confirm or abort
751 : : * token.
752 : : */
1345 753 [ - + ]: 299624 : if (IsSpecInsert(change->action))
1053 akapila@postgresql.o 754 :UBC 0 : toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
1053 akapila@postgresql.o 755 [ + + ]:CBC 299624 : else if (rbtxn_has_partial_change(toptxn) &&
1019 756 [ + - - + ]: 1469 : IsSpecConfirmOrAbort(change->action))
1053 akapila@postgresql.o 757 :UBC 0 : toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
758 : :
759 : : /*
760 : : * Stream the transaction if it is serialized before and the changes are
761 : : * now complete in the top-level transaction.
762 : : *
763 : : * The reason for doing the streaming of such a transaction as soon as we
764 : : * get the complete change for it is that previously it would have reached
765 : : * the memory threshold and wouldn't get streamed because of incomplete
766 : : * changes. Delaying such transactions would increase apply lag for them.
767 : : */
1345 akapila@postgresql.o 768 [ + + ]:CBC 299624 : if (ReorderBufferCanStartStreaming(rb) &&
1053 769 [ + + ]: 162856 : !(rbtxn_has_partial_change(toptxn)) &&
493 770 [ + + ]: 161427 : rbtxn_is_serialized(txn) &&
771 [ + - ]: 5 : rbtxn_has_streamable_change(toptxn))
1345 772 : 5 : ReorderBufferStreamTXN(rb, toptxn);
773 : : }
774 : :
775 : : /*
776 : : * Queue a change into a transaction so that it can be replayed upon commit,
777 : : * or streamed once we reach the logical_decoding_work_mem threshold.
778 : : */
779 : : void
3695 rhaas@postgresql.org 780 : 1535536 : ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
781 : : ReorderBufferChange *change, bool toast_insert)
782 : : {
783 : : ReorderBufferTXN *txn;
784 : :
785 : 1535536 : txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
786 : :
787 : : /*
788 : : * While streaming the previous changes we have detected that the
789 : : * transaction is aborted. So there is no point in collecting further
790 : : * changes for it.
791 : : */
1345 akapila@postgresql.o 792 [ + + ]: 1535536 : if (txn->concurrent_abort)
793 : : {
794 : : /*
795 : : * We don't need to update memory accounting for this change as we
796 : : * have not added it to the queue yet.
797 : : */
798 : 68 : ReorderBufferReturnChange(rb, change, false);
799 : 68 : return;
800 : : }
801 : :
802 : : /*
803 : : * The changes that are sent downstream are considered streamable. We
804 : : * remember such transactions so that only those will later be considered
805 : : * for streaming.
806 : : */
493 807 [ + + ]: 1535468 : if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
808 [ + + ]: 425836 : change->action == REORDER_BUFFER_CHANGE_UPDATE ||
809 [ + + ]: 266095 : change->action == REORDER_BUFFER_CHANGE_DELETE ||
810 [ + + ]: 64214 : change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
811 [ + + ]: 46298 : change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
812 [ + + ]: 46232 : change->action == REORDER_BUFFER_CHANGE_MESSAGE)
813 : : {
394 814 [ + + ]: 1489274 : ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
815 : :
493 816 : 1489274 : toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
817 : : }
818 : :
3695 rhaas@postgresql.org 819 : 1535468 : change->lsn = lsn;
1611 akapila@postgresql.o 820 : 1535468 : change->txn = txn;
821 : :
3695 rhaas@postgresql.org 822 [ - + ]: 1535468 : Assert(InvalidXLogRecPtr != lsn);
823 : 1535468 : dlist_push_tail(&txn->changes, &change->node);
824 : 1535468 : txn->nentries++;
825 : 1535468 : txn->nentries_mem++;
826 : :
827 : : /* update memory accounting information */
11 msawada@postgresql.o 828 :GNC 1535468 : ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
829 : : ReorderBufferChangeSize(change));
830 : :
831 : : /* process partial change */
1345 akapila@postgresql.o 832 :CBC 1535468 : ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
833 : :
834 : : /* check the memory limits and evict something if needed */
1611 835 : 1535468 : ReorderBufferCheckMemoryLimit(rb);
836 : : }
837 : :
838 : : /*
839 : : * A transactional message is queued to be processed upon commit and a
840 : : * non-transactional message gets processed immediately.
841 : : */
842 : : void
2930 simon@2ndQuadrant.co 843 : 46 : ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
844 : : Snapshot snap, XLogRecPtr lsn,
845 : : bool transactional, const char *prefix,
846 : : Size message_size, const char *message)
847 : : {
848 [ + + ]: 46 : if (transactional)
849 : : {
850 : : MemoryContext oldcontext;
851 : : ReorderBufferChange *change;
852 : :
853 [ - + ]: 38 : Assert(xid != InvalidTransactionId);
854 : :
855 : : /*
856 : : * We don't expect snapshots for transactional changes - we'll use the
857 : : * snapshot derived later during apply (unless the change gets
858 : : * skipped).
859 : : */
417 tomas.vondra@postgre 860 [ - + ]: 38 : Assert(!snap);
861 : :
2930 simon@2ndQuadrant.co 862 : 38 : oldcontext = MemoryContextSwitchTo(rb->context);
863 : :
864 : 38 : change = ReorderBufferGetChange(rb);
865 : 38 : change->action = REORDER_BUFFER_CHANGE_MESSAGE;
866 : 38 : change->data.msg.prefix = pstrdup(prefix);
867 : 38 : change->data.msg.message_size = message_size;
868 : 38 : change->data.msg.message = palloc(message_size);
869 : 38 : memcpy(change->data.msg.message, message, message_size);
870 : :
1345 akapila@postgresql.o 871 : 38 : ReorderBufferQueueChange(rb, xid, lsn, change, false);
872 : :
2930 simon@2ndQuadrant.co 873 : 38 : MemoryContextSwitchTo(oldcontext);
874 : : }
875 : : else
876 : : {
2866 rhaas@postgresql.org 877 : 8 : ReorderBufferTXN *txn = NULL;
575 pg@bowt.ie 878 : 8 : volatile Snapshot snapshot_now = snap;
879 : :
880 : : /* Non-transactional changes require a valid snapshot. */
417 tomas.vondra@postgre 881 [ - + ]: 8 : Assert(snapshot_now);
882 : :
2930 simon@2ndQuadrant.co 883 [ + + ]: 8 : if (xid != InvalidTransactionId)
884 : 3 : txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
885 : :
886 : : /* setup snapshot to allow catalog access */
887 : 8 : SetupHistoricSnapshot(snapshot_now, NULL);
888 [ + - ]: 8 : PG_TRY();
889 : : {
890 : 8 : rb->message(rb, txn, lsn, false, prefix, message_size, message);
891 : :
892 : 8 : TeardownHistoricSnapshot(false);
893 : : }
2930 simon@2ndQuadrant.co 894 :UBC 0 : PG_CATCH();
895 : : {
896 : 0 : TeardownHistoricSnapshot(true);
897 : 0 : PG_RE_THROW();
898 : : }
2930 simon@2ndQuadrant.co 899 [ - + ]:CBC 8 : PG_END_TRY();
900 : : }
901 : 46 : }
902 : :
903 : : /*
904 : : * AssertTXNLsnOrder
905 : : * Verify LSN ordering of transaction lists in the reorderbuffer
906 : : *
907 : : * Other LSN-related invariants are checked too.
908 : : *
909 : : * No-op if assertions are not in use.
910 : : */
911 : : static void
3695 rhaas@postgresql.org 912 : 7247 : AssertTXNLsnOrder(ReorderBuffer *rb)
913 : : {
914 : : #ifdef USE_ASSERT_CHECKING
542 akapila@postgresql.o 915 : 7247 : LogicalDecodingContext *ctx = rb->private_data;
916 : : dlist_iter iter;
3695 rhaas@postgresql.org 917 : 7247 : XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
2119 alvherre@alvh.no-ip. 918 : 7247 : XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
919 : :
920 : : /*
921 : : * Skip the verification if we haven't yet reached the LSN at which we
922 : : * start decoding the contents of transactions. Until we reach that LSN,
923 : : * we could have transactions that don't yet have the association between
924 : : * the top-level transaction and subtransaction, and consequently have
925 : : * the same LSN. We don't guarantee this association until we try to
926 : : * decode the actual contents of a transaction. The ordering of the
927 : : * records prior to the start_decoding_at LSN should have been checked
928 : : * before the restart.
929 : : */
542 akapila@postgresql.o 930 [ + + ]: 7247 : if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr))
931 : 3533 : return;
932 : :
3695 rhaas@postgresql.org 933 [ + - + + ]: 6983 : dlist_foreach(iter, &rb->toplevel_by_lsn)
934 : : {
2119 alvherre@alvh.no-ip. 935 : 3269 : ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
936 : : iter.cur);
937 : :
938 : : /* start LSN must be set */
3695 rhaas@postgresql.org 939 [ - + ]: 3269 : Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
940 : :
941 : : /* If there is an end LSN, it must not be lower than the start LSN */
942 [ + + ]: 3269 : if (cur_txn->end_lsn != InvalidXLogRecPtr)
943 [ - + ]: 20 : Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
944 : :
945 : : /* Current initial LSN must be strictly higher than previous */
946 [ + + ]: 3269 : if (prev_first_lsn != InvalidXLogRecPtr)
947 [ - + ]: 240 : Assert(prev_first_lsn < cur_txn->first_lsn);
948 : :
949 : : /* known-as-subtxn txns must not be listed */
1556 alvherre@alvh.no-ip. 950 [ - + ]: 3269 : Assert(!rbtxn_is_known_subxact(cur_txn));
951 : :
3695 rhaas@postgresql.org 952 : 3269 : prev_first_lsn = cur_txn->first_lsn;
953 : : }
954 : :
2119 alvherre@alvh.no-ip. 955 [ + - + + ]: 5538 : dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
956 : : {
957 : 1824 : ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
958 : : base_snapshot_node,
959 : : iter.cur);
960 : :
961 : : /* base snapshot (and its LSN) must be set */
962 [ - + ]: 1824 : Assert(cur_txn->base_snapshot != NULL);
963 [ - + ]: 1824 : Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
964 : :
965 : : /* current LSN must be strictly higher than previous */
966 [ + + ]: 1824 : if (prev_base_snap_lsn != InvalidXLogRecPtr)
967 [ - + ]: 179 : Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
968 : :
969 : : /* known-as-subtxn txns must not be listed */
1556 970 [ - + ]: 1824 : Assert(!rbtxn_is_known_subxact(cur_txn));
971 : :
2119 972 : 1824 : prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
973 : : }
974 : : #endif
975 : : }
976 : :
977 : : /*
978 : : * AssertChangeLsnOrder
979 : : *
980 : : * Check ordering of changes in the (sub)transaction.
981 : : */
982 : : static void
1345 akapila@postgresql.o 983 : 2378 : AssertChangeLsnOrder(ReorderBufferTXN *txn)
984 : : {
985 : : #ifdef USE_ASSERT_CHECKING
986 : : dlist_iter iter;
987 : 2378 : XLogRecPtr prev_lsn = txn->first_lsn;
988 : :
989 [ + - + + ]: 197168 : dlist_foreach(iter, &txn->changes)
990 : : {
991 : : ReorderBufferChange *cur_change;
992 : :
993 : 194790 : cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
994 : :
995 [ - + ]: 194790 : Assert(txn->first_lsn != InvalidXLogRecPtr);
996 [ - + ]: 194790 : Assert(cur_change->lsn != InvalidXLogRecPtr);
997 [ - + ]: 194790 : Assert(txn->first_lsn <= cur_change->lsn);
998 : :
999 [ + + ]: 194790 : if (txn->end_lsn != InvalidXLogRecPtr)
1000 [ - + ]: 36671 : Assert(cur_change->lsn <= txn->end_lsn);
1001 : :
1002 [ - + ]: 194790 : Assert(prev_lsn <= cur_change->lsn);
1003 : :
1004 : 194790 : prev_lsn = cur_change->lsn;
1005 : : }
1006 : : #endif
1007 : 2378 : }
1008 : :
1009 : : /*
1010 : : * ReorderBufferGetOldestTXN
1011 : : * Return oldest transaction in reorderbuffer
1012 : : */
1013 : : ReorderBufferTXN *
3695 rhaas@postgresql.org 1014 : 392 : ReorderBufferGetOldestTXN(ReorderBuffer *rb)
1015 : : {
1016 : : ReorderBufferTXN *txn;
1017 : :
2119 alvherre@alvh.no-ip. 1018 : 392 : AssertTXNLsnOrder(rb);
1019 : :
3695 rhaas@postgresql.org 1020 [ + + ]: 392 : if (dlist_is_empty(&rb->toplevel_by_lsn))
1021 : 346 : return NULL;
1022 : :
1023 : 46 : txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1024 : :
1556 alvherre@alvh.no-ip. 1025 [ - + ]: 46 : Assert(!rbtxn_is_known_subxact(txn));
3695 rhaas@postgresql.org 1026 [ - + ]: 46 : Assert(txn->first_lsn != InvalidXLogRecPtr);
1027 : 46 : return txn;
1028 : : }
1029 : :
1030 : : /*
1031 : : * ReorderBufferGetOldestXmin
1032 : : * Return oldest Xmin in reorderbuffer
1033 : : *
1034 : : * Returns oldest possibly running Xid from the point of view of snapshots
1035 : : * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
1036 : : * there are none.
1037 : : *
1038 : : * Since snapshots are assigned monotonically, this equals the Xmin of the
1039 : : * base snapshot with minimal base_snapshot_lsn.
1040 : : */
1041 : : TransactionId
2119 alvherre@alvh.no-ip. 1042 : 409 : ReorderBufferGetOldestXmin(ReorderBuffer *rb)
1043 : : {
1044 : : ReorderBufferTXN *txn;
1045 : :
1046 : 409 : AssertTXNLsnOrder(rb);
1047 : :
1048 [ + + ]: 409 : if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1049 : 373 : return InvalidTransactionId;
1050 : :
1051 : 36 : txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1052 : : &rb->txns_by_base_snapshot_lsn);
1053 : 36 : return txn->base_snapshot->xmin;
1054 : : }
1055 : :
1056 : : void
3695 rhaas@postgresql.org 1057 : 437 : ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
1058 : : {
1059 : 437 : rb->current_restart_decoding_lsn = ptr;
1060 : 437 : }
1061 : :
1062 : : /*
1063 : : * ReorderBufferAssignChild
1064 : : *
1065 : : * Make note that we know that subxid is a subtransaction of xid, seen as of
1066 : : * the given lsn.
1067 : : */
1068 : : void
1069 : 863 : ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
1070 : : TransactionId subxid, XLogRecPtr lsn)
1071 : : {
1072 : : ReorderBufferTXN *txn;
1073 : : ReorderBufferTXN *subtxn;
1074 : : bool new_top;
1075 : : bool new_sub;
1076 : :
1077 : 863 : txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1078 : 863 : subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1079 : :
2119 alvherre@alvh.no-ip. 1080 [ + + ]: 863 : if (!new_sub)
1081 : : {
1556 1082 [ + - ]: 186 : if (rbtxn_is_known_subxact(subtxn))
1083 : : {
1084 : : /* already associated, nothing to do */
2119 1085 : 186 : return;
1086 : : }
1087 : : else
1088 : : {
1089 : : /*
1090 : : * We already saw this transaction, but initially added it to the
1091 : : * list of top-level txns. Now that we know it's not top-level,
1092 : : * remove it from there.
1093 : : */
2119 alvherre@alvh.no-ip. 1094 :UBC 0 : dlist_delete(&subtxn->node);
1095 : : }
1096 : : }
1097 : :
1556 alvherre@alvh.no-ip. 1098 :CBC 677 : subtxn->txn_flags |= RBTXN_IS_SUBXACT;
2119 1099 : 677 : subtxn->toplevel_xid = xid;
1100 [ - + ]: 677 : Assert(subtxn->nsubtxns == 0);
1101 : :
1102 : : /* set the reference to top-level transaction */
1361 akapila@postgresql.o 1103 : 677 : subtxn->toptxn = txn;
1104 : :
1105 : : /* add to subtransaction list */
2119 alvherre@alvh.no-ip. 1106 : 677 : dlist_push_tail(&txn->subtxns, &subtxn->node);
1107 : 677 : txn->nsubtxns++;
1108 : :
1109 : : /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1110 : 677 : ReorderBufferTransferSnapToParent(txn, subtxn);
1111 : :
1112 : : /* Verify LSN-ordering invariant */
1113 : 677 : AssertTXNLsnOrder(rb);
1114 : : }
1115 : :
1116 : : /*
1117 : : * ReorderBufferTransferSnapToParent
1118 : : * Transfer base snapshot from subtxn to top-level txn, if needed
1119 : : *
1120 : : * This is done if the top-level txn doesn't have a base snapshot, or if the
1121 : : * subtxn's base snapshot has an earlier LSN than the top-level txn's base
1122 : : * snapshot's LSN. This can happen if there are no changes in the toplevel
1123 : : * txn but there are some in the subtxn, or the first change in subtxn has
1124 : : * earlier LSN than first change in the top-level txn and we learned about
1125 : : * their kinship only now.
1126 : : *
1127 : : * The subtransaction's snapshot is cleared regardless of the transfer
1128 : : * happening, since it's not needed anymore in either case.
1129 : : *
1130 : : * We do this as soon as we become aware of their kinship, to avoid queueing
1131 : : * extra snapshots to txns known-as-subtxns -- only top-level txns will
1132 : : * receive further snapshots.
1133 : : */
1134 : : static void
1135 : 681 : ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
1136 : : ReorderBufferTXN *subtxn)
1137 : : {
1138 [ - + ]: 681 : Assert(subtxn->toplevel_xid == txn->xid);
1139 : :
1140 [ - + ]: 681 : if (subtxn->base_snapshot != NULL)
1141 : : {
2119 alvherre@alvh.no-ip. 1142 [ # # ]:UBC 0 : if (txn->base_snapshot == NULL ||
1143 [ # # ]: 0 : subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1144 : : {
1145 : : /*
1146 : : * If the toplevel transaction already has a base snapshot but
1147 : : * it's newer than the subxact's, purge it.
1148 : : */
1149 [ # # ]: 0 : if (txn->base_snapshot != NULL)
1150 : : {
1151 : 0 : SnapBuildSnapDecRefcount(txn->base_snapshot);
1152 : 0 : dlist_delete(&txn->base_snapshot_node);
1153 : : }
1154 : :
1155 : : /*
1156 : : * The snapshot is now the top transaction's; transfer it, and
1157 : : * adjust the list position of the top transaction in the list by
1158 : : * moving it to where the subtransaction is.
1159 : : */
1160 : 0 : txn->base_snapshot = subtxn->base_snapshot;
1161 : 0 : txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1162 : 0 : dlist_insert_before(&subtxn->base_snapshot_node,
1163 : : &txn->base_snapshot_node);
1164 : :
1165 : : /*
1166 : : * The subtransaction doesn't have a snapshot anymore (so it
1167 : : * mustn't be in the list.)
1168 : : */
1169 : 0 : subtxn->base_snapshot = NULL;
1170 : 0 : subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
1171 : 0 : dlist_delete(&subtxn->base_snapshot_node);
1172 : : }
1173 : : else
1174 : : {
1175 : : /* Base snap of toplevel is fine, so subxact's is not needed */
1176 : 0 : SnapBuildSnapDecRefcount(subtxn->base_snapshot);
1177 : 0 : dlist_delete(&subtxn->base_snapshot_node);
1178 : 0 : subtxn->base_snapshot = NULL;
1179 : 0 : subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
1180 : : }
1181 : : }
3695 rhaas@postgresql.org 1182 :CBC 681 : }
1183 : :
1184 : : /*
1185 : : * Associate a subtransaction with its toplevel transaction at commit
1186 : : * time. No further changes may be added after this.
1187 : : */
1188 : : void
1189 : 267 : ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
1190 : : TransactionId subxid, XLogRecPtr commit_lsn,
1191 : : XLogRecPtr end_lsn)
1192 : : {
1193 : : ReorderBufferTXN *subtxn;
1194 : :
1195 : 267 : subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1196 : : InvalidXLogRecPtr, false);
1197 : :
1198 : : /*
1199 : : * No need to do anything if that subtxn didn't contain any changes
1200 : : */
1201 [ + + ]: 267 : if (!subtxn)
1202 : 81 : return;
1203 : :
1204 : 186 : subtxn->final_lsn = commit_lsn;
1205 : 186 : subtxn->end_lsn = end_lsn;
1206 : :
1207 : : /*
1208 : : * Assign this subxact as a child of the toplevel xact (no-op if already
1209 : : * done.)
1210 : : */
2119 alvherre@alvh.no-ip. 1211 : 186 : ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1212 : : }
1213 : :
1214 : :
1215 : : /*
1216 : : * Support for efficiently iterating over a transaction's and its
1217 : : * subtransactions' changes.
1218 : : *
1219 : : * We do this with a k-way merge between transactions/subtransactions. For that
1220 : : * we model the current heads of the different transactions as a binary heap
1221 : : * so we easily know which (sub-)transaction has the change with the smallest
1222 : : * lsn next.
1223 : : *
1224 : : * We assume the changes in individual transactions are already sorted by LSN.
1225 : : */
1226 : :
1227 : : /*
1228 : : * Binary heap comparison function.
1229 : : */
1230 : : static int
3695 rhaas@postgresql.org 1231 : 52082 : ReorderBufferIterCompare(Datum a, Datum b, void *arg)
1232 : : {
1233 : 52082 : ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1234 : 52082 : XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1235 : 52082 : XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1236 : :
1237 [ + + ]: 52082 : if (pos_a < pos_b)
1238 : 50756 : return 1;
1239 [ - + ]: 1326 : else if (pos_a == pos_b)
3695 rhaas@postgresql.org 1240 :UBC 0 : return 0;
3695 rhaas@postgresql.org 1241 :CBC 1326 : return -1;
1242 : : }
1243 : :
1244 : : /*
1245 : : * Allocate & initialize an iterator which iterates in lsn order over a
1246 : : * transaction and all its subtransactions.
1247 : : *
1248 : : * Note: The iterator state is returned through iter_state parameter rather
1249 : : * than the function's return value. This is because the state gets cleaned up
1250 : : * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1251 : : * back the state even if this function throws an exception.
1252 : : */
1253 : : static void
1583 akapila@postgresql.o 1254 : 1916 : ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
1255 : : ReorderBufferIterTXNState *volatile *iter_state)
1256 : : {
3695 rhaas@postgresql.org 1257 : 1916 : Size nr_txns = 0;
1258 : : ReorderBufferIterTXNState *state;
1259 : : dlist_iter cur_txn_i;
1260 : : int32 off;
1261 : :
1583 akapila@postgresql.o 1262 : 1916 : *iter_state = NULL;
1263 : :
1264 : : /* Check ordering of changes in the toplevel transaction. */
1345 1265 : 1916 : AssertChangeLsnOrder(txn);
1266 : :
1267 : : /*
1268 : : * Calculate the size of our heap: one element for every transaction that
1269 : : * contains changes. (Besides the transactions already in the reorder
1270 : : * buffer, we count the one we were directly passed.)
1271 : : */
3695 rhaas@postgresql.org 1272 [ + + ]: 1916 : if (txn->nentries > 0)
1273 : 1735 : nr_txns++;
1274 : :
1275 [ + - + + ]: 2378 : dlist_foreach(cur_txn_i, &txn->subtxns)
1276 : : {
1277 : : ReorderBufferTXN *cur_txn;
1278 : :
1279 : 462 : cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1280 : :
1281 : : /* Check ordering of changes in this subtransaction. */
1345 akapila@postgresql.o 1282 : 462 : AssertChangeLsnOrder(cur_txn);
1283 : :
3695 rhaas@postgresql.org 1284 [ + + ]: 462 : if (cur_txn->nentries > 0)
1285 : 300 : nr_txns++;
1286 : : }
1287 : :
1288 : : /* allocate iteration state */
1289 : : state = (ReorderBufferIterTXNState *)
1290 : 1916 : MemoryContextAllocZero(rb->context,
1291 : : sizeof(ReorderBufferIterTXNState) +
1292 : 1916 : sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1293 : :
1294 : 1916 : state->nr_txns = nr_txns;
1295 : 1916 : dlist_init(&state->old_change);
1296 : :
1297 [ + + ]: 3951 : for (off = 0; off < state->nr_txns; off++)
1298 : : {
1583 akapila@postgresql.o 1299 : 2035 : state->entries[off].file.vfd = -1;
3695 rhaas@postgresql.org 1300 : 2035 : state->entries[off].segno = 0;
1301 : : }
1302 : :
1303 : : /* allocate heap */
1304 : 1916 : state->heap = binaryheap_allocate(state->nr_txns,
1305 : : ReorderBufferIterCompare,
1306 : : state);
1307 : :
1308 : : /* Now that the state fields are initialized, it is safe to return it. */
1583 akapila@postgresql.o 1309 : 1916 : *iter_state = state;
1310 : :
1311 : : /*
1312 : : * Now insert items into the binary heap, in an unordered fashion. (We
1313 : : * will run a heap assembly step at the end; this is more efficient.)
1314 : : */
1315 : :
3695 rhaas@postgresql.org 1316 : 1916 : off = 0;
1317 : :
1318 : : /* add toplevel transaction if it contains changes */
1319 [ + + ]: 1916 : if (txn->nentries > 0)
1320 : : {
1321 : : ReorderBufferChange *cur_change;
1322 : :
1556 alvherre@alvh.no-ip. 1323 [ + + ]: 1735 : if (rbtxn_is_serialized(txn))
1324 : : {
1325 : : /* serialize remaining changes */
2750 andres@anarazel.de 1326 : 20 : ReorderBufferSerializeTXN(rb, txn);
1583 akapila@postgresql.o 1327 : 20 : ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1328 : : &state->entries[off].segno);
1329 : : }
1330 : :
3695 rhaas@postgresql.org 1331 : 1735 : cur_change = dlist_head_element(ReorderBufferChange, node,
1332 : : &txn->changes);
1333 : :
1334 : 1735 : state->entries[off].lsn = cur_change->lsn;
1335 : 1735 : state->entries[off].change = cur_change;
1336 : 1735 : state->entries[off].txn = txn;
1337 : :
1338 : 1735 : binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1339 : : }
1340 : :
1341 : : /* add subtransactions if they contain changes */
1342 [ + - + + ]: 2378 : dlist_foreach(cur_txn_i, &txn->subtxns)
1343 : : {
1344 : : ReorderBufferTXN *cur_txn;
1345 : :
1346 : 462 : cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1347 : :
1348 [ + + ]: 462 : if (cur_txn->nentries > 0)
1349 : : {
1350 : : ReorderBufferChange *cur_change;
1351 : :
1556 alvherre@alvh.no-ip. 1352 [ + + ]: 300 : if (rbtxn_is_serialized(cur_txn))
1353 : : {
1354 : : /* serialize remaining changes */
2750 andres@anarazel.de 1355 : 16 : ReorderBufferSerializeTXN(rb, cur_txn);
3695 rhaas@postgresql.org 1356 : 16 : ReorderBufferRestoreChanges(rb, cur_txn,
1357 : : &state->entries[off].file,
1358 : : &state->entries[off].segno);
1359 : : }
1360 : 300 : cur_change = dlist_head_element(ReorderBufferChange, node,
1361 : : &cur_txn->changes);
1362 : :
1363 : 300 : state->entries[off].lsn = cur_change->lsn;
1364 : 300 : state->entries[off].change = cur_change;
1365 : 300 : state->entries[off].txn = cur_txn;
1366 : :
1367 : 300 : binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1368 : : }
1369 : : }
1370 : :
1371 : : /* assemble a valid binary heap */
1372 : 1916 : binaryheap_build(state->heap);
1373 : 1916 : }
1374 : :
1375 : : /*
1376 : : * Return the next change when iterating over a transaction and its
1377 : : * subtransactions.
1378 : : *
1379 : : * Returns NULL when no further changes exist.
1380 : : */
1381 : : static ReorderBufferChange *
1382 : 365253 : ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
1383 : : {
1384 : : ReorderBufferChange *change;
1385 : : ReorderBufferIterTXNEntry *entry;
1386 : : int32 off;
1387 : :
1388 : : /* nothing there anymore */
1389 [ + + ]: 365253 : if (state->heap->bh_size == 0)
1390 : 1906 : return NULL;
1391 : :
1392 : 363347 : off = DatumGetInt32(binaryheap_first(state->heap));
1393 : 363347 : entry = &state->entries[off];
1394 : :
1395 : : /* free memory we might have "leaked" in the previous *Next call */
1396 [ + + ]: 363347 : if (!dlist_is_empty(&state->old_change))
1397 : : {
1398 : 44 : change = dlist_container(ReorderBufferChange, node,
1399 : : dlist_pop_head_node(&state->old_change));
1345 akapila@postgresql.o 1400 : 44 : ReorderBufferReturnChange(rb, change, true);
3695 rhaas@postgresql.org 1401 [ - + ]: 44 : Assert(dlist_is_empty(&state->old_change));
1402 : : }
1403 : :
1404 : 363347 : change = entry->change;
1405 : :
1406 : : /*
1407 : : * update heap with information about which transaction has the next
1408 : : * relevant change in LSN order
1409 : : */
1410 : :
1411 : : /* there are in-memory changes */
1412 [ + + ]: 363347 : if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1413 : : {
1414 : 361280 : dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1415 : 361280 : ReorderBufferChange *next_change =
331 tgl@sss.pgh.pa.us 1416 : 361280 : dlist_container(ReorderBufferChange, node, next);
1417 : :
1418 : : /* txn stays the same */
3695 rhaas@postgresql.org 1419 : 361280 : state->entries[off].lsn = next_change->lsn;
1420 : 361280 : state->entries[off].change = next_change;
1421 : :
1422 : 361280 : binaryheap_replace_first(state->heap, Int32GetDatum(off));
1423 : 361280 : return change;
1424 : : }
1425 : :
1426 : : /* try to load changes from disk */
1427 [ + + ]: 2067 : if (entry->txn->nentries != entry->txn->nentries_mem)
1428 : : {
1429 : : /*
1430 : : * Ugly: restoring changes will reuse *Change records, thus delete the
1431 : : * current one from the per-tx list and only free in the next call.
1432 : : */
1433 : 63 : dlist_delete(&change->node);
1434 : 63 : dlist_push_tail(&state->old_change, &change->node);
1435 : :
1436 : : /*
1437 : : * Update the total bytes processed by the txn for which we are
1438 : : * releasing the current set of changes and restoring the new set of
1439 : : * changes.
1440 : : */
1077 akapila@postgresql.o 1441 : 63 : rb->totalBytes += entry->txn->size;
1583 1442 [ + + ]: 63 : if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1443 : : &state->entries[off].segno))
1444 : : {
1445 : : /* successfully restored changes from disk */
1446 : : ReorderBufferChange *next_change =
331 tgl@sss.pgh.pa.us 1447 : 35 : dlist_head_element(ReorderBufferChange, node,
1448 : : &entry->txn->changes);
1449 : :
3695 rhaas@postgresql.org 1450 [ - + ]: 35 : elog(DEBUG2, "restored %u/%u changes from disk",
1451 : : (uint32) entry->txn->nentries_mem,
1452 : : (uint32) entry->txn->nentries);
1453 : :
1454 [ - + ]: 35 : Assert(entry->txn->nentries_mem);
1455 : : /* txn stays the same */
1456 : 35 : state->entries[off].lsn = next_change->lsn;
1457 : 35 : state->entries[off].change = next_change;
1458 : 35 : binaryheap_replace_first(state->heap, Int32GetDatum(off));
1459 : :
1460 : 35 : return change;
1461 : : }
1462 : : }
1463 : :
1464 : : /* ok, no changes there anymore, remove */
1465 : 2032 : binaryheap_remove_first(state->heap);
1466 : :
1467 : 2032 : return change;
1468 : : }
1469 : :
1470 : : /*
1471 : : * Deallocate the iterator
1472 : : */
1473 : : static void
1474 : 1916 : ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1475 : : ReorderBufferIterTXNState *state)
1476 : : {
1477 : : int32 off;
1478 : :
1479 [ + + ]: 3951 : for (off = 0; off < state->nr_txns; off++)
1480 : : {
1583 akapila@postgresql.o 1481 [ - + ]: 2035 : if (state->entries[off].file.vfd != -1)
1583 akapila@postgresql.o 1482 :UBC 0 : FileClose(state->entries[off].file.vfd);
1483 : : }
1484 : :
1485 : : /* free memory we might have "leaked" in the last *Next call */
3695 rhaas@postgresql.org 1486 [ + + ]:CBC 1916 : if (!dlist_is_empty(&state->old_change))
1487 : : {
1488 : : ReorderBufferChange *change;
1489 : :
1490 : 18 : change = dlist_container(ReorderBufferChange, node,
1491 : : dlist_pop_head_node(&state->old_change));
1345 akapila@postgresql.o 1492 : 18 : ReorderBufferReturnChange(rb, change, true);
3695 rhaas@postgresql.org 1493 [ - + ]: 18 : Assert(dlist_is_empty(&state->old_change));
1494 : : }
1495 : :
1496 : 1916 : binaryheap_free(state->heap);
1497 : 1916 : pfree(state);
1498 : 1916 : }
1499 : :
1500 : : /*
1501 : : * Cleanup the contents of a transaction, usually after the transaction
1502 : : * committed or aborted.
1503 : : */
1504 : : static void
1505 : 3530 : ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1506 : : {
1507 : : bool found;
1508 : : dlist_mutable_iter iter;
1509 : :
1510 : : /* cleanup subtransactions & their changes */
1511 [ + - + + ]: 3715 : dlist_foreach_modify(iter, &txn->subtxns)
1512 : : {
1513 : : ReorderBufferTXN *subtxn;
1514 : :
1515 : 185 : subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1516 : :
1517 : : /*
1518 : : * Subtransactions are always associated to the toplevel TXN, even if
1519 : : * they originally were happening inside another subtxn, so we won't
1520 : : * ever recurse more than one level deep here.
1521 : : */
1556 alvherre@alvh.no-ip. 1522 [ - + ]: 185 : Assert(rbtxn_is_known_subxact(subtxn));
3695 rhaas@postgresql.org 1523 [ - + ]: 185 : Assert(subtxn->nsubtxns == 0);
1524 : :
1525 : 185 : ReorderBufferCleanupTXN(rb, subtxn);
1526 : : }
1527 : :
1528 : : /* cleanup changes in the txn */
1529 [ + - + + ]: 82071 : dlist_foreach_modify(iter, &txn->changes)
1530 : : {
1531 : : ReorderBufferChange *change;
1532 : :
1533 : 78541 : change = dlist_container(ReorderBufferChange, node, iter.cur);
1534 : :
1535 : : /* Check we're not mixing changes from different transactions. */
1611 akapila@postgresql.o 1536 [ - + ]: 78541 : Assert(change->txn == txn);
1537 : :
11 msawada@postgresql.o 1538 :GNC 78541 : ReorderBufferReturnChange(rb, change, false);
1539 : : }
1540 : :
1541 : : /*
1542 : : * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1543 : : * They are always stored in the toplevel transaction.
1544 : : */
3695 rhaas@postgresql.org 1545 [ + - + + ]:CBC 25860 : dlist_foreach_modify(iter, &txn->tuplecids)
1546 : : {
1547 : : ReorderBufferChange *change;
1548 : :
1549 : 22330 : change = dlist_container(ReorderBufferChange, node, iter.cur);
1550 : :
1551 : : /* Check we're not mixing changes from different transactions. */
1611 akapila@postgresql.o 1552 [ - + ]: 22330 : Assert(change->txn == txn);
3691 tgl@sss.pgh.pa.us 1553 [ - + ]: 22330 : Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1554 : :
1345 akapila@postgresql.o 1555 : 22330 : ReorderBufferReturnChange(rb, change, true);
1556 : : }
1557 : :
1558 : : /*
1559 : : * Cleanup the base snapshot, if set.
1560 : : */
3695 rhaas@postgresql.org 1561 [ + + ]: 3530 : if (txn->base_snapshot != NULL)
1562 : : {
1563 : 2825 : SnapBuildSnapDecRefcount(txn->base_snapshot);
2119 alvherre@alvh.no-ip. 1564 : 2825 : dlist_delete(&txn->base_snapshot_node);
1565 : : }
1566 : :
1567 : : /*
1568 : : * Cleanup the snapshot for the last streamed run.
1569 : : */
1345 akapila@postgresql.o 1570 [ + + ]: 3530 : if (txn->snapshot_now != NULL)
1571 : : {
1572 [ - + ]: 66 : Assert(rbtxn_is_streamed(txn));
1573 : 66 : ReorderBufferFreeSnap(rb, txn->snapshot_now);
1574 : : }
1575 : :
1576 : : /*
1577 : : * Remove TXN from its containing lists.
1578 : : *
1579 : : * Note: if txn is known as subxact, we are deleting the TXN from its
1580 : : * parent's list of known subxacts; this leaves the parent's nsubxacts
1581 : : * count too high, but we don't care. Otherwise, we are deleting the TXN
1582 : : * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1583 : : * list of catalog modifying transactions as well.
1584 : : */
2779 tgl@sss.pgh.pa.us 1585 : 3530 : dlist_delete(&txn->node);
612 akapila@postgresql.o 1586 [ + + ]: 3530 : if (rbtxn_has_catalog_changes(txn))
529 drowley@postgresql.o 1587 : 1109 : dclist_delete_from(&rb->catchange_txns, &txn->catchange_node);
1588 : :
1589 : : /* now remove reference from buffer */
331 tgl@sss.pgh.pa.us 1590 : 3530 : hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
3695 rhaas@postgresql.org 1591 [ - + ]: 3530 : Assert(found);
1592 : :
1593 : : /* remove entries spilled to disk */
1556 alvherre@alvh.no-ip. 1594 [ + + ]: 3530 : if (rbtxn_is_serialized(txn))
3695 rhaas@postgresql.org 1595 : 270 : ReorderBufferRestoreCleanup(rb, txn);
1596 : :
1597 : : /* Update the memory counter */
11 msawada@postgresql.o 1598 :GNC 3530 : ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
1599 : :
1600 : : /* deallocate */
3695 rhaas@postgresql.org 1601 :CBC 3530 : ReorderBufferReturnTXN(rb, txn);
1602 : 3530 : }
1603 : :
1604 : : /*
1605 : : * Discard changes from a transaction (and subtransactions), either after
1606 : : * streaming or decoding them at PREPARE. Keep the remaining info -
1607 : : * transactions, tuplecids, invalidations and snapshots.
1608 : : *
1609 : : * We additionally remove tuplecids after decoding the transaction at prepare
1610 : : * time as we only need to perform invalidation at rollback or commit prepared.
1611 : : *
1612 : : * 'txn_prepared' indicates that we have decoded the transaction at prepare
1613 : : * time.
1614 : : */
1615 : : static void
1196 akapila@postgresql.o 1616 : 1036 : ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
1617 : : {
1618 : : dlist_mutable_iter iter;
1619 : :
1620 : : /* cleanup subtransactions & their changes */
1345 1621 [ + - + + ]: 1332 : dlist_foreach_modify(iter, &txn->subtxns)
1622 : : {
1623 : : ReorderBufferTXN *subtxn;
1624 : :
1625 : 296 : subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1626 : :
1627 : : /*
1628 : : * Subtransactions are always associated to the toplevel TXN, even if
1629 : : * they originally were happening inside another subtxn, so we won't
1630 : : * ever recurse more than one level deep here.
1631 : : */
1632 [ - + ]: 296 : Assert(rbtxn_is_known_subxact(subtxn));
1633 [ - + ]: 296 : Assert(subtxn->nsubtxns == 0);
1634 : :
1196 1635 : 296 : ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1636 : : }
1637 : :
1638 : : /* cleanup changes in the txn */
1345 1639 [ + - + + ]: 162439 : dlist_foreach_modify(iter, &txn->changes)
1640 : : {
1641 : : ReorderBufferChange *change;
1642 : :
1643 : 161403 : change = dlist_container(ReorderBufferChange, node, iter.cur);
1644 : :
1645 : : /* Check we're not mixing changes from different transactions. */
1646 [ - + ]: 161403 : Assert(change->txn == txn);
1647 : :
1648 : : /* remove the change from it's containing list */
1649 : 161403 : dlist_delete(&change->node);
1650 : :
11 msawada@postgresql.o 1651 :GNC 161403 : ReorderBufferReturnChange(rb, change, false);
1652 : : }
1653 : :
1654 : : /* Update the memory counter */
1655 : 1036 : ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
1656 : :
1657 : : /*
1658 : : * Mark the transaction as streamed.
1659 : : *
1660 : : * The top-level transaction, is marked as streamed always, even if it
1661 : : * does not contain any changes (that is, when all the changes are in
1662 : : * subtransactions).
1663 : : *
1664 : : * For subtransactions, we only mark them as streamed when there are
1665 : : * changes in them.
1666 : : *
1667 : : * We do it this way because of aborts - we don't want to send aborts for
1668 : : * XIDs the downstream is not aware of. And of course, it always knows
1669 : : * about the toplevel xact (we send the XID in all messages), but we never
1670 : : * stream XIDs of empty subxacts.
1671 : : */
394 akapila@postgresql.o 1672 [ + + + + :CBC 1036 : if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
+ + ]
1345 1673 : 819 : txn->txn_flags |= RBTXN_IS_STREAMED;
1674 : :
1196 1675 [ + + ]: 1036 : if (txn_prepared)
1676 : : {
1677 : : /*
1678 : : * If this is a prepared txn, cleanup the tuplecids we stored for
1679 : : * decoding catalog snapshot access. They are always stored in the
1680 : : * toplevel transaction.
1681 : : */
1682 [ + - + + ]: 179 : dlist_foreach_modify(iter, &txn->tuplecids)
1683 : : {
1684 : : ReorderBufferChange *change;
1685 : :
1686 : 123 : change = dlist_container(ReorderBufferChange, node, iter.cur);
1687 : :
1688 : : /* Check we're not mixing changes from different transactions. */
1689 [ - + ]: 123 : Assert(change->txn == txn);
1690 [ - + ]: 123 : Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1691 : :
1692 : : /* Remove the change from its containing list. */
1693 : 123 : dlist_delete(&change->node);
1694 : :
1695 : 123 : ReorderBufferReturnChange(rb, change, true);
1696 : : }
1697 : : }
1698 : :
1699 : : /*
1700 : : * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
1701 : : * memory. We could also keep the hash table and update it with new ctid
1702 : : * values, but this seems simpler and good enough for now.
1703 : : */
1345 1704 [ + + ]: 1036 : if (txn->tuplecid_hash != NULL)
1705 : : {
1706 : 22 : hash_destroy(txn->tuplecid_hash);
1707 : 22 : txn->tuplecid_hash = NULL;
1708 : : }
1709 : :
1710 : : /* If this txn is serialized then clean the disk space. */
1711 [ + + ]: 1036 : if (rbtxn_is_serialized(txn))
1712 : : {
1713 : 5 : ReorderBufferRestoreCleanup(rb, txn);
1714 : 5 : txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1715 : :
1716 : : /*
1717 : : * We set this flag to indicate if the transaction is ever serialized.
1718 : : * We need this to accurately update the stats as otherwise the same
1719 : : * transaction can be counted as serialized multiple times.
1720 : : */
1284 1721 : 5 : txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
1722 : : }
1723 : :
1724 : : /* also reset the number of entries in the transaction */
1345 1725 : 1036 : txn->nentries_mem = 0;
1726 : 1036 : txn->nentries = 0;
1727 : 1036 : }
1728 : :
1729 : : /*
1730 : : * Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by
1731 : : * HeapTupleSatisfiesHistoricMVCC.
1732 : : */
1733 : : static void
3695 rhaas@postgresql.org 1734 : 1916 : ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1735 : : {
1736 : : dlist_iter iter;
1737 : : HASHCTL hash_ctl;
1738 : :
1556 alvherre@alvh.no-ip. 1739 [ + + + + ]: 1916 : if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
3695 rhaas@postgresql.org 1740 : 1393 : return;
1741 : :
1742 : 523 : hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1743 : 523 : hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1744 : 523 : hash_ctl.hcxt = rb->context;
1745 : :
1746 : : /*
1747 : : * create the hash with the exact number of to-be-stored tuplecids from
1748 : : * the start
1749 : : */
1750 : 523 : txn->tuplecid_hash =
1751 : 523 : hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1752 : : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1753 : :
1754 [ + - + + ]: 10664 : dlist_foreach(iter, &txn->tuplecids)
1755 : : {
1756 : : ReorderBufferTupleCidKey key;
1757 : : ReorderBufferTupleCidEnt *ent;
1758 : : bool found;
1759 : : ReorderBufferChange *change;
1760 : :
1761 : 10141 : change = dlist_container(ReorderBufferChange, node, iter.cur);
1762 : :
3691 tgl@sss.pgh.pa.us 1763 [ - + ]: 10141 : Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1764 : :
1765 : : /* be careful about padding */
3695 rhaas@postgresql.org 1766 : 10141 : memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1767 : :
648 1768 : 10141 : key.rlocator = change->data.tuplecid.locator;
1769 : :
3691 tgl@sss.pgh.pa.us 1770 : 10141 : ItemPointerCopy(&change->data.tuplecid.tid,
1771 : : &key.tid);
1772 : :
1773 : : ent = (ReorderBufferTupleCidEnt *)
433 peter@eisentraut.org 1774 : 10141 : hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
3695 rhaas@postgresql.org 1775 [ + + ]: 10141 : if (!found)
1776 : : {
3691 tgl@sss.pgh.pa.us 1777 : 8595 : ent->cmin = change->data.tuplecid.cmin;
1778 : 8595 : ent->cmax = change->data.tuplecid.cmax;
1779 : 8595 : ent->combocid = change->data.tuplecid.combocid;
1780 : : }
1781 : : else
1782 : : {
1783 : : /*
1784 : : * Maybe we already saw this tuple before in this transaction, but
1785 : : * if so it must have the same cmin.
1786 : : */
1787 [ - + ]: 1546 : Assert(ent->cmin == change->data.tuplecid.cmin);
1788 : :
1789 : : /*
1790 : : * cmax may be initially invalid, but once set it can only grow,
1791 : : * and never become invalid again.
1792 : : */
1888 alvherre@alvh.no-ip. 1793 [ + + + - : 1546 : Assert((ent->cmax == InvalidCommandId) ||
- + ]
1794 : : ((change->data.tuplecid.cmax != InvalidCommandId) &&
1795 : : (change->data.tuplecid.cmax > ent->cmax)));
3691 tgl@sss.pgh.pa.us 1796 : 1546 : ent->cmax = change->data.tuplecid.cmax;
1797 : : }
1798 : : }
1799 : : }
1800 : :
1801 : : /*
1802 : : * Copy a provided snapshot so we can modify it privately. This is needed so
1803 : : * that catalog modifying transactions can look into intermediate catalog
1804 : : * states.
1805 : : */
1806 : : static Snapshot
3695 rhaas@postgresql.org 1807 : 1712 : ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1808 : : ReorderBufferTXN *txn, CommandId cid)
1809 : : {
1810 : : Snapshot snap;
1811 : : dlist_iter iter;
1812 : 1712 : int i = 0;
1813 : : Size size;
1814 : :
1815 : 1712 : size = sizeof(SnapshotData) +
1816 : 1712 : sizeof(TransactionId) * orig_snap->xcnt +
1817 : 1712 : sizeof(TransactionId) * (txn->nsubtxns + 1);
1818 : :
1819 : 1712 : snap = MemoryContextAllocZero(rb->context, size);
1820 : 1712 : memcpy(snap, orig_snap, sizeof(SnapshotData));
1821 : :
1822 : 1712 : snap->copied = true;
3286 heikki.linnakangas@i 1823 : 1712 : snap->active_count = 1; /* mark as active so nobody frees it */
1824 : 1712 : snap->regd_count = 0;
3695 rhaas@postgresql.org 1825 : 1712 : snap->xip = (TransactionId *) (snap + 1);
1826 : :
1827 : 1712 : memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1828 : :
1829 : : /*
1830 : : * snap->subxip contains all txids that belong to our transaction which we
1831 : : * need to check via cmin/cmax. That's why we store the toplevel
1832 : : * transaction in there as well.
1833 : : */
1834 : 1712 : snap->subxip = snap->xip + snap->xcnt;
1835 : 1712 : snap->subxip[i++] = txn->xid;
1836 : :
1837 : : /*
1838 : : * subxcnt isn't decreased when subtransactions abort, so count manually.
1839 : : * Since it's an upper boundary it is safe to use it for the allocation
1840 : : * above.
1841 : : */
1842 : 1712 : snap->subxcnt = 1;
1843 : :
1844 [ + - + + ]: 2020 : dlist_foreach(iter, &txn->subtxns)
1845 : : {
1846 : : ReorderBufferTXN *sub_txn;
1847 : :
1848 : 308 : sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1849 : 308 : snap->subxip[i++] = sub_txn->xid;
1850 : 308 : snap->subxcnt++;
1851 : : }
1852 : :
1853 : : /* sort so we can bsearch() later */
1854 : 1712 : qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1855 : :
1856 : : /* store the specified current CommandId */
1857 : 1712 : snap->curcid = cid;
1858 : :
1859 : 1712 : return snap;
1860 : : }
1861 : :
1862 : : /*
1863 : : * Free a previously ReorderBufferCopySnap'ed snapshot
1864 : : */
1865 : : static void
1866 : 2796 : ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1867 : : {
1868 [ + + ]: 2796 : if (snap->copied)
1869 : 1709 : pfree(snap);
1870 : : else
1871 : 1087 : SnapBuildSnapDecRefcount(snap);
1872 : 2796 : }
1873 : :
1874 : : /*
1875 : : * If the transaction was (partially) streamed, we need to prepare or commit
1876 : : * it in a 'streamed' way. That is, we first stream the remaining part of the
1877 : : * transaction, and then invoke stream_prepare or stream_commit message as per
1878 : : * the case.
1879 : : */
1880 : : static void
1345 akapila@postgresql.o 1881 : 66 : ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
1882 : : {
1883 : : /* we should only call this for previously streamed transactions */
1884 [ - + ]: 66 : Assert(rbtxn_is_streamed(txn));
1885 : :
1886 : 66 : ReorderBufferStreamTXN(rb, txn);
1887 : :
1196 1888 [ + + ]: 66 : if (rbtxn_prepared(txn))
1889 : : {
1890 : : /*
1891 : : * Note, we send stream prepare even if a concurrent abort is
1892 : : * detected. See DecodePrepare for more information.
1893 : : */
1894 : 16 : rb->stream_prepare(rb, txn, txn->final_lsn);
1895 : :
1896 : : /*
1897 : : * This is a PREPARED transaction, part of a two-phase commit. The
1898 : : * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1899 : : * just truncate txn by removing changes and tuplecids.
1900 : : */
1901 : 16 : ReorderBufferTruncateTXN(rb, txn, true);
1902 : : /* Reset the CheckXidAlive */
1903 : 16 : CheckXidAlive = InvalidTransactionId;
1904 : : }
1905 : : else
1906 : : {
1907 : 50 : rb->stream_commit(rb, txn, txn->final_lsn);
1908 : 50 : ReorderBufferCleanupTXN(rb, txn);
1909 : : }
1345 1910 : 66 : }
1911 : :
1912 : : /*
1913 : : * Set xid to detect concurrent aborts.
1914 : : *
1915 : : * While streaming an in-progress transaction or decoding a prepared
1916 : : * transaction there is a possibility that the (sub)transaction might get
1917 : : * aborted concurrently. In such case if the (sub)transaction has catalog
1918 : : * update then we might decode the tuple using wrong catalog version. For
1919 : : * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
1920 : : * the transaction 501 updates the catalog tuple and after that we will have
1921 : : * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
1922 : : * aborted and some other transaction say 502 updates the same catalog tuple
1923 : : * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
1924 : : * problem is that when we try to decode the tuple inserted/updated in 501
1925 : : * after the catalog update, we will see the catalog tuple with (xmin: 500,
1926 : : * xmax: 502) as visible because it will consider that the tuple is deleted by
1927 : : * xid 502 which is not visible to our snapshot. And when we will try to
1928 : : * decode with that catalog tuple, it can lead to a wrong result or a crash.
1929 : : * So, it is necessary to detect concurrent aborts to allow streaming of
1930 : : * in-progress transactions or decoding of prepared transactions.
1931 : : *
1932 : : * For detecting the concurrent abort we set CheckXidAlive to the current
1933 : : * (sub)transaction's xid for which this change belongs to. And, during
1934 : : * catalog scan we can check the status of the xid and if it is aborted we will
1935 : : * report a specific error so that we can stop streaming current transaction
1936 : : * and discard the already streamed changes on such an error. We might have
1937 : : * already streamed some of the changes for the aborted (sub)transaction, but
1938 : : * that is fine because when we decode the abort we will stream abort message
1939 : : * to truncate the changes in the subscriber. Similarly, for prepared
1940 : : * transactions, we stop decoding if concurrent abort is detected and then
1941 : : * rollback the changes when rollback prepared is encountered. See
1942 : : * DecodePrepare.
1943 : : */
1944 : : static inline void
1945 : 177705 : SetupCheckXidLive(TransactionId xid)
1946 : : {
1947 : : /*
1948 : : * If the input transaction id is already set as a CheckXidAlive then
1949 : : * nothing to do.
1950 : : */
1951 [ + + ]: 177705 : if (TransactionIdEquals(CheckXidAlive, xid))
3695 rhaas@postgresql.org 1952 : 107518 : return;
1953 : :
1954 : : /*
1955 : : * setup CheckXidAlive if it's not committed yet. We don't check if the
1956 : : * xid is aborted. That will happen during catalog access.
1957 : : */
1345 akapila@postgresql.o 1958 [ + + ]: 70187 : if (!TransactionIdDidCommit(xid))
1959 : 404 : CheckXidAlive = xid;
1960 : : else
1961 : 69783 : CheckXidAlive = InvalidTransactionId;
1962 : : }
1963 : :
1964 : : /*
1965 : : * Helper function for ReorderBufferProcessTXN for applying change.
1966 : : */
1967 : : static inline void
1968 : 343076 : ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
1969 : : Relation relation, ReorderBufferChange *change,
1970 : : bool streaming)
1971 : : {
1972 [ + + ]: 343076 : if (streaming)
1973 : 176010 : rb->stream_change(rb, txn, relation, change);
1974 : : else
1975 : 167066 : rb->apply_change(rb, txn, relation, change);
1976 : 343073 : }
1977 : :
1978 : : /*
1979 : : * Helper function for ReorderBufferProcessTXN for applying the truncate.
1980 : : */
1981 : : static inline void
1982 : 42 : ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
1983 : : int nrelations, Relation *relations,
1984 : : ReorderBufferChange *change, bool streaming)
1985 : : {
1986 [ - + ]: 42 : if (streaming)
1345 akapila@postgresql.o 1987 :UBC 0 : rb->stream_truncate(rb, txn, nrelations, relations, change);
1988 : : else
1345 akapila@postgresql.o 1989 :CBC 42 : rb->apply_truncate(rb, txn, nrelations, relations, change);
1990 : 42 : }
1991 : :
1992 : : /*
1993 : : * Helper function for ReorderBufferProcessTXN for applying the message.
1994 : : */
1995 : : static inline void
1996 : 11 : ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
1997 : : ReorderBufferChange *change, bool streaming)
1998 : : {
1999 [ + + ]: 11 : if (streaming)
2000 : 3 : rb->stream_message(rb, txn, change->lsn, true,
2001 : 3 : change->data.msg.prefix,
2002 : : change->data.msg.message_size,
2003 : 3 : change->data.msg.message);
2004 : : else
2005 : 8 : rb->message(rb, txn, change->lsn, true,
2006 : 8 : change->data.msg.prefix,
2007 : : change->data.msg.message_size,
2008 : 8 : change->data.msg.message);
2009 : 11 : }
2010 : :
2011 : : /*
2012 : : * Function to store the command id and snapshot at the end of the current
2013 : : * stream so that we can reuse the same while sending the next stream.
2014 : : */
2015 : : static inline void
2016 : 702 : ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
2017 : : Snapshot snapshot_now, CommandId command_id)
2018 : : {
2019 : 702 : txn->command_id = command_id;
2020 : :
2021 : : /* Avoid copying if it's already copied. */
2022 [ + - ]: 702 : if (snapshot_now->copied)
2023 : 702 : txn->snapshot_now = snapshot_now;
2024 : : else
1345 akapila@postgresql.o 2025 :UBC 0 : txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2026 : : txn, command_id);
1345 akapila@postgresql.o 2027 :CBC 702 : }
2028 : :
2029 : : /*
2030 : : * Helper function for ReorderBufferProcessTXN to handle the concurrent
2031 : : * abort of the streaming transaction. This resets the TXN such that it
2032 : : * can be used to stream the remaining data of transaction being processed.
2033 : : * This can happen when the subtransaction is aborted and we still want to
2034 : : * continue processing the main or other subtransactions data.
2035 : : */
2036 : : static void
2037 : 7 : ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
2038 : : Snapshot snapshot_now,
2039 : : CommandId command_id,
2040 : : XLogRecPtr last_lsn,
2041 : : ReorderBufferChange *specinsert)
2042 : : {
2043 : : /* Discard the changes that we just streamed */
1196 2044 : 7 : ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2045 : :
2046 : : /* Free all resources allocated for toast reconstruction */
1345 2047 : 7 : ReorderBufferToastReset(rb, txn);
2048 : :
2049 : : /* Return the spec insert change if it is not NULL */
2050 [ - + ]: 7 : if (specinsert != NULL)
2051 : : {
1345 akapila@postgresql.o 2052 :UBC 0 : ReorderBufferReturnChange(rb, specinsert, true);
2053 : 0 : specinsert = NULL;
2054 : : }
2055 : :
2056 : : /*
2057 : : * For the streaming case, stop the stream and remember the command ID and
2058 : : * snapshot for the streaming run.
2059 : : */
1196 akapila@postgresql.o 2060 [ + - ]:CBC 7 : if (rbtxn_is_streamed(txn))
2061 : : {
2062 : 7 : rb->stream_stop(rb, txn, last_lsn);
2063 : 7 : ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2064 : : }
1345 2065 : 7 : }
2066 : :
2067 : : /*
2068 : : * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
2069 : : *
2070 : : * Send data of a transaction (and its subtransactions) to the
2071 : : * output plugin. We iterate over the top and subtransactions (using a k-way
2072 : : * merge) and replay the changes in lsn order.
2073 : : *
2074 : : * If streaming is true then data will be sent using stream API.
2075 : : *
2076 : : * Note: "volatile" markers on some parameters are to avoid trouble with
2077 : : * PG_TRY inside the function.
2078 : : */
2079 : : static void
2080 : 1916 : ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
2081 : : XLogRecPtr commit_lsn,
2082 : : volatile Snapshot snapshot_now,
2083 : : volatile CommandId command_id,
2084 : : bool streaming)
2085 : : {
2086 : : bool using_subtxn;
2087 : 1916 : MemoryContext ccxt = CurrentMemoryContext;
2088 : 1916 : ReorderBufferIterTXNState *volatile iterstate = NULL;
2089 : 1916 : volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2090 : 1916 : ReorderBufferChange *volatile specinsert = NULL;
2091 : 1916 : volatile bool stream_started = false;
2092 : 1916 : ReorderBufferTXN *volatile curtxn = NULL;
2093 : :
2094 : : /* build data to be able to lookup the CommandIds of catalog tuples */
3695 rhaas@postgresql.org 2095 : 1916 : ReorderBufferBuildTupleCidHash(rb, txn);
2096 : :
2097 : : /* setup the initial snapshot */
2098 : 1916 : SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2099 : :
2100 : : /*
2101 : : * Decoding needs access to syscaches et al., which in turn use
2102 : : * heavyweight locks and such. Thus we need to have enough state around to
2103 : : * keep track of those. The easiest way is to simply use a transaction
2104 : : * internally. That also allows us to easily enforce that nothing writes
2105 : : * to the database by checking for xid assignments.
2106 : : *
2107 : : * When we're called via the SQL SRF there's already a transaction
2108 : : * started, so start an explicit subtransaction there.
2109 : : */
3367 tgl@sss.pgh.pa.us 2110 : 1916 : using_subtxn = IsTransactionOrTransactionBlock();
2111 : :
3695 rhaas@postgresql.org 2112 [ + + ]: 1916 : PG_TRY();
2113 : : {
2114 : : ReorderBufferChange *change;
431 akapila@postgresql.o 2115 : 1916 : int changes_count = 0; /* used to accumulate the number of
2116 : : * changes */
2117 : :
3440 andres@anarazel.de 2118 [ + + ]: 1916 : if (using_subtxn)
1345 akapila@postgresql.o 2119 [ + + ]: 441 : BeginInternalSubTransaction(streaming ? "stream" : "replay");
2120 : : else
3695 rhaas@postgresql.org 2121 : 1475 : StartTransactionCommand();
2122 : :
2123 : : /*
2124 : : * We only need to send begin/begin-prepare for non-streamed
2125 : : * transactions.
2126 : : */
1345 akapila@postgresql.o 2127 [ + + ]: 1916 : if (!streaming)
2128 : : {
1196 2129 [ + + ]: 1214 : if (rbtxn_prepared(txn))
2130 : 22 : rb->begin_prepare(rb, txn);
2131 : : else
2132 : 1192 : rb->begin(rb, txn);
2133 : : }
2134 : :
1583 2135 : 1916 : ReorderBufferIterTXNInit(rb, txn, &iterstate);
3367 tgl@sss.pgh.pa.us 2136 [ + + ]: 367169 : while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2137 : : {
3695 rhaas@postgresql.org 2138 : 363347 : Relation relation = NULL;
2139 : : Oid reloid;
2140 : :
600 akapila@postgresql.o 2141 [ - + ]: 363347 : CHECK_FOR_INTERRUPTS();
2142 : :
2143 : : /*
2144 : : * We can't call start stream callback before processing first
2145 : : * change.
2146 : : */
1345 2147 [ + + ]: 363347 : if (prev_lsn == InvalidXLogRecPtr)
2148 : : {
2149 [ + + ]: 1877 : if (streaming)
2150 : : {
2151 : 663 : txn->origin_id = change->origin_id;
2152 : 663 : rb->stream_start(rb, txn, change->lsn);
2153 : 663 : stream_started = true;
2154 : : }
2155 : : }
2156 : :
2157 : : /*
2158 : : * Enforce correct ordering of changes, merged from multiple
2159 : : * subtransactions. The changes may have the same LSN due to
2160 : : * MULTI_INSERT xlog records.
2161 : : */
2162 [ + + - + ]: 363347 : Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2163 : :
2164 : 363347 : prev_lsn = change->lsn;
2165 : :
2166 : : /*
2167 : : * Set the current xid to detect concurrent aborts. This is
2168 : : * required for the cases when we decode the changes before the
2169 : : * COMMIT record is processed.
2170 : : */
1196 2171 [ + + + + ]: 363347 : if (streaming || rbtxn_prepared(change->txn))
2172 : : {
1345 2173 : 177705 : curtxn = change->txn;
2174 : 177705 : SetupCheckXidLive(curtxn->xid);
2175 : : }
2176 : :
3691 tgl@sss.pgh.pa.us 2177 [ + + + - : 363347 : switch (change->action)
+ + + + +
- - ]
2178 : : {
3264 andres@anarazel.de 2179 : 1782 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2180 : :
2181 : : /*
2182 : : * Confirmation for speculative insertion arrived. Simply
2183 : : * use as a normal record. It'll be cleaned up at the end
2184 : : * of INSERT processing.
2185 : : */
2110 alvherre@alvh.no-ip. 2186 [ - + ]: 1782 : if (specinsert == NULL)
2110 alvherre@alvh.no-ip. 2187 [ # # ]:UBC 0 : elog(ERROR, "invalid ordering of speculative insertion changes");
3264 andres@anarazel.de 2188 [ - + ]:CBC 1782 : Assert(specinsert->data.tp.oldtuple == NULL);
2189 : 1782 : change = specinsert;
2190 : 1782 : change->action = REORDER_BUFFER_CHANGE_INSERT;
2191 : :
2192 : : /* intentionally fall through */
3691 tgl@sss.pgh.pa.us 2193 : 348911 : case REORDER_BUFFER_CHANGE_INSERT:
2194 : : case REORDER_BUFFER_CHANGE_UPDATE:
2195 : : case REORDER_BUFFER_CHANGE_DELETE:
3695 rhaas@postgresql.org 2196 [ - + ]: 348911 : Assert(snapshot_now);
2197 : :
648 2198 : 348911 : reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2199 : : change->data.tp.rlocator.relNumber);
2200 : :
2201 : : /*
2202 : : * Mapped catalog tuple without data, emitted while
2203 : : * catalog table was in the process of being rewritten. We
2204 : : * can fail to look up the relfilenumber, because the
2205 : : * relmapper has no "historic" view, in contrast to the
2206 : : * normal catalog during decoding. Thus repeated rewrites
2207 : : * can cause a lookup failure. That's OK because we do not
2208 : : * decode catalog changes anyway. Normally such tuples
2209 : : * would be skipped over below, but we can't identify
2210 : : * whether the table should be logically logged without
2211 : : * mapping the relfilenumber to the oid.
2212 : : */
3695 2213 [ + + ]: 348904 : if (reloid == InvalidOid &&
3691 tgl@sss.pgh.pa.us 2214 [ + - ]: 76 : change->data.tp.newtuple == NULL &&
2215 [ + - ]: 76 : change->data.tp.oldtuple == NULL)
3264 andres@anarazel.de 2216 : 76 : goto change_done;
3695 rhaas@postgresql.org 2217 [ - + ]: 348828 : else if (reloid == InvalidOid)
648 rhaas@postgresql.org 2218 [ # # ]:UBC 0 : elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2219 : : relpathperm(change->data.tp.rlocator,
2220 : : MAIN_FORKNUM));
2221 : :
3695 rhaas@postgresql.org 2222 :CBC 348828 : relation = RelationIdGetRelation(reloid);
2223 : :
1680 tgl@sss.pgh.pa.us 2224 [ - + ]: 348828 : if (!RelationIsValid(relation))
648 rhaas@postgresql.org 2225 [ # # ]:UBC 0 : elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2226 : : reloid,
2227 : : relpathperm(change->data.tp.rlocator,
2228 : : MAIN_FORKNUM));
2229 : :
3264 andres@anarazel.de 2230 [ + - + - :CBC 348828 : if (!RelationIsLogicallyLogged(relation))
- + - - -
- + - +
+ ]
2231 : 3769 : goto change_done;
2232 : :
2233 : : /*
2234 : : * Ignore temporary heaps created during DDL unless the
2235 : : * plugin has asked for them.
2236 : : */
2216 peter_e@gmx.net 2237 [ + + + + ]: 345059 : if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2238 : 24 : goto change_done;
2239 : :
2240 : : /*
2241 : : * For now ignore sequence changes entirely. Most of the
2242 : : * time they don't log changes using records we
2243 : : * understand, so it doesn't make sense to handle the few
2244 : : * cases we do.
2245 : : */
3264 andres@anarazel.de 2246 [ - + ]: 345035 : if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
3264 andres@anarazel.de 2247 :UBC 0 : goto change_done;
2248 : :
2249 : : /* user-triggered change */
3264 andres@anarazel.de 2250 [ + + ]:CBC 345035 : if (!IsToastRelation(relation))
2251 : : {
2252 : 343076 : ReorderBufferToastReplace(rb, txn, relation, change);
1345 akapila@postgresql.o 2253 : 343076 : ReorderBufferApplyChange(rb, txn, relation, change,
2254 : : streaming);
2255 : :
2256 : : /*
2257 : : * Only clear reassembled toast chunks if we're sure
2258 : : * they're not required anymore. The creator of the
2259 : : * tuple tells us.
2260 : : */
3264 andres@anarazel.de 2261 [ + + ]: 343073 : if (change->data.tp.clear_toast_afterwards)
2262 : 342843 : ReorderBufferToastReset(rb, txn);
2263 : : }
2264 : : /* we're not interested in toast deletions */
2265 [ + + ]: 1959 : else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2266 : : {
2267 : : /*
2268 : : * Need to reassemble the full toasted Datum in
2269 : : * memory, to ensure the chunks don't get reused till
2270 : : * we're done remove it from the list of this
2271 : : * transaction's changes. Otherwise it will get
2272 : : * freed/reused while restoring spooled data from
2273 : : * disk.
2274 : : */
1964 tomas.vondra@postgre 2275 [ - + ]: 1728 : Assert(change->data.tp.newtuple != NULL);
2276 : :
2277 : 1728 : dlist_delete(&change->node);
2278 : 1728 : ReorderBufferToastAppendChunk(rb, txn, relation,
2279 : : change);
2280 : : }
2281 : :
3249 bruce@momjian.us 2282 : 231 : change_done:
2283 : :
2284 : : /*
2285 : : * If speculative insertion was confirmed, the record
2286 : : * isn't needed anymore.
2287 : : */
3264 andres@anarazel.de 2288 [ + + ]: 348901 : if (specinsert != NULL)
2289 : : {
1345 akapila@postgresql.o 2290 : 1782 : ReorderBufferReturnChange(rb, specinsert, true);
3264 andres@anarazel.de 2291 : 1782 : specinsert = NULL;
2292 : : }
2293 : :
1345 akapila@postgresql.o 2294 [ + + ]: 348901 : if (RelationIsValid(relation))
2295 : : {
3264 andres@anarazel.de 2296 : 348825 : RelationClose(relation);
2297 : 348825 : relation = NULL;
2298 : : }
3695 rhaas@postgresql.org 2299 : 348901 : break;
2300 : :
3264 andres@anarazel.de 2301 : 1782 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2302 : :
2303 : : /*
2304 : : * Speculative insertions are dealt with by delaying the
2305 : : * processing of the insert until the confirmation record
2306 : : * arrives. For that we simply unlink the record from the
2307 : : * chain, so it does not get freed/reused while restoring
2308 : : * spooled data from disk.
2309 : : *
2310 : : * This is safe in the face of concurrent catalog changes
2311 : : * because the relevant relation can't be changed between
2312 : : * speculative insertion and confirmation due to
2313 : : * CheckTableNotInUse() and locking.
2314 : : */
2315 : :
2316 : : /* clear out a pending (and thus failed) speculation */
2317 [ - + ]: 1782 : if (specinsert != NULL)
2318 : : {
1345 akapila@postgresql.o 2319 :UBC 0 : ReorderBufferReturnChange(rb, specinsert, true);
3264 andres@anarazel.de 2320 : 0 : specinsert = NULL;
2321 : : }
2322 : :
2323 : : /* and memorize the pending insertion */
3264 andres@anarazel.de 2324 :CBC 1782 : dlist_delete(&change->node);
2325 : 1782 : specinsert = change;
2326 : 1782 : break;
2327 : :
1034 akapila@postgresql.o 2328 :UBC 0 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2329 : :
2330 : : /*
2331 : : * Abort for speculative insertion arrived. So cleanup the
2332 : : * specinsert tuple and toast hash.
2333 : : *
2334 : : * Note that we get the spec abort change for each toast
2335 : : * entry but we need to perform the cleanup only the first
2336 : : * time we get it for the main table.
2337 : : */
2338 [ # # ]: 0 : if (specinsert != NULL)
2339 : : {
2340 : : /*
2341 : : * We must clean the toast hash before processing a
2342 : : * completely new tuple to avoid confusion about the
2343 : : * previous tuple's toast chunks.
2344 : : */
2345 [ # # ]: 0 : Assert(change->data.tp.clear_toast_afterwards);
2346 : 0 : ReorderBufferToastReset(rb, txn);
2347 : :
2348 : : /* We don't need this record anymore. */
2349 : 0 : ReorderBufferReturnChange(rb, specinsert, true);
2350 : 0 : specinsert = NULL;
2351 : : }
2352 : 0 : break;
2353 : :
2199 peter_e@gmx.net 2354 :CBC 42 : case REORDER_BUFFER_CHANGE_TRUNCATE:
2355 : : {
2356 : : int i;
2180 tgl@sss.pgh.pa.us 2357 : 42 : int nrelids = change->data.truncate.nrelids;
2358 : 42 : int nrelations = 0;
2359 : : Relation *relations;
2360 : :
2361 : 42 : relations = palloc0(nrelids * sizeof(Relation));
2362 [ + + ]: 118 : for (i = 0; i < nrelids; i++)
2363 : : {
2364 : 76 : Oid relid = change->data.truncate.relids[i];
2365 : : Relation rel;
2366 : :
557 drowley@postgresql.o 2367 : 76 : rel = RelationIdGetRelation(relid);
2368 : :
2369 [ - + ]: 76 : if (!RelationIsValid(rel))
2180 tgl@sss.pgh.pa.us 2370 [ # # ]:UBC 0 : elog(ERROR, "could not open relation with OID %u", relid);
2371 : :
557 drowley@postgresql.o 2372 [ + - + - :CBC 76 : if (!RelationIsLogicallyLogged(rel))
- + - - -
- + - -
+ ]
2180 tgl@sss.pgh.pa.us 2373 :UBC 0 : continue;
2374 : :
557 drowley@postgresql.o 2375 :CBC 76 : relations[nrelations++] = rel;
2376 : : }
2377 : :
2378 : : /* Apply the truncate. */
1345 akapila@postgresql.o 2379 : 42 : ReorderBufferApplyTruncate(rb, txn, nrelations,
2380 : : relations, change,
2381 : : streaming);
2382 : :
2180 tgl@sss.pgh.pa.us 2383 [ + + ]: 118 : for (i = 0; i < nrelations; i++)
2384 : 76 : RelationClose(relations[i]);
2385 : :
2386 : 42 : break;
2387 : : }
2388 : :
2930 simon@2ndQuadrant.co 2389 : 11 : case REORDER_BUFFER_CHANGE_MESSAGE:
1345 akapila@postgresql.o 2390 : 11 : ReorderBufferApplyMessage(rb, txn, change, streaming);
2930 simon@2ndQuadrant.co 2391 : 11 : break;
2392 : :
1277 akapila@postgresql.o 2393 : 2117 : case REORDER_BUFFER_CHANGE_INVALIDATION:
2394 : : /* Execute the invalidation messages locally */
702 alvherre@alvh.no-ip. 2395 : 2117 : ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2396 : : change->data.inval.invalidations);
1277 akapila@postgresql.o 2397 : 2117 : break;
2398 : :
3695 rhaas@postgresql.org 2399 : 527 : case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2400 : : /* get rid of the old */
2401 : 527 : TeardownHistoricSnapshot(false);
2402 : :
2403 [ + + ]: 527 : if (snapshot_now->copied)
2404 : : {
2405 : 507 : ReorderBufferFreeSnap(rb, snapshot_now);
2406 : 507 : snapshot_now =
3691 tgl@sss.pgh.pa.us 2407 : 507 : ReorderBufferCopySnap(rb, change->data.snapshot,
2408 : : txn, command_id);
2409 : : }
2410 : :
2411 : : /*
2412 : : * Restored from disk, need to be careful not to double
2413 : : * free. We could introduce refcounting for that, but for
2414 : : * now this seems infrequent enough not to care.
2415 : : */
2416 [ - + ]: 20 : else if (change->data.snapshot->copied)
2417 : : {
3695 rhaas@postgresql.org 2418 :UBC 0 : snapshot_now =
3691 tgl@sss.pgh.pa.us 2419 : 0 : ReorderBufferCopySnap(rb, change->data.snapshot,
2420 : : txn, command_id);
2421 : : }
2422 : : else
2423 : : {
3691 tgl@sss.pgh.pa.us 2424 :CBC 20 : snapshot_now = change->data.snapshot;
2425 : : }
2426 : :
2427 : : /* and continue with the new one */
3695 rhaas@postgresql.org 2428 : 527 : SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2429 : 527 : break;
2430 : :
2431 : 9957 : case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
3691 tgl@sss.pgh.pa.us 2432 [ - + ]: 9957 : Assert(change->data.command_id != InvalidCommandId);
2433 : :
2434 [ + + ]: 9957 : if (command_id < change->data.command_id)
2435 : : {
2436 : 1790 : command_id = change->data.command_id;
2437 : :
3695 rhaas@postgresql.org 2438 [ + + ]: 1790 : if (!snapshot_now->copied)
2439 : : {
2440 : : /* we don't use the global one anymore */
2441 : 503 : snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2442 : : txn, command_id);
2443 : : }
2444 : :
2445 : 1790 : snapshot_now->curcid = command_id;
2446 : :
2447 : 1790 : TeardownHistoricSnapshot(false);
2448 : 1790 : SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2449 : : }
2450 : :
2451 : 9957 : break;
2452 : :
3695 rhaas@postgresql.org 2453 :UBC 0 : case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2454 [ # # ]: 0 : elog(ERROR, "tuplecid value in changequeue");
2455 : : break;
2456 : : }
2457 : :
2458 : : /*
2459 : : * It is possible that the data is not sent to downstream for a
2460 : : * long time either because the output plugin filtered it or there
2461 : : * is a DDL that generates a lot of data that is not processed by
2462 : : * the plugin. So, in such cases, the downstream can timeout. To
2463 : : * avoid that we try to send a keepalive message if required.
2464 : : * Trying to send a keepalive message after every change has some
2465 : : * overhead, but testing showed there is no noticeable overhead if
2466 : : * we do it after every ~100 changes.
2467 : : */
2468 : : #define CHANGES_THRESHOLD 100
2469 : :
431 akapila@postgresql.o 2470 [ + + ]:CBC 363337 : if (++changes_count >= CHANGES_THRESHOLD)
2471 : : {
2472 : 3187 : rb->update_progress_txn(rb, txn, change->lsn);
2473 : 3187 : changes_count = 0;
2474 : : }
2475 : : }
2476 : :
2477 : : /* speculative insertion record must be freed by now */
1034 2478 [ - + ]: 1906 : Assert(!specinsert);
2479 : :
2480 : : /* clean up the iterator */
3695 rhaas@postgresql.org 2481 : 1906 : ReorderBufferIterTXNFinish(rb, iterstate);
3368 tgl@sss.pgh.pa.us 2482 : 1906 : iterstate = NULL;
2483 : :
2484 : : /*
2485 : : * Update total transaction count and total bytes processed by the
2486 : : * transaction and its subtransactions. Ensure to not count the
2487 : : * streamed transaction multiple times.
2488 : : *
2489 : : * Note that the statistics computation has to be done after
2490 : : * ReorderBufferIterTXNFinish as it releases the serialized change
2491 : : * which we have already accounted in ReorderBufferIterTXNNext.
2492 : : */
1094 akapila@postgresql.o 2493 [ + + ]: 1906 : if (!rbtxn_is_streamed(txn))
2494 : 1278 : rb->totalTxns++;
2495 : :
1077 2496 : 1906 : rb->totalBytes += txn->total_size;
2497 : :
2498 : : /*
2499 : : * Done with current changes, send the last message for this set of
2500 : : * changes depending upon streaming mode.
2501 : : */
1345 2502 [ + + ]: 1906 : if (streaming)
2503 : : {
2504 [ + + ]: 695 : if (stream_started)
2505 : : {
2506 : 656 : rb->stream_stop(rb, txn, prev_lsn);
2507 : 656 : stream_started = false;
2508 : : }
2509 : : }
2510 : : else
2511 : : {
2512 : : /*
2513 : : * Call either PREPARE (for two-phase transactions) or COMMIT (for
2514 : : * regular ones).
2515 : : */
1196 2516 [ + + ]: 1211 : if (rbtxn_prepared(txn))
2517 : 22 : rb->prepare(rb, txn, commit_lsn);
2518 : : else
2519 : 1189 : rb->commit(rb, txn, commit_lsn);
2520 : : }
2521 : :
2522 : : /* this is just a sanity check against bad output plugin behaviour */
3695 rhaas@postgresql.org 2523 [ - + ]: 1900 : if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
3637 tgl@sss.pgh.pa.us 2524 [ # # ]:UBC 0 : elog(ERROR, "output plugin used XID %u",
2525 : : GetCurrentTransactionId());
2526 : :
2527 : : /*
2528 : : * Remember the command ID and snapshot for the next set of changes in
2529 : : * streaming mode.
2530 : : */
1345 akapila@postgresql.o 2531 [ + + ]:CBC 1900 : if (streaming)
2532 : 695 : ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2533 [ + + ]: 1205 : else if (snapshot_now->copied)
2534 : 503 : ReorderBufferFreeSnap(rb, snapshot_now);
2535 : :
2536 : : /* cleanup */
3695 rhaas@postgresql.org 2537 : 1900 : TeardownHistoricSnapshot(false);
2538 : :
2539 : : /*
2540 : : * Aborting the current (sub-)transaction as a whole has the right
2541 : : * semantics. We want all locks acquired in here to be released, not
2542 : : * reassigned to the parent and we do not want any database access
2543 : : * to have persistent effects.
2544 : : */
3440 andres@anarazel.de 2545 : 1900 : AbortCurrentTransaction();
2546 : :
2547 : : /* make sure there's no cache pollution */
1277 akapila@postgresql.o 2548 : 1900 : ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2549 : :
3440 andres@anarazel.de 2550 [ + + ]: 1900 : if (using_subtxn)
3695 rhaas@postgresql.org 2551 : 438 : RollbackAndReleaseCurrentSubTransaction();
2552 : :
2553 : : /*
2554 : : * We are here due to one of the four reasons: 1. Decoding an
2555 : : * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2556 : : * prepared txn that was (partially) streamed. 4. Decoding a committed
2557 : : * txn.
2558 : : *
2559 : : * For 1, we allow truncation of txn data by removing the changes
2560 : : * already streamed but still keeping other things like invalidations,
2561 : : * snapshot, and tuplecids. For 2 and 3, we indicate
2562 : : * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2563 : : * data as the entire transaction has been decoded except for commit.
2564 : : * For 4, as the entire txn has been decoded, we can fully clean up
2565 : : * the TXN reorder buffer.
2566 : : */
1196 akapila@postgresql.o 2567 [ + + + + ]: 1900 : if (streaming || rbtxn_prepared(txn))
2568 : : {
2569 : 717 : ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2570 : : /* Reset the CheckXidAlive */
1345 2571 : 717 : CheckXidAlive = InvalidTransactionId;
2572 : : }
2573 : : else
2574 : 1183 : ReorderBufferCleanupTXN(rb, txn);
2575 : : }
3695 rhaas@postgresql.org 2576 : 10 : PG_CATCH();
2577 : : {
1345 akapila@postgresql.o 2578 : 10 : MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2579 : 10 : ErrorData *errdata = CopyErrorData();
2580 : :
2581 : : /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
3695 rhaas@postgresql.org 2582 [ + - ]: 10 : if (iterstate)
2583 : 10 : ReorderBufferIterTXNFinish(rb, iterstate);
2584 : :
2585 : 10 : TeardownHistoricSnapshot(true);
2586 : :
2587 : : /*
2588 : : * Force cache invalidation to happen outside of a valid transaction
2589 : : * to prevent catalog access as we just caught an error.
2590 : : */
3440 andres@anarazel.de 2591 : 10 : AbortCurrentTransaction();
2592 : :
2593 : : /* make sure there's no cache pollution */
1277 akapila@postgresql.o 2594 : 10 : ReorderBufferExecuteInvalidations(txn->ninvalidations,
2595 : : txn->invalidations);
2596 : :
3440 andres@anarazel.de 2597 [ + + ]: 10 : if (using_subtxn)
2598 : 3 : RollbackAndReleaseCurrentSubTransaction();
2599 : :
2600 : : /*
2601 : : * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2602 : : * abort of the (sub)transaction we are streaming or preparing. We
2603 : : * need to do the cleanup and return gracefully on this error, see
2604 : : * SetupCheckXidLive.
2605 : : *
2606 : : * This error code can be thrown by one of the callbacks we call
2607 : : * during decoding so we need to ensure that we return gracefully only
2608 : : * when we are sending the data in streaming mode and the streaming is
2609 : : * not finished yet or when we are sending the data out on a PREPARE
2610 : : * during a two-phase commit.
2611 : : */
1074 akapila@postgresql.o 2612 [ + + ]: 10 : if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2613 [ - + - - ]: 7 : (stream_started || rbtxn_prepared(txn)))
2614 : : {
2615 : : /* curtxn must be set for streaming or prepared transactions */
2616 [ - + ]: 7 : Assert(curtxn);
2617 : :
2618 : : /* Cleanup the temporary error state. */
1345 2619 : 7 : FlushErrorState();
2620 : 7 : FreeErrorData(errdata);
2621 : 7 : errdata = NULL;
2622 : 7 : curtxn->concurrent_abort = true;
2623 : :
2624 : : /* Reset the TXN so that it is allowed to stream remaining data. */
2625 : 7 : ReorderBufferResetTXN(rb, txn, snapshot_now,
2626 : : command_id, prev_lsn,
2627 : : specinsert);
2628 : : }
2629 : : else
2630 : : {
2631 : 3 : ReorderBufferCleanupTXN(rb, txn);
2632 : 3 : MemoryContextSwitchTo(ecxt);
2633 : 3 : PG_RE_THROW();
2634 : : }
2635 : : }
2636 [ - + ]: 1907 : PG_END_TRY();
2637 : 1907 : }
2638 : :
2639 : : /*
2640 : : * Perform the replay of a transaction and its non-aborted subtransactions.
2641 : : *
2642 : : * Subtransactions previously have to be processed by
2643 : : * ReorderBufferCommitChild(), even if previously assigned to the toplevel
2644 : : * transaction with ReorderBufferAssignChild.
2645 : : *
2646 : : * This interface is called once a prepare or toplevel commit is read for both
2647 : : * streamed as well as non-streamed transactions.
2648 : : */
2649 : : static void
1196 2650 : 1283 : ReorderBufferReplay(ReorderBufferTXN *txn,
2651 : : ReorderBuffer *rb, TransactionId xid,
2652 : : XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2653 : : TimestampTz commit_time,
2654 : : RepOriginId origin_id, XLogRecPtr origin_lsn)
2655 : : {
2656 : : Snapshot snapshot_now;
1345 2657 : 1283 : CommandId command_id = FirstCommandId;
2658 : :
2659 : 1283 : txn->final_lsn = commit_lsn;
2660 : 1283 : txn->end_lsn = end_lsn;
1005 2661 : 1283 : txn->xact_time.commit_time = commit_time;
1345 2662 : 1283 : txn->origin_id = origin_id;
2663 : 1283 : txn->origin_lsn = origin_lsn;
2664 : :
2665 : : /*
2666 : : * If the transaction was (partially) streamed, we need to commit it in a
2667 : : * 'streamed' way. That is, we first stream the remaining part of the
2668 : : * transaction, and then invoke stream_commit message.
2669 : : *
2670 : : * Called after everything (origin ID, LSN, ...) is stored in the
2671 : : * transaction to avoid passing that information directly.
2672 : : */
2673 [ + + ]: 1283 : if (rbtxn_is_streamed(txn))
2674 : : {
2675 : 66 : ReorderBufferStreamCommit(rb, txn);
2676 : 66 : return;
2677 : : }
2678 : :
2679 : : /*
2680 : : * If this transaction has no snapshot, it didn't make any changes to the
2681 : : * database, so there's nothing to decode. Note that
2682 : : * ReorderBufferCommitChild will have transferred any snapshots from
2683 : : * subtransactions if there were any.
2684 : : */
2685 [ + + ]: 1217 : if (txn->base_snapshot == NULL)
2686 : : {
2687 [ - + ]: 3 : Assert(txn->ninvalidations == 0);
2688 : :
2689 : : /*
2690 : : * Removing this txn before a commit might result in the computation
2691 : : * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2692 : : */
1196 2693 [ + - ]: 3 : if (!rbtxn_prepared(txn))
2694 : 3 : ReorderBufferCleanupTXN(rb, txn);
1345 2695 : 3 : return;
2696 : : }
2697 : :
2698 : 1214 : snapshot_now = txn->base_snapshot;
2699 : :
2700 : : /* Process and send the changes to output plugin. */
2701 : 1214 : ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2702 : : command_id, false);
2703 : : }
2704 : :
2705 : : /*
2706 : : * Commit a transaction.
2707 : : *
2708 : : * See comments for ReorderBufferReplay().
2709 : : */
2710 : : void
1196 2711 : 1246 : ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
2712 : : XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2713 : : TimestampTz commit_time,
2714 : : RepOriginId origin_id, XLogRecPtr origin_lsn)
2715 : : {
2716 : : ReorderBufferTXN *txn;
2717 : :
2718 : 1246 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2719 : : false);
2720 : :
2721 : : /* unknown transaction, nothing to replay */
2722 [ + + ]: 1246 : if (txn == NULL)
2723 : 1 : return;
2724 : :
2725 : 1245 : ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2726 : : origin_id, origin_lsn);
2727 : : }
2728 : :
2729 : : /*
2730 : : * Record the prepare information for a transaction.
2731 : : */
2732 : : bool
2733 : 129 : ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
2734 : : XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
2735 : : TimestampTz prepare_time,
2736 : : RepOriginId origin_id, XLogRecPtr origin_lsn)
2737 : : {
2738 : : ReorderBufferTXN *txn;
2739 : :
2740 : 129 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2741 : :
2742 : : /* unknown transaction, nothing to do */
2743 [ - + ]: 129 : if (txn == NULL)
1196 akapila@postgresql.o 2744 :UBC 0 : return false;
2745 : :
2746 : : /*
2747 : : * Remember the prepare information to be later used by commit prepared in
2748 : : * case we skip doing prepare.
2749 : : */
1196 akapila@postgresql.o 2750 :CBC 129 : txn->final_lsn = prepare_lsn;
2751 : 129 : txn->end_lsn = end_lsn;
1005 2752 : 129 : txn->xact_time.prepare_time = prepare_time;
1196 2753 : 129 : txn->origin_id = origin_id;
2754 : 129 : txn->origin_lsn = origin_lsn;
2755 : :
2756 : 129 : return true;
2757 : : }
2758 : :
2759 : : /* Remember that we have skipped prepare */
2760 : : void
2761 : 92 : ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
2762 : : {
2763 : : ReorderBufferTXN *txn;
2764 : :
2765 : 92 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2766 : :
2767 : : /* unknown transaction, nothing to do */
2768 [ - + ]: 92 : if (txn == NULL)
1196 akapila@postgresql.o 2769 :UBC 0 : return;
2770 : :
1196 akapila@postgresql.o 2771 :CBC 92 : txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2772 : : }
2773 : :
2774 : : /*
2775 : : * Prepare a two-phase transaction.
2776 : : *
2777 : : * See comments for ReorderBufferReplay().
2778 : : */
2779 : : void
2780 : 37 : ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
2781 : : char *gid)
2782 : : {
2783 : : ReorderBufferTXN *txn;
2784 : :
2785 : 37 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2786 : : false);
2787 : :
2788 : : /* unknown transaction, nothing to replay */
2789 [ - + ]: 37 : if (txn == NULL)
1196 akapila@postgresql.o 2790 :UBC 0 : return;
2791 : :
1196 akapila@postgresql.o 2792 :CBC 37 : txn->txn_flags |= RBTXN_PREPARE;
2793 : 37 : txn->gid = pstrdup(gid);
2794 : :
2795 : : /* The prepare info must have been updated in txn by now. */
2796 [ - + ]: 37 : Assert(txn->final_lsn != InvalidXLogRecPtr);
2797 : :
2798 : 37 : ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
1005 2799 : 37 : txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2800 : :
2801 : : /*
2802 : : * We send the prepare for the concurrently aborted xacts so that later
2803 : : * when rollback prepared is decoded and sent, the downstream should be
2804 : : * able to rollback such a xact. See comments atop DecodePrepare.
2805 : : *
2806 : : * Note, for the concurrent_abort + streaming case a stream_prepare was
2807 : : * already sent within the ReorderBufferReplay call above.
2808 : : */
1084 2809 [ - + - - ]: 37 : if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
1109 akapila@postgresql.o 2810 :UBC 0 : rb->prepare(rb, txn, txn->final_lsn);
2811 : : }
2812 : :
2813 : : /*
2814 : : * This is used to handle COMMIT/ROLLBACK PREPARED.
2815 : : */
2816 : : void
1196 akapila@postgresql.o 2817 :CBC 38 : ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
2818 : : XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2819 : : XLogRecPtr two_phase_at,
2820 : : TimestampTz commit_time, RepOriginId origin_id,
2821 : : XLogRecPtr origin_lsn, char *gid, bool is_commit)
2822 : : {
2823 : : ReorderBufferTXN *txn;
2824 : : XLogRecPtr prepare_end_lsn;
2825 : : TimestampTz prepare_time;
2826 : :
1146 2827 : 38 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2828 : :
2829 : : /* unknown transaction, nothing to do */
1196 2830 [ - + ]: 38 : if (txn == NULL)
1196 akapila@postgresql.o 2831 :UBC 0 : return;
2832 : :
2833 : : /*
2834 : : * By this time the txn has the prepare record information, remember it to
2835 : : * be later used for rollback.
2836 : : */
1196 akapila@postgresql.o 2837 :CBC 38 : prepare_end_lsn = txn->end_lsn;
1005 2838 : 38 : prepare_time = txn->xact_time.prepare_time;
2839 : :
2840 : : /* add the gid in the txn */
1196 2841 : 38 : txn->gid = pstrdup(gid);
2842 : :
2843 : : /*
2844 : : * It is possible that this transaction is not decoded at prepare time
2845 : : * either because by that time we didn't have a consistent snapshot, or
2846 : : * two_phase was not enabled, or it was decoded earlier but we have
2847 : : * restarted. We only need to send the prepare if it was not decoded
2848 : : * earlier. We don't need to decode the xact for aborts if it is not done
2849 : : * already.
2850 : : */
1005 2851 [ + + + - ]: 38 : if ((txn->final_lsn < two_phase_at) && is_commit)
2852 : : {
1196 2853 : 1 : txn->txn_flags |= RBTXN_PREPARE;
2854 : :
2855 : : /*
2856 : : * The prepare info must have been updated in txn even if we skip
2857 : : * prepare.
2858 : : */
2859 [ - + ]: 1 : Assert(txn->final_lsn != InvalidXLogRecPtr);
2860 : :
2861 : : /*
2862 : : * By this time the txn has the prepare record information and it is
2863 : : * important to use that so that downstream gets the accurate
2864 : : * information. If instead, we have passed commit information here
2865 : : * then downstream can behave as it has already replayed commit
2866 : : * prepared after the restart.
2867 : : */
2868 : 1 : ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
1005 2869 : 1 : txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2870 : : }
2871 : :
1196 2872 : 38 : txn->final_lsn = commit_lsn;
2873 : 38 : txn->end_lsn = end_lsn;
1005 2874 : 38 : txn->xact_time.commit_time = commit_time;
1196 2875 : 38 : txn->origin_id = origin_id;
2876 : 38 : txn->origin_lsn = origin_lsn;
2877 : :
2878 [ + + ]: 38 : if (is_commit)
2879 : 29 : rb->commit_prepared(rb, txn, commit_lsn);
2880 : : else
2881 : 9 : rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2882 : :
2883 : : /* cleanup: make sure there's no cache pollution */
2884 : 38 : ReorderBufferExecuteInvalidations(txn->ninvalidations,
2885 : : txn->invalidations);
2886 : 38 : ReorderBufferCleanupTXN(rb, txn);
2887 : : }
2888 : :
2889 : : /*
2890 : : * Abort a transaction that possibly has previous changes. Needs to be first
2891 : : * called for subtransactions and then for the toplevel xid.
2892 : : *
2893 : : * NB: Transactions handled here have to have actively aborted (i.e. have
2894 : : * produced an abort record). Implicitly aborted transactions are handled via
2895 : : * ReorderBufferAbortOld(); transactions we're just not interested in, but
2896 : : * which have committed are handled in ReorderBufferForget().
2897 : : *
2898 : : * This function purges this transaction and its contents from memory and
2899 : : * disk.
2900 : : */
2901 : : void
461 2902 : 106 : ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
2903 : : TimestampTz abort_time)
2904 : : {
2905 : : ReorderBufferTXN *txn;
2906 : :
3695 rhaas@postgresql.org 2907 : 106 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2908 : : false);
2909 : :
2910 : : /* unknown, nothing to remove */
2911 [ - + ]: 106 : if (txn == NULL)
3695 rhaas@postgresql.org 2912 :UBC 0 : return;
2913 : :
461 akapila@postgresql.o 2914 :CBC 106 : txn->xact_time.abort_time = abort_time;
2915 : :
2916 : : /* For streamed transactions notify the remote node about the abort. */
1345 2917 [ + + ]: 106 : if (rbtxn_is_streamed(txn))
2918 : : {
2919 : 29 : rb->stream_abort(rb, txn, lsn);
2920 : :
2921 : : /*
2922 : : * We might have decoded changes for this transaction that could load
2923 : : * the cache as per the current transaction's view (consider DDL's
2924 : : * happened in this transaction). We don't want the decoding of future
2925 : : * transactions to use those cache entries so execute invalidations.
2926 : : */
2927 [ - + ]: 29 : if (txn->ninvalidations > 0)
1345 akapila@postgresql.o 2928 :UBC 0 : ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2929 : : txn->invalidations);
2930 : : }
2931 : :
2932 : : /* cosmetic... */
3695 rhaas@postgresql.org 2933 :CBC 106 : txn->final_lsn = lsn;
2934 : :
2935 : : /* remove potential on-disk data, and deallocate */
2936 : 106 : ReorderBufferCleanupTXN(rb, txn);
2937 : : }
2938 : :
2939 : : /*
2940 : : * Abort all transactions that aren't actually running anymore because the
2941 : : * server restarted.
2942 : : *
2943 : : * NB: These really have to be transactions that have aborted due to a server
2944 : : * crash/immediate restart, as we don't deal with invalidations here.
2945 : : */
2946 : : void
2947 : 1294 : ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
2948 : : {
2949 : : dlist_mutable_iter it;
2950 : :
2951 : : /*
2952 : : * Iterate through all (potential) toplevel TXNs and abort all that are
2953 : : * older than what possibly can be running. Once we've found the first
2954 : : * that is alive we stop, there might be some that acquired an xid earlier
2955 : : * but started writing later, but it's unlikely and they will be cleaned
2956 : : * up in a later call to this function.
2957 : : */
2958 [ + - + + ]: 1299 : dlist_foreach_modify(it, &rb->toplevel_by_lsn)
2959 : : {
2960 : : ReorderBufferTXN *txn;
2961 : :
2962 : 52 : txn = dlist_container(ReorderBufferTXN, node, it.cur);
2963 : :
2964 [ + + ]: 52 : if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2965 : : {
2528 andres@anarazel.de 2966 [ - + ]: 5 : elog(DEBUG2, "aborting old transaction %u", txn->xid);
2967 : :
2968 : : /* Notify the remote node about the crash/immediate restart. */
463 akapila@postgresql.o 2969 [ - + ]: 5 : if (rbtxn_is_streamed(txn))
463 akapila@postgresql.o 2970 :UBC 0 : rb->stream_abort(rb, txn, InvalidXLogRecPtr);
2971 : :
2972 : : /* remove potential on-disk data, and deallocate this tx */
3695 rhaas@postgresql.org 2973 :CBC 5 : ReorderBufferCleanupTXN(rb, txn);
2974 : : }
2975 : : else
2976 : 47 : return;
2977 : : }
2978 : : }
2979 : :
2980 : : /*
2981 : : * Forget the contents of a transaction if we aren't interested in its
2982 : : * contents. Needs to be first called for subtransactions and then for the
2983 : : * toplevel xid.
2984 : : *
2985 : : * This is significantly different to ReorderBufferAbort() because
2986 : : * transactions that have committed need to be treated differently from aborted
2987 : : * ones since they may have modified the catalog.
2988 : : *
2989 : : * Note that this is only allowed to be called in the moment a transaction
2990 : : * commit has just been read, not earlier; otherwise later records referring
2991 : : * to this xid might re-create the transaction incompletely.
2992 : : */
2993 : : void
2994 : 2518 : ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2995 : : {
2996 : : ReorderBufferTXN *txn;
2997 : :
2998 : 2518 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2999 : : false);
3000 : :
3001 : : /* unknown, nothing to forget */
3002 [ + + ]: 2518 : if (txn == NULL)
3003 : 561 : return;
3004 : :
3005 : : /* this transaction mustn't be streamed */
493 akapila@postgresql.o 3006 [ - + ]: 1957 : Assert(!rbtxn_is_streamed(txn));
3007 : :
3008 : : /* cosmetic... */
3695 rhaas@postgresql.org 3009 : 1957 : txn->final_lsn = lsn;
3010 : :
3011 : : /*
3012 : : * Process cache invalidation messages if there are any. Even if we're not
3013 : : * interested in the transaction's contents, it could have manipulated the
3014 : : * catalog and we need to update the caches according to that.
3015 : : */
3016 [ + + + + ]: 1957 : if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2913 andres@anarazel.de 3017 : 546 : ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3018 : : txn->invalidations);
3019 : : else
3695 rhaas@postgresql.org 3020 [ - + ]: 1411 : Assert(txn->ninvalidations == 0);
3021 : :
3022 : : /* remove potential on-disk data, and deallocate */
3023 : 1957 : ReorderBufferCleanupTXN(rb, txn);
3024 : : }
3025 : :
3026 : : /*
3027 : : * Invalidate cache for those transactions that need to be skipped just in case
3028 : : * catalogs were manipulated as part of the transaction.
3029 : : *
3030 : : * Note that this is a special-purpose function for prepared transactions where
3031 : : * we don't want to clean up the TXN even when we decide to skip it. See
3032 : : * DecodePrepare.
3033 : : */
3034 : : void
1196 akapila@postgresql.o 3035 : 89 : ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3036 : : {
3037 : : ReorderBufferTXN *txn;
3038 : :
3039 : 89 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3040 : : false);
3041 : :
3042 : : /* unknown, nothing to do */
3043 [ - + ]: 89 : if (txn == NULL)
1196 akapila@postgresql.o 3044 :UBC 0 : return;
3045 : :
3046 : : /*
3047 : : * Process cache invalidation messages if there are any. Even if we're not
3048 : : * interested in the transaction's contents, it could have manipulated the
3049 : : * catalog and we need to update the caches according to that.
3050 : : */
1196 akapila@postgresql.o 3051 [ + - + + ]:CBC 89 : if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3052 : 23 : ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3053 : : txn->invalidations);
3054 : : else
3055 [ - + ]: 66 : Assert(txn->ninvalidations == 0);
3056 : : }
3057 : :
3058 : :
3059 : : /*
3060 : : * Execute invalidations happening outside the context of a decoded
3061 : : * transaction. That currently happens either for xid-less commits
3062 : : * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
3063 : : * transactions (via ReorderBufferForget()).
3064 : : */
3065 : : void
2913 andres@anarazel.de 3066 : 571 : ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
3067 : : SharedInvalidationMessage *invalidations)
3068 : : {
3069 : 571 : bool use_subtxn = IsTransactionOrTransactionBlock();
3070 : : int i;
3071 : :
3072 [ + + ]: 571 : if (use_subtxn)
3073 : 403 : BeginInternalSubTransaction("replay");
3074 : :
3075 : : /*
3076 : : * Force invalidations to happen outside of a valid transaction - that way
3077 : : * entries will just be marked as invalid without accessing the catalog.
3078 : : * That's advantageous because we don't need to setup the full state
3079 : : * necessary for catalog access.
3080 : : */
3081 [ + + ]: 571 : if (use_subtxn)
3082 : 403 : AbortCurrentTransaction();
3083 : :
3084 [ + + ]: 23668 : for (i = 0; i < ninvalidations; i++)
3085 : 23097 : LocalExecuteInvalidationMessage(&invalidations[i]);
3086 : :
3087 [ + + ]: 571 : if (use_subtxn)
3088 : 403 : RollbackAndReleaseCurrentSubTransaction();
3089 : 571 : }
3090 : :
3091 : : /*
3092 : : * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
3093 : : * least once for every xid in XLogRecord->xl_xid (other places in records
3094 : : * may, but do not have to be passed through here).
3095 : : *
3096 : : * Reorderbuffer keeps some data structures about transactions in LSN order,
3097 : : * for efficiency. To do that it has to know about when transactions are seen
3098 : : * first in the WAL. As many types of records are not actually interesting for
3099 : : * logical decoding, they do not necessarily pass through here.
3100 : : */
3101 : : void
2962 3102 : 2154405 : ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3103 : : {
3104 : : /* many records won't have an xid assigned, centralize check here */
3105 [ + + ]: 2154405 : if (xid != InvalidTransactionId)
3106 : 2152350 : ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3695 rhaas@postgresql.org 3107 : 2154405 : }
3108 : :
3109 : : /*
3110 : : * Add a new snapshot to this transaction that may only used after lsn 'lsn'
3111 : : * because the previous snapshot doesn't describe the catalog correctly for
3112 : : * following rows.
3113 : : */
3114 : : void
3115 : 1095 : ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
3116 : : XLogRecPtr lsn, Snapshot snap)
3117 : : {
3118 : 1095 : ReorderBufferChange *change = ReorderBufferGetChange(rb);
3119 : :
3691 tgl@sss.pgh.pa.us 3120 : 1095 : change->data.snapshot = snap;
3121 : 1095 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
3122 : :
1345 akapila@postgresql.o 3123 : 1095 : ReorderBufferQueueChange(rb, xid, lsn, change, false);
3695 rhaas@postgresql.org 3124 : 1095 : }
3125 : :
3126 : : /*
3127 : : * Set up the transaction's base snapshot.
3128 : : *
3129 : : * If we know that xid is a subtransaction, set the base snapshot on the
3130 : : * top-level transaction instead.
3131 : : */
3132 : : void
3133 : 2866 : ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
3134 : : XLogRecPtr lsn, Snapshot snap)
3135 : : {
3136 : : ReorderBufferTXN *txn;
3137 : : bool is_new;
3138 : :
534 peter@eisentraut.org 3139 [ - + ]: 2866 : Assert(snap != NULL);
3140 : :
3141 : : /*
3142 : : * Fetch the transaction to operate on. If we know it's a subtransaction,
3143 : : * operate on its top-level transaction instead.
3144 : : */
3695 rhaas@postgresql.org 3145 : 2866 : txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1556 alvherre@alvh.no-ip. 3146 [ + + ]: 2866 : if (rbtxn_is_known_subxact(txn))
2119 3147 : 120 : txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3148 : : NULL, InvalidXLogRecPtr, false);
3695 rhaas@postgresql.org 3149 [ - + ]: 2866 : Assert(txn->base_snapshot == NULL);
3150 : :
3151 : 2866 : txn->base_snapshot = snap;
3152 : 2866 : txn->base_snapshot_lsn = lsn;
2119 alvherre@alvh.no-ip. 3153 : 2866 : dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3154 : :
3155 : 2866 : AssertTXNLsnOrder(rb);
3695 rhaas@postgresql.org 3156 : 2866 : }
3157 : :
3158 : : /*
3159 : : * Access the catalog with this CommandId at this point in the changestream.
3160 : : *
3161 : : * May only be called for command ids > 1
3162 : : */
3163 : : void
3164 : 22525 : ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
3165 : : XLogRecPtr lsn, CommandId cid)
3166 : : {
3167 : 22525 : ReorderBufferChange *change = ReorderBufferGetChange(rb);
3168 : :
3691 tgl@sss.pgh.pa.us 3169 : 22525 : change->data.command_id = cid;
3170 : 22525 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
3171 : :
1345 akapila@postgresql.o 3172 : 22525 : ReorderBufferQueueChange(rb, xid, lsn, change, false);
3695 rhaas@postgresql.org 3173 : 22525 : }
3174 : :
3175 : : /*
3176 : : * Update memory counters to account for the new or removed change.
3177 : : *
3178 : : * We update two counters - in the reorder buffer, and in the transaction
3179 : : * containing the change. The reorder buffer counter allows us to quickly
3180 : : * decide if we reached the memory limit, the transaction counter allows
3181 : : * us to quickly pick the largest transaction for eviction.
3182 : : *
3183 : : * Either txn or change must be non-NULL at least. We update the memory
3184 : : * counter of txn if it's non-NULL, otherwise change->txn.
3185 : : *
3186 : : * When streaming is enabled, we need to update the toplevel transaction
3187 : : * counters instead - we don't really care about subtransactions as we
3188 : : * can't stream them individually anyway, and we only pick toplevel
3189 : : * transactions for eviction. So only toplevel transactions matter.
3190 : : */
3191 : : static void
1611 akapila@postgresql.o 3192 : 1913532 : ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3193 : : ReorderBufferChange *change,
3194 : : ReorderBufferTXN *txn,
3195 : : bool addition, Size sz)
3196 : : {
3197 : : ReorderBufferTXN *toptxn;
3198 : :
11 msawada@postgresql.o 3199 [ + + - + ]:GNC 1913532 : Assert(txn || change);
3200 : :
3201 : : /*
3202 : : * Ignore tuple CID changes, because those are not evicted when reaching
3203 : : * memory limit. So we just don't count them, because it might easily
3204 : : * trigger a pointless attempt to spill.
3205 : : */
3206 [ + + + + ]: 1913532 : if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
1611 akapila@postgresql.o 3207 :CBC 22453 : return;
3208 : :
11 msawada@postgresql.o 3209 [ + + ]:GNC 1891079 : if (sz == 0)
3210 : 929 : return;
3211 : :
3212 [ + + ]: 1890150 : if (txn == NULL)
3213 : 1882991 : txn = change->txn;
3214 [ - + ]: 1890150 : Assert(txn != NULL);
3215 : :
3216 : : /*
3217 : : * Update the total size in top level as well. This is later used to
3218 : : * compute the decoding stats.
3219 : : */
394 akapila@postgresql.o 3220 [ + + ]:CBC 1890150 : toptxn = rbtxn_get_toptxn(txn);
3221 : :
1611 3222 [ + + ]: 1890150 : if (addition)
3223 : : {
3 msawada@postgresql.o 3224 :GNC 1709206 : Size oldsize = txn->size;
3225 : :
1345 akapila@postgresql.o 3226 :CBC 1709206 : txn->size += sz;
1611 3227 : 1709206 : rb->size += sz;
3228 : :
3229 : : /* Update the total size in the top transaction. */
1077 3230 : 1709206 : toptxn->total_size += sz;
3231 : :
3232 : : /* Update the max-heap */
3 msawada@postgresql.o 3233 [ + + ]:GNC 1709206 : if (oldsize != 0)
3234 : 1701989 : pairingheap_remove(rb->txn_heap, &txn->txn_node);
3235 : 1709206 : pairingheap_add(rb->txn_heap, &txn->txn_node);
3236 : : }
3237 : : else
3238 : : {
1345 akapila@postgresql.o 3239 [ + - - + ]:CBC 180944 : Assert((rb->size >= sz) && (txn->size >= sz));
3240 : 180944 : txn->size -= sz;
1611 3241 : 180944 : rb->size -= sz;
3242 : :
3243 : : /* Update the total size in the top transaction. */
1077 3244 : 180944 : toptxn->total_size -= sz;
3245 : :
3246 : : /* Update the max-heap */
3 msawada@postgresql.o 3247 :GNC 180944 : pairingheap_remove(rb->txn_heap, &txn->txn_node);
3248 [ + + ]: 180944 : if (txn->size != 0)
3249 : 173757 : pairingheap_add(rb->txn_heap, &txn->txn_node);
3250 : : }
3251 : :
1345 akapila@postgresql.o 3252 [ - + ]:CBC 1890150 : Assert(txn->size <= rb->size);
3253 : : }
3254 : :
3255 : : /*
3256 : : * Add new (relfilelocator, tid) -> (cmin, cmax) mappings.
3257 : : *
3258 : : * We do not include this change type in memory accounting, because we
3259 : : * keep CIDs in a separate list and do not evict them when reaching
3260 : : * the memory limit.
3261 : : */
3262 : : void
3695 rhaas@postgresql.org 3263 : 22525 : ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
3264 : : XLogRecPtr lsn, RelFileLocator locator,
3265 : : ItemPointerData tid, CommandId cmin,
3266 : : CommandId cmax, CommandId combocid)
3267 : : {
3268 : 22525 : ReorderBufferChange *change = ReorderBufferGetChange(rb);
3269 : : ReorderBufferTXN *txn;
3270 : :
3271 : 22525 : txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3272 : :
648 3273 : 22525 : change->data.tuplecid.locator = locator;
3691 tgl@sss.pgh.pa.us 3274 : 22525 : change->data.tuplecid.tid = tid;
3275 : 22525 : change->data.tuplecid.cmin = cmin;
3276 : 22525 : change->data.tuplecid.cmax = cmax;
3277 : 22525 : change->data.tuplecid.combocid = combocid;
3695 rhaas@postgresql.org 3278 : 22525 : change->lsn = lsn;
1611 akapila@postgresql.o 3279 : 22525 : change->txn = txn;
3691 tgl@sss.pgh.pa.us 3280 : 22525 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3281 : :
3695 rhaas@postgresql.org 3282 : 22525 : dlist_push_tail(&txn->tuplecids, &change->node);
3283 : 22525 : txn->ntuplecids++;
3284 : 22525 : }
3285 : :
3286 : : /*
3287 : : * Accumulate the invalidations for executing them later.
3288 : : *
3289 : : * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
3290 : : * accumulates all the invalidation messages in the toplevel transaction, if
3291 : : * available, otherwise in the current transaction, as well as in the form of
3292 : : * change in reorder buffer. We require to record it in form of the change
3293 : : * so that we can execute only the required invalidations instead of executing
3294 : : * all the invalidations on each CommandId increment. We also need to
3295 : : * accumulate these in the txn buffer because in some cases where we skip
3296 : : * processing the transaction (see ReorderBufferForget), we need to execute
3297 : : * all the invalidations together.
3298 : : */
3299 : : void
3300 : 4699 : ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
3301 : : XLogRecPtr lsn, Size nmsgs,
3302 : : SharedInvalidationMessage *msgs)
3303 : : {
3304 : : ReorderBufferTXN *txn;
3305 : : MemoryContext oldcontext;
3306 : : ReorderBufferChange *change;
3307 : :
3308 : 4699 : txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3309 : :
1277 akapila@postgresql.o 3310 : 4699 : oldcontext = MemoryContextSwitchTo(rb->context);
3311 : :
3312 : : /*
3313 : : * Collect all the invalidations under the top transaction, if available,
3314 : : * so that we can execute them all together. See comments atop this
3315 : : * function.
3316 : : */
394 3317 [ + + ]: 4699 : txn = rbtxn_get_toptxn(txn);
3318 : :
3695 rhaas@postgresql.org 3319 [ - + ]: 4699 : Assert(nmsgs > 0);
3320 : :
3321 : : /* Accumulate invalidations. */
1361 akapila@postgresql.o 3322 [ + + ]: 4699 : if (txn->ninvalidations == 0)
3323 : : {
3324 : 1069 : txn->ninvalidations = nmsgs;
3325 : 1069 : txn->invalidations = (SharedInvalidationMessage *)
1277 3326 : 1069 : palloc(sizeof(SharedInvalidationMessage) * nmsgs);
1361 3327 : 1069 : memcpy(txn->invalidations, msgs,
3328 : : sizeof(SharedInvalidationMessage) * nmsgs);
3329 : : }
3330 : : else
3331 : : {
3332 : 3630 : txn->invalidations = (SharedInvalidationMessage *)
3333 : 3630 : repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
3334 : 3630 : (txn->ninvalidations + nmsgs));
3335 : :
3336 : 3630 : memcpy(txn->invalidations + txn->ninvalidations, msgs,
3337 : : nmsgs * sizeof(SharedInvalidationMessage));
3338 : 3630 : txn->ninvalidations += nmsgs;
3339 : : }
3340 : :
1277 3341 : 4699 : change = ReorderBufferGetChange(rb);
3342 : 4699 : change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3343 : 4699 : change->data.inval.ninvalidations = nmsgs;
3344 : 4699 : change->data.inval.invalidations = (SharedInvalidationMessage *)
3345 : 4699 : palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3346 : 4699 : memcpy(change->data.inval.invalidations, msgs,
3347 : : sizeof(SharedInvalidationMessage) * nmsgs);
3348 : :
3349 : 4699 : ReorderBufferQueueChange(rb, xid, lsn, change, false);
3350 : :
3351 : 4699 : MemoryContextSwitchTo(oldcontext);
3695 rhaas@postgresql.org 3352 : 4699 : }
3353 : :
3354 : : /*
3355 : : * Apply all invalidations we know. Possibly we only need parts at this point
3356 : : * in the changestream but we don't know which those are.
3357 : : */
3358 : : static void
1277 akapila@postgresql.o 3359 : 4065 : ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
3360 : : {
3361 : : int i;
3362 : :
3363 [ + + ]: 40449 : for (i = 0; i < nmsgs; i++)
3364 : 36384 : LocalExecuteInvalidationMessage(&msgs[i]);
3695 rhaas@postgresql.org 3365 : 4065 : }
3366 : :
3367 : : /*
3368 : : * Mark a transaction as containing catalog changes
3369 : : */
3370 : : void
3371 : 28213 : ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
3372 : : XLogRecPtr lsn)
3373 : : {
3374 : : ReorderBufferTXN *txn;
3375 : :
3376 : 28213 : txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3377 : :
612 akapila@postgresql.o 3378 [ + + ]: 28213 : if (!rbtxn_has_catalog_changes(txn))
3379 : : {
3380 : 1106 : txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
529 drowley@postgresql.o 3381 : 1106 : dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);
3382 : : }
3383 : :
3384 : : /*
3385 : : * Mark top-level transaction as having catalog changes too if one of its
3386 : : * children has so that the ReorderBufferBuildTupleCidHash can
3387 : : * conveniently check just top-level transaction and decide whether to
3388 : : * build the hash table or not.
3389 : : */
394 akapila@postgresql.o 3390 [ + + ]: 28213 : if (rbtxn_is_subtxn(txn))
3391 : : {
3392 [ + - ]: 903 : ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3393 : :
3394 [ + + ]: 903 : if (!rbtxn_has_catalog_changes(toptxn))
3395 : : {
3396 : 18 : toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3397 : 18 : dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3398 : : }
3399 : : }
612 3400 : 28213 : }
3401 : :
3402 : : /*
3403 : : * Return palloc'ed array of the transactions that have changed catalogs.
3404 : : * The returned array is sorted in xidComparator order.
3405 : : *
3406 : : * The caller must free the returned array when done with it.
3407 : : */
3408 : : TransactionId *
3409 : 297 : ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
3410 : : {
3411 : : dlist_iter iter;
3412 : 297 : TransactionId *xids = NULL;
3413 : 297 : size_t xcnt = 0;
3414 : :
3415 : : /* Quick return if the list is empty */
529 drowley@postgresql.o 3416 [ + + ]: 297 : if (dclist_count(&rb->catchange_txns) == 0)
612 akapila@postgresql.o 3417 : 289 : return NULL;
3418 : :
3419 : : /* Initialize XID array */
529 drowley@postgresql.o 3420 : 8 : xids = (TransactionId *) palloc(sizeof(TransactionId) *
3421 : 8 : dclist_count(&rb->catchange_txns));
3422 [ + - + + ]: 18 : dclist_foreach(iter, &rb->catchange_txns)
3423 : : {
3424 : 10 : ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3425 : : catchange_node,
3426 : : iter.cur);
3427 : :
612 akapila@postgresql.o 3428 [ - + ]: 10 : Assert(rbtxn_has_catalog_changes(txn));
3429 : :
3430 : 10 : xids[xcnt++] = txn->xid;
3431 : : }
3432 : :
3433 : 8 : qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3434 : :
529 drowley@postgresql.o 3435 [ - + ]: 8 : Assert(xcnt == dclist_count(&rb->catchange_txns));
612 akapila@postgresql.o 3436 : 8 : return xids;
3437 : : }
3438 : :
3439 : : /*
3440 : : * Query whether a transaction is already *known* to contain catalog
3441 : : * changes. This can be wrong until directly before the commit!
3442 : : */
3443 : : bool
3695 rhaas@postgresql.org 3444 : 4059 : ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
3445 : : {
3446 : : ReorderBufferTXN *txn;
3447 : :
3448 : 4059 : txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3449 : : false);
3450 [ + + ]: 4059 : if (txn == NULL)
3451 : 646 : return false;
3452 : :
1556 alvherre@alvh.no-ip. 3453 : 3413 : return rbtxn_has_catalog_changes(txn);
3454 : : }
3455 : :
3456 : : /*
3457 : : * ReorderBufferXidHasBaseSnapshot
3458 : : * Have we already set the base snapshot for the given txn/subtxn?
3459 : : */
3460 : : bool
3695 rhaas@postgresql.org 3461 : 1518477 : ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
3462 : : {
3463 : : ReorderBufferTXN *txn;
3464 : :
2119 alvherre@alvh.no-ip. 3465 : 1518477 : txn = ReorderBufferTXNByXid(rb, xid, false,
3466 : : NULL, InvalidXLogRecPtr, false);
3467 : :
3468 : : /* transaction isn't known yet, ergo no snapshot */
3695 rhaas@postgresql.org 3469 [ + + ]: 1518477 : if (txn == NULL)
3470 : 3 : return false;
3471 : :
3472 : : /* a known subtxn? operate on top-level txn instead */
1556 alvherre@alvh.no-ip. 3473 [ + + ]: 1518474 : if (rbtxn_is_known_subxact(txn))
2119 3474 : 491941 : txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3475 : : NULL, InvalidXLogRecPtr, false);
3476 : :
3695 rhaas@postgresql.org 3477 : 1518474 : return txn->base_snapshot != NULL;
3478 : : }
3479 : :
3480 : :
3481 : : /*
3482 : : * ---------------------------------------
3483 : : * Disk serialization support
3484 : : * ---------------------------------------
3485 : : */
3486 : :
3487 : : /*
3488 : : * Ensure the IO buffer is >= sz.
3489 : : */
3490 : : static void
3491 : 2920530 : ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
3492 : : {
3493 [ + + ]: 2920530 : if (!rb->outbufsize)
3494 : : {
3495 : 48 : rb->outbuf = MemoryContextAlloc(rb->context, sz);
3496 : 48 : rb->outbufsize = sz;
3497 : : }
3498 [ + + ]: 2920482 : else if (rb->outbufsize < sz)
3499 : : {
3500 : 298 : rb->outbuf = repalloc(rb->outbuf, sz);
3501 : 298 : rb->outbufsize = sz;
3502 : : }
3503 : 2920530 : }
3504 : :
3505 : :
3506 : : /* Compare two transactions by size */
3507 : : static int
3 msawada@postgresql.o 3508 :GNC 328640 : ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
3509 : : {
3510 : 328640 : const ReorderBufferTXN *ta = pairingheap_const_container(ReorderBufferTXN, txn_node, a);
3511 : 328640 : const ReorderBufferTXN *tb = pairingheap_const_container(ReorderBufferTXN, txn_node, b);
3512 : :
11 3513 [ + + ]: 328640 : if (ta->size < tb->size)
3514 : 236435 : return -1;
3515 [ + + ]: 92205 : if (ta->size > tb->size)
3516 : 91297 : return 1;
3517 : 908 : return 0;
3518 : : }
3519 : :
3520 : : /*
3521 : : * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3522 : : */
3523 : : static ReorderBufferTXN *
11 msawada@postgresql.o 3524 :CBC 3218 : ReorderBufferLargestTXN(ReorderBuffer *rb)
3525 : : {
3526 : : ReorderBufferTXN *largest;
3527 : :
3528 : : /* Get the largest transaction from the max-heap */
3 msawada@postgresql.o 3529 :GNC 3218 : largest = pairingheap_container(ReorderBufferTXN, txn_node,
3530 : : pairingheap_first(rb->txn_heap));
3531 : :
1611 akapila@postgresql.o 3532 [ - + ]:CBC 3218 : Assert(largest);
3533 [ - + ]: 3218 : Assert(largest->size > 0);
3534 [ - + ]: 3218 : Assert(largest->size <= rb->size);
3535 : :
3536 : 3218 : return largest;
3537 : : }
3538 : :
3539 : : /*
3540 : : * Find the largest streamable toplevel transaction to evict (by streaming).
3541 : : *
3542 : : * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3543 : : * should give us the same transaction (because we don't update memory account
3544 : : * for subtransaction with streaming, so it's always 0). But we can simply
3545 : : * iterate over the limited number of toplevel transactions that have a base
3546 : : * snapshot. There is no use of selecting a transaction that doesn't have base
3547 : : * snapshot because we don't decode such transactions. Also, we do not select
3548 : : * the transaction which doesn't have any streamable change.
3549 : : *
3550 : : * Note that, we skip transactions that contain incomplete changes. There
3551 : : * is a scope of optimization here such that we can select the largest
3552 : : * transaction which has incomplete changes. But that will make the code and
3553 : : * design quite complex and that might not be worth the benefit. If we plan to
3554 : : * stream the transactions that contain incomplete changes then we need to
3555 : : * find a way to partially stream/truncate the transaction changes in-memory
3556 : : * and build a mechanism to partially truncate the spilled files.
3557 : : * Additionally, whenever we partially stream the transaction we need to
3558 : : * maintain the last streamed lsn and next time we need to restore from that
3559 : : * segment and the offset in WAL. As we stream the changes from the top
3560 : : * transaction and restore them subtransaction wise, we need to even remember
3561 : : * the subxact from where we streamed the last change.
3562 : : */
3563 : : static ReorderBufferTXN *
493 3564 : 675 : ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
3565 : : {
3566 : : dlist_iter iter;
1345 3567 : 675 : Size largest_size = 0;
3568 : 675 : ReorderBufferTXN *largest = NULL;
3569 : :
3570 : : /* Find the largest top-level transaction having a base snapshot. */
1080 3571 [ + - + + ]: 1463 : dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3572 : : {
3573 : : ReorderBufferTXN *txn;
3574 : :
3575 : 788 : txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3576 : :
3577 : : /* must not be a subtxn */
3578 [ - + ]: 788 : Assert(!rbtxn_is_known_subxact(txn));
3579 : : /* base_snapshot must be set */
3580 [ - + ]: 788 : Assert(txn->base_snapshot != NULL);
3581 : :
3582 [ + + + - ]: 788 : if ((largest == NULL || txn->total_size > largest_size) &&
493 3583 [ + + + + ]: 788 : (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3584 [ + - ]: 697 : rbtxn_has_streamable_change(txn))
3585 : : {
1345 3586 : 697 : largest = txn;
3587 : 697 : largest_size = txn->total_size;
3588 : : }
3589 : : }
3590 : :
3591 : 675 : return largest;
3592 : : }
3593 : :
3594 : : /*
3595 : : * Check whether the logical_decoding_work_mem limit was reached, and if yes
3596 : : * pick the largest (sub)transaction at-a-time to evict and spill its changes to
3597 : : * disk or send to the output plugin until we reach under the memory limit.
3598 : : *
3599 : : * If debug_logical_replication_streaming is set to "immediate", stream or
3600 : : * serialize the changes immediately.
3601 : : *
3602 : : * XXX At this point we select the transactions until we reach under the memory
3603 : : * limit, but we might also adapt a more elaborate eviction strategy - for example
3604 : : * evicting enough transactions to free certain fraction (e.g. 50%) of the memory
3605 : : * limit.
3606 : : */
3607 : : static void
1611 3608 : 1535468 : ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
3609 : : {
3610 : : ReorderBufferTXN *txn;
3611 : :
3612 : : /*
3613 : : * Bail out if debug_logical_replication_streaming is buffered and we
3614 : : * haven't exceeded the memory limit.
3615 : : */
229 peter@eisentraut.org 3616 [ + + ]: 1535468 : if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED &&
475 akapila@postgresql.o 3617 [ + + ]: 1535034 : rb->size < logical_decoding_work_mem * 1024L)
1611 3618 : 1531619 : return;
3619 : :
3620 : : /*
3621 : : * If debug_logical_replication_streaming is immediate, loop until there's
3622 : : * no change. Otherwise, loop until we reach under the memory limit. One
3623 : : * might think that just by evicting the largest (sub)transaction we will
3624 : : * come under the memory limit based on assumption that the selected
3625 : : * transaction is at least as large as the most recent change (which
3626 : : * caused us to go over the memory limit). However, that is not true
3627 : : * because a user can reduce the logical_decoding_work_mem to a smaller
3628 : : * value before the most recent change.
3629 : : */
475 3630 [ + + ]: 7698 : while (rb->size >= logical_decoding_work_mem * 1024L ||
229 peter@eisentraut.org 3631 [ + + ]: 4283 : (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE &&
475 akapila@postgresql.o 3632 [ + + ]: 868 : rb->size > 0))
3633 : : {
3634 : : /*
3635 : : * Pick the largest transaction and evict it from memory by streaming,
3636 : : * if possible. Otherwise, spill to disk.
3637 : : */
1345 3638 [ + + + + ]: 4524 : if (ReorderBufferCanStartStreaming(rb) &&
493 3639 : 675 : (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3640 : : {
3641 : : /* we know there has to be one, because the size is not zero */
394 3642 [ + - - + ]: 631 : Assert(txn && rbtxn_is_toptxn(txn));
1345 3643 [ - + ]: 631 : Assert(txn->total_size > 0);
3644 [ - + ]: 631 : Assert(rb->size >= txn->total_size);
3645 : :
3646 : 631 : ReorderBufferStreamTXN(rb, txn);
3647 : : }
3648 : : else
3649 : : {
3650 : : /*
3651 : : * Pick the largest transaction (or subtransaction) and evict it
3652 : : * from memory by serializing it to disk.
3653 : : */
3654 : 3218 : txn = ReorderBufferLargestTXN(rb);
3655 : :
3656 : : /* we know there has to be one, because the size is not zero */
3657 [ - + ]: 3218 : Assert(txn);
3658 [ - + ]: 3218 : Assert(txn->size > 0);
3659 [ - + ]: 3218 : Assert(rb->size >= txn->size);
3660 : :
3661 : 3218 : ReorderBufferSerializeTXN(rb, txn);
3662 : : }
3663 : :
3664 : : /*
3665 : : * After eviction, the transaction should have no entries in memory,
3666 : : * and should use 0 bytes for changes.
3667 : : */
1404 3668 [ - + ]: 3849 : Assert(txn->size == 0);
3669 [ - + ]: 3849 : Assert(txn->nentries_mem == 0);
3670 : : }
3671 : :
3672 : : /* We must be under the memory limit now. */
1611 3673 [ - + ]: 3849 : Assert(rb->size < logical_decoding_work_mem * 1024L);
3674 : :
3675 : : }
3676 : :
3677 : : /*
3678 : : * Spill data of a large transaction (and its subtransactions) to disk.
3679 : : */
3680 : : static void
3695 rhaas@postgresql.org 3681 : 3522 : ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
3682 : : {
3683 : : dlist_iter subtxn_i;
3684 : : dlist_mutable_iter change_i;
3685 : 3522 : int fd = -1;
3686 : 3522 : XLogSegNo curOpenSegNo = 0;
3687 : 3522 : Size spilled = 0;
1284 akapila@postgresql.o 3688 : 3522 : Size size = txn->size;
3689 : :
3637 tgl@sss.pgh.pa.us 3690 [ - + ]: 3522 : elog(DEBUG2, "spill %u changes in XID %u to disk",
3691 : : (uint32) txn->nentries_mem, txn->xid);
3692 : :
3693 : : /* do the same to all child TXs */
3695 rhaas@postgresql.org 3694 [ + - + + ]: 3790 : dlist_foreach(subtxn_i, &txn->subtxns)
3695 : : {
3696 : : ReorderBufferTXN *subtxn;
3697 : :
3698 : 268 : subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3699 : 268 : ReorderBufferSerializeTXN(rb, subtxn);
3700 : : }
3701 : :
3702 : : /* serialize changestream */
3703 [ + - + + ]: 1298843 : dlist_foreach_modify(change_i, &txn->changes)
3704 : : {
3705 : : ReorderBufferChange *change;
3706 : :
3707 : 1295321 : change = dlist_container(ReorderBufferChange, node, change_i.cur);
3708 : :
3709 : : /*
3710 : : * store in segment in which it belongs by start lsn, don't split over
3711 : : * multiple segments tho
3712 : : */
2399 andres@anarazel.de 3713 [ + + ]: 1295321 : if (fd == -1 ||
3714 [ + + ]: 1292050 : !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3715 : : {
3716 : : char path[MAXPGPATH];
3717 : :
3695 rhaas@postgresql.org 3718 [ + + ]: 3274 : if (fd != -1)
3719 : 3 : CloseTransientFile(fd);
3720 : :
2399 andres@anarazel.de 3721 : 3274 : XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3722 : :
3723 : : /*
3724 : : * No need to care about TLIs here, only used during a single run,
3725 : : * so each LSN only maps to a specific WAL record.
3726 : : */
2231 alvherre@alvh.no-ip. 3727 : 3274 : ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
3728 : : curOpenSegNo);
3729 : :
3730 : : /* open segment, create it if necessary */
3695 rhaas@postgresql.org 3731 : 3274 : fd = OpenTransientFile(path,
3732 : : O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3733 : :
3734 [ - + ]: 3274 : if (fd < 0)
3695 rhaas@postgresql.org 3735 [ # # ]:UBC 0 : ereport(ERROR,
3736 : : (errcode_for_file_access(),
3737 : : errmsg("could not open file \"%s\": %m", path)));
3738 : : }
3739 : :
3695 rhaas@postgresql.org 3740 :CBC 1295321 : ReorderBufferSerializeChange(rb, txn, fd, change);
3741 : 1295321 : dlist_delete(&change->node);
11 msawada@postgresql.o 3742 :GNC 1295321 : ReorderBufferReturnChange(rb, change, false);
3743 : :
3695 rhaas@postgresql.org 3744 :CBC 1295321 : spilled++;
3745 : : }
3746 : :
3747 : : /* Update the memory counter */
11 msawada@postgresql.o 3748 :GNC 3522 : ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
3749 : :
3750 : : /* update the statistics iff we have spilled anything */
1284 akapila@postgresql.o 3751 [ + + ]:CBC 3522 : if (spilled)
3752 : : {
3753 : 3271 : rb->spillCount += 1;
3754 : 3271 : rb->spillBytes += size;
3755 : :
3756 : : /* don't consider already serialized transactions */
3757 [ + + + - ]: 3271 : rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3758 : :
3759 : : /* update the decoding stats */
1074 3760 : 3271 : UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3761 : : }
3762 : :
3695 rhaas@postgresql.org 3763 [ - + ]: 3522 : Assert(spilled == txn->nentries_mem);
3764 [ - + ]: 3522 : Assert(dlist_is_empty(&txn->changes));
3765 : 3522 : txn->nentries_mem = 0;
1556 alvherre@alvh.no-ip. 3766 : 3522 : txn->txn_flags |= RBTXN_IS_SERIALIZED;
3767 : :
3695 rhaas@postgresql.org 3768 [ + + ]: 3522 : if (fd != -1)
3769 : 3271 : CloseTransientFile(fd);
3770 : 3522 : }
3771 : :
3772 : : /*
3773 : : * Serialize individual change to disk.
3774 : : */
3775 : : static void
3776 : 1295321 : ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
3777 : : int fd, ReorderBufferChange *change)
3778 : : {
3779 : : ReorderBufferDiskChange *ondisk;
3780 : 1295321 : Size sz = sizeof(ReorderBufferDiskChange);
3781 : :
3782 : 1295321 : ReorderBufferSerializeReserve(rb, sz);
3783 : :
3784 : 1295321 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3785 : 1295321 : memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3786 : :
3691 tgl@sss.pgh.pa.us 3787 [ + + + + : 1295321 : switch (change->action)
- + - ]
3788 : : {
3789 : : /* fall through these, they're all similar enough */
3790 : 1278048 : case REORDER_BUFFER_CHANGE_INSERT:
3791 : : case REORDER_BUFFER_CHANGE_UPDATE:
3792 : : case REORDER_BUFFER_CHANGE_DELETE:
3793 : : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
3794 : : {
3795 : : char *data;
3796 : : HeapTuple oldtup,
3797 : : newtup;
3695 rhaas@postgresql.org 3798 : 1278048 : Size oldlen = 0;
3799 : 1278048 : Size newlen = 0;
3800 : :
3691 tgl@sss.pgh.pa.us 3801 : 1278048 : oldtup = change->data.tp.oldtuple;
3802 : 1278048 : newtup = change->data.tp.newtuple;
3803 : :
3804 [ + + ]: 1278048 : if (oldtup)
3805 : : {
2962 andres@anarazel.de 3806 : 97000 : sz += sizeof(HeapTupleData);
76 msawada@postgresql.o 3807 :GNC 97000 : oldlen = oldtup->t_len;
2962 andres@anarazel.de 3808 :CBC 97000 : sz += oldlen;
3809 : : }
3810 : :
3691 tgl@sss.pgh.pa.us 3811 [ + + ]: 1278048 : if (newtup)
3812 : : {
2962 andres@anarazel.de 3813 : 1127346 : sz += sizeof(HeapTupleData);
76 msawada@postgresql.o 3814 :GNC 1127346 : newlen = newtup->t_len;
2962 andres@anarazel.de 3815 :CBC 1127346 : sz += newlen;
3816 : : }
3817 : :
3818 : : /* make sure we have enough space */
3695 rhaas@postgresql.org 3819 : 1278048 : ReorderBufferSerializeReserve(rb, sz);
3820 : :
3821 : 1278048 : data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3822 : : /* might have been reallocated above */
3823 : 1278048 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3824 : :
3825 [ + + ]: 1278048 : if (oldlen)
3826 : : {
76 msawada@postgresql.o 3827 :GNC 97000 : memcpy(data, oldtup, sizeof(HeapTupleData));
2962 andres@anarazel.de 3828 :CBC 97000 : data += sizeof(HeapTupleData);
3829 : :
76 msawada@postgresql.o 3830 :GNC 97000 : memcpy(data, oldtup->t_data, oldlen);
3695 rhaas@postgresql.org 3831 :CBC 97000 : data += oldlen;
3832 : : }
3833 : :
3834 [ + + ]: 1278048 : if (newlen)
3835 : : {
76 msawada@postgresql.o 3836 :GNC 1127346 : memcpy(data, newtup, sizeof(HeapTupleData));
2962 andres@anarazel.de 3837 :CBC 1127346 : data += sizeof(HeapTupleData);
3838 : :
76 msawada@postgresql.o 3839 :GNC 1127346 : memcpy(data, newtup->t_data, newlen);
2960 andres@anarazel.de 3840 :CBC 1127346 : data += newlen;
3841 : : }
2930 simon@2ndQuadrant.co 3842 : 1278048 : break;
3843 : : }
3844 : 19 : case REORDER_BUFFER_CHANGE_MESSAGE:
3845 : : {
3846 : : char *data;
3847 : 19 : Size prefix_size = strlen(change->data.msg.prefix) + 1;
3848 : :
3849 : 19 : sz += prefix_size + change->data.msg.message_size +
3850 : : sizeof(Size) + sizeof(Size);
3851 : 19 : ReorderBufferSerializeReserve(rb, sz);
3852 : :
3853 : 19 : data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3854 : :
3855 : : /* might have been reallocated above */
2755 rhaas@postgresql.org 3856 : 19 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3857 : :
3858 : : /* write the prefix including the size */
2930 simon@2ndQuadrant.co 3859 : 19 : memcpy(data, &prefix_size, sizeof(Size));
3860 : 19 : data += sizeof(Size);
3861 : 19 : memcpy(data, change->data.msg.prefix,
3862 : : prefix_size);
3863 : 19 : data += prefix_size;
3864 : :
3865 : : /* write the message including the size */
3866 : 19 : memcpy(data, &change->data.msg.message_size, sizeof(Size));
3867 : 19 : data += sizeof(Size);
3868 : 19 : memcpy(data, change->data.msg.message,
3869 : : change->data.msg.message_size);
3870 : 19 : data += change->data.msg.message_size;
3871 : :
1277 akapila@postgresql.o 3872 : 19 : break;
3873 : : }
3874 : 117 : case REORDER_BUFFER_CHANGE_INVALIDATION:
3875 : : {
3876 : : char *data;
3877 : 117 : Size inval_size = sizeof(SharedInvalidationMessage) *
331 tgl@sss.pgh.pa.us 3878 : 117 : change->data.inval.ninvalidations;
3879 : :
1277 akapila@postgresql.o 3880 : 117 : sz += inval_size;
3881 : :
3882 : 117 : ReorderBufferSerializeReserve(rb, sz);
3883 : 117 : data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3884 : :
3885 : : /* might have been reallocated above */
3886 : 117 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3887 : 117 : memcpy(data, change->data.inval.invalidations, inval_size);
3888 : 117 : data += inval_size;
3889 : :
3695 rhaas@postgresql.org 3890 : 117 : break;
3891 : : }
3892 : 2 : case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
3893 : : {
3894 : : Snapshot snap;
3895 : : char *data;
3896 : :
3691 tgl@sss.pgh.pa.us 3897 : 2 : snap = change->data.snapshot;
3898 : :
3695 rhaas@postgresql.org 3899 : 2 : sz += sizeof(SnapshotData) +
3691 tgl@sss.pgh.pa.us 3900 : 2 : sizeof(TransactionId) * snap->xcnt +
1549 alvherre@alvh.no-ip. 3901 : 2 : sizeof(TransactionId) * snap->subxcnt;
3902 : :
3903 : : /* make sure we have enough space */
3695 rhaas@postgresql.org 3904 : 2 : ReorderBufferSerializeReserve(rb, sz);
3905 : 2 : data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3906 : : /* might have been reallocated above */
3907 : 2 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3908 : :
3691 tgl@sss.pgh.pa.us 3909 : 2 : memcpy(data, snap, sizeof(SnapshotData));
3695 rhaas@postgresql.org 3910 : 2 : data += sizeof(SnapshotData);
3911 : :
3691 tgl@sss.pgh.pa.us 3912 [ + - ]: 2 : if (snap->xcnt)
3913 : : {
3914 : 2 : memcpy(data, snap->xip,
3628 rhaas@postgresql.org 3915 : 2 : sizeof(TransactionId) * snap->xcnt);
3916 : 2 : data += sizeof(TransactionId) * snap->xcnt;
3917 : : }
3918 : :
3691 tgl@sss.pgh.pa.us 3919 [ - + ]: 2 : if (snap->subxcnt)
3920 : : {
3691 tgl@sss.pgh.pa.us 3921 :UBC 0 : memcpy(data, snap->subxip,
3628 rhaas@postgresql.org 3922 : 0 : sizeof(TransactionId) * snap->subxcnt);
3923 : 0 : data += sizeof(TransactionId) * snap->subxcnt;
3924 : : }
3695 rhaas@postgresql.org 3925 :CBC 2 : break;
3926 : : }
2199 peter_e@gmx.net 3927 :UBC 0 : case REORDER_BUFFER_CHANGE_TRUNCATE:
3928 : : {
3929 : : Size size;
3930 : : char *data;
3931 : :
3932 : : /* account for the OIDs of truncated relations */
2050 tomas.vondra@postgre 3933 : 0 : size = sizeof(Oid) * change->data.truncate.nrelids;
3934 : 0 : sz += size;
3935 : :
3936 : : /* make sure we have enough space */
3937 : 0 : ReorderBufferSerializeReserve(rb, sz);
3938 : :
3939 : 0 : data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3940 : : /* might have been reallocated above */
3941 : 0 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3942 : :
3943 : 0 : memcpy(data, change->data.truncate.relids, size);
3944 : 0 : data += size;
3945 : :
3946 : 0 : break;
3947 : : }
3264 andres@anarazel.de 3948 :CBC 17135 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
3949 : : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
3950 : : case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
3951 : : case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
3952 : : /* ReorderBufferChange contains everything important */
3695 rhaas@postgresql.org 3953 : 17135 : break;
3954 : : }
3955 : :
3956 : 1295321 : ondisk->size = sz;
3957 : :
2079 michael@paquier.xyz 3958 : 1295321 : errno = 0;
2584 rhaas@postgresql.org 3959 : 1295321 : pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3695 3960 [ - + ]: 1295321 : if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3961 : : {
2787 tgl@sss.pgh.pa.us 3962 :UBC 0 : int save_errno = errno;
3963 : :
3695 rhaas@postgresql.org 3964 : 0 : CloseTransientFile(fd);
3965 : :
3966 : : /* if write didn't set errno, assume problem is no disk space */
2120 michael@paquier.xyz 3967 [ # # ]: 0 : errno = save_errno ? save_errno : ENOSPC;
3695 rhaas@postgresql.org 3968 [ # # ]: 0 : ereport(ERROR,
3969 : : (errcode_for_file_access(),
3970 : : errmsg("could not write to data file for XID %u: %m",
3971 : : txn->xid)));
3972 : : }
2584 rhaas@postgresql.org 3973 :CBC 1295321 : pgstat_report_wait_end();
3974 : :
3975 : : /*
3976 : : * Keep the transaction's final_lsn up to date with each change we send to
3977 : : * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3978 : : * only do this on commit and abort records, but that doesn't work if a
3979 : : * system crash leaves a transaction without its abort record).
3980 : : *
3981 : : * Make sure not to move it backwards.
3982 : : */
1549 alvherre@alvh.no-ip. 3983 [ + + ]: 1295321 : if (txn->final_lsn < change->lsn)
3984 : 1290846 : txn->final_lsn = change->lsn;
3985 : :
3691 tgl@sss.pgh.pa.us 3986 [ - + ]: 1295321 : Assert(ondisk->change.action == change->action);
3695 rhaas@postgresql.org 3987 : 1295321 : }
3988 : :
3989 : : /* Returns true, if the output plugin supports streaming, false, otherwise. */
3990 : : static inline bool
1345 akapila@postgresql.o 3991 : 1838941 : ReorderBufferCanStream(ReorderBuffer *rb)
3992 : : {
3993 : 1838941 : LogicalDecodingContext *ctx = rb->private_data;
3994 : :
3995 : 1838941 : return ctx->streaming;
3996 : : }
3997 : :
3998 : : /* Returns true, if the streaming can be started now, false, otherwise. */
3999 : : static inline bool
4000 : 303473 : ReorderBufferCanStartStreaming(ReorderBuffer *rb)
4001 : : {
4002 : 303473 : LogicalDecodingContext *ctx = rb->private_data;
4003 : 303473 : SnapBuild *builder = ctx->snapshot_builder;
4004 : :
4005 : : /* We can't start streaming unless a consistent state is reached. */
1227 4006 [ - + ]: 303473 : if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
1227 akapila@postgresql.o 4007 :UBC 0 : return false;
4008 : :
4009 : : /*
4010 : : * We can't start streaming immediately even if the streaming is enabled
4011 : : * because we previously decoded this transaction and now just are
4012 : : * restarting.
4013 : : */
1345 akapila@postgresql.o 4014 [ + + ]:CBC 303473 : if (ReorderBufferCanStream(rb) &&
493 4015 [ + + ]: 300813 : !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
1345 4016 : 163531 : return true;
4017 : :
4018 : 139942 : return false;
4019 : : }
4020 : :
4021 : : /*
4022 : : * Send data of a large transaction (and its subtransactions) to the
4023 : : * output plugin, but using the stream API.
4024 : : */
4025 : : static void
4026 : 702 : ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
4027 : : {
4028 : : Snapshot snapshot_now;
4029 : : CommandId command_id;
4030 : : Size stream_bytes;
4031 : : bool txn_is_streamed;
4032 : :
4033 : : /* We can never reach here for a subtransaction. */
394 4034 [ - + ]: 702 : Assert(rbtxn_is_toptxn(txn));
4035 : :
4036 : : /*
4037 : : * We can't make any assumptions about base snapshot here, similar to what
4038 : : * ReorderBufferCommit() does. That relies on base_snapshot getting
4039 : : * transferred from subxact in ReorderBufferCommitChild(), but that was
4040 : : * not yet called as the transaction is in-progress.
4041 : : *
4042 : : * So just walk the subxacts and use the same logic here. But we only need
4043 : : * to do that once, when the transaction is streamed for the first time.
4044 : : * After that we need to reuse the snapshot from the previous run.
4045 : : *
4046 : : * Unlike DecodeCommit which adds xids of all the subtransactions in
4047 : : * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4048 : : * we do add them to subxip array instead via ReorderBufferCopySnap. This
4049 : : * allows the catalog changes made in subtransactions decoded till now to
4050 : : * be visible.
4051 : : */
1345 4052 [ + + ]: 702 : if (txn->snapshot_now == NULL)
4053 : : {
4054 : : dlist_iter subxact_i;
4055 : :
4056 : : /* make sure this transaction is streamed for the first time */
4057 [ - + ]: 71 : Assert(!rbtxn_is_streamed(txn));
4058 : :
4059 : : /* at the beginning we should have invalid command ID */
4060 [ - + ]: 71 : Assert(txn->command_id == InvalidCommandId);
4061 : :
4062 [ + - + + ]: 75 : dlist_foreach(subxact_i, &txn->subtxns)
4063 : : {
4064 : : ReorderBufferTXN *subtxn;
4065 : :
4066 : 4 : subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4067 : 4 : ReorderBufferTransferSnapToParent(txn, subtxn);
4068 : : }
4069 : :
4070 : : /*
4071 : : * If this transaction has no snapshot, it didn't make any changes to
4072 : : * the database till now, so there's nothing to decode.
4073 : : */
4074 [ - + ]: 71 : if (txn->base_snapshot == NULL)
4075 : : {
1345 akapila@postgresql.o 4076 [ # # ]:UBC 0 : Assert(txn->ninvalidations == 0);
4077 : 0 : return;
4078 : : }
4079 : :
1345 akapila@postgresql.o 4080 :CBC 71 : command_id = FirstCommandId;
4081 : 71 : snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4082 : : txn, command_id);
4083 : : }
4084 : : else
4085 : : {
4086 : : /* the transaction must have been already streamed */
4087 [ - + ]: 631 : Assert(rbtxn_is_streamed(txn));
4088 : :
4089 : : /*
4090 : : * Nah, we already have snapshot from the previous streaming run. We
4091 : : * assume new subxacts can't move the LSN backwards, and so can't beat
4092 : : * the LSN condition in the previous branch (so no need to walk
4093 : : * through subxacts again). In fact, we must not do that as we may be
4094 : : * using snapshot half-way through the subxact.
4095 : : */
4096 : 631 : command_id = txn->command_id;
4097 : :
4098 : : /*
4099 : : * We can't use txn->snapshot_now directly because after the last
4100 : : * streaming run, we might have got some new sub-transactions. So we
4101 : : * need to add them to the snapshot.
4102 : : */
4103 : 631 : snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4104 : : txn, command_id);
4105 : :
4106 : : /* Free the previously copied snapshot. */
4107 [ - + ]: 631 : Assert(txn->snapshot_now->copied);
4108 : 631 : ReorderBufferFreeSnap(rb, txn->snapshot_now);
4109 : 631 : txn->snapshot_now = NULL;
4110 : : }
4111 : :
4112 : : /*
4113 : : * Remember this information to be used later to update stats. We can't
4114 : : * update the stats here as an error while processing the changes would
4115 : : * lead to the accumulation of stats even though we haven't streamed all
4116 : : * the changes.
4117 : : */
1263 4118 : 702 : txn_is_streamed = rbtxn_is_streamed(txn);
4119 : 702 : stream_bytes = txn->total_size;
4120 : :
4121 : : /* Process and send the changes to output plugin. */
1345 4122 : 702 : ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4123 : : command_id, true);
4124 : :
1263 4125 : 702 : rb->streamCount += 1;
4126 : 702 : rb->streamBytes += stream_bytes;
4127 : :
4128 : : /* Don't consider already streamed transaction. */
4129 : 702 : rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4130 : :
4131 : : /* update the decoding stats */
1074 4132 : 702 : UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
4133 : :
1345 4134 [ - + ]: 702 : Assert(dlist_is_empty(&txn->changes));
4135 [ - + ]: 702 : Assert(txn->nentries == 0);
4136 [ - + ]: 702 : Assert(txn->nentries_mem == 0);
4137 : : }
4138 : :
4139 : : /*
4140 : : * Size of a change in memory.
4141 : : */
4142 : : static Size
1611 4143 : 1905444 : ReorderBufferChangeSize(ReorderBufferChange *change)
4144 : : {
4145 : 1905444 : Size sz = sizeof(ReorderBufferChange);
4146 : :
4147 [ + + + + : 1905444 : switch (change->action)
+ + - ]
4148 : : {
4149 : : /* fall through these, they're all similar enough */
4150 : 1832903 : case REORDER_BUFFER_CHANGE_INSERT:
4151 : : case REORDER_BUFFER_CHANGE_UPDATE:
4152 : : case REORDER_BUFFER_CHANGE_DELETE:
4153 : : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
4154 : : {
4155 : : HeapTuple oldtup,
4156 : : newtup;
4157 : 1832903 : Size oldlen = 0;
4158 : 1832903 : Size newlen = 0;
4159 : :
4160 : 1832903 : oldtup = change->data.tp.oldtuple;
4161 : 1832903 : newtup = change->data.tp.newtuple;
4162 : :
4163 [ + + ]: 1832903 : if (oldtup)
4164 : : {
4165 : 151137 : sz += sizeof(HeapTupleData);
76 msawada@postgresql.o 4166 :GNC 151137 : oldlen = oldtup->t_len;
1611 akapila@postgresql.o 4167 :CBC 151137 : sz += oldlen;
4168 : : }
4169 : :
4170 [ + + ]: 1832903 : if (newtup)
4171 : : {
4172 : 1608856 : sz += sizeof(HeapTupleData);
76 msawada@postgresql.o 4173 :GNC 1608856 : newlen = newtup->t_len;
1611 akapila@postgresql.o 4174 :CBC 1608856 : sz += newlen;
4175 : : }
4176 : :
4177 : 1832903 : break;
4178 : : }
4179 : 40 : case REORDER_BUFFER_CHANGE_MESSAGE:
4180 : : {
4181 : 40 : Size prefix_size = strlen(change->data.msg.prefix) + 1;
4182 : :
4183 : 40 : sz += prefix_size + change->data.msg.message_size +
4184 : : sizeof(Size) + sizeof(Size);
4185 : :
4186 : 40 : break;
4187 : : }
1277 4188 : 4722 : case REORDER_BUFFER_CHANGE_INVALIDATION:
4189 : : {
4190 : 4722 : sz += sizeof(SharedInvalidationMessage) *
4191 : 4722 : change->data.inval.ninvalidations;
4192 : 4722 : break;
4193 : : }
1611 4194 : 1099 : case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4195 : : {
4196 : : Snapshot snap;
4197 : :
4198 : 1099 : snap = change->data.snapshot;
4199 : :
4200 : 1099 : sz += sizeof(SnapshotData) +
4201 : 1099 : sizeof(TransactionId) * snap->xcnt +
4202 : 1099 : sizeof(TransactionId) * snap->subxcnt;
4203 : :
4204 : 1099 : break;
4205 : : }
4206 : 66 : case REORDER_BUFFER_CHANGE_TRUNCATE:
4207 : : {
4208 : 66 : sz += sizeof(Oid) * change->data.truncate.nrelids;
4209 : :
4210 : 66 : break;
4211 : : }
4212 : 66614 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4213 : : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4214 : : case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4215 : : case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4216 : : /* ReorderBufferChange contains everything important */
4217 : 66614 : break;
4218 : : }
4219 : :
4220 : 1905444 : return sz;
4221 : : }
4222 : :
4223 : :
4224 : : /*
4225 : : * Restore a number of changes spilled to disk back into memory.
4226 : : */
4227 : : static Size
3695 rhaas@postgresql.org 4228 : 99 : ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
4229 : : TXNEntryFile *file, XLogSegNo *segno)
4230 : : {
4231 : 99 : Size restored = 0;
4232 : : XLogSegNo last_segno;
4233 : : dlist_mutable_iter cleanup_iter;
1583 akapila@postgresql.o 4234 : 99 : File *fd = &file->vfd;
4235 : :
3695 rhaas@postgresql.org 4236 [ - + ]: 99 : Assert(txn->first_lsn != InvalidXLogRecPtr);
4237 [ - + ]: 99 : Assert(txn->final_lsn != InvalidXLogRecPtr);
4238 : :
4239 : : /* free current entries, so we have memory for more */
4240 [ + - + + ]: 170067 : dlist_foreach_modify(cleanup_iter, &txn->changes)
4241 : : {
4242 : 169968 : ReorderBufferChange *cleanup =
331 tgl@sss.pgh.pa.us 4243 : 169968 : dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4244 : :
3695 rhaas@postgresql.org 4245 : 169968 : dlist_delete(&cleanup->node);
1345 akapila@postgresql.o 4246 : 169968 : ReorderBufferReturnChange(rb, cleanup, true);
4247 : : }
3695 rhaas@postgresql.org 4248 : 99 : txn->nentries_mem = 0;
4249 [ - + ]: 99 : Assert(dlist_is_empty(&txn->changes));
4250 : :
2399 andres@anarazel.de 4251 : 99 : XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4252 : :
3695 rhaas@postgresql.org 4253 [ + + + + ]: 173629 : while (restored < max_changes_in_memory && *segno <= last_segno)
4254 : : {
4255 : : int readBytes;
4256 : : ReorderBufferDiskChange *ondisk;
4257 : :
541 akapila@postgresql.o 4258 [ - + ]: 173530 : CHECK_FOR_INTERRUPTS();
4259 : :
3695 rhaas@postgresql.org 4260 [ + + ]: 173530 : if (*fd == -1)
4261 : : {
4262 : : char path[MAXPGPATH];
4263 : :
4264 : : /* first time in */
4265 [ + + ]: 37 : if (*segno == 0)
2399 andres@anarazel.de 4266 : 36 : XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4267 : :
3695 rhaas@postgresql.org 4268 [ - + - - ]: 37 : Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4269 : :
4270 : : /*
4271 : : * No need to care about TLIs here, only used during a single run,
4272 : : * so each LSN only maps to a specific WAL record.
4273 : : */
2231 alvherre@alvh.no-ip. 4274 : 37 : ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
4275 : : *segno);
4276 : :
1583 akapila@postgresql.o 4277 : 37 : *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4278 : :
4279 : : /* No harm in resetting the offset even in case of failure */
4280 : 37 : file->curOffset = 0;
4281 : :
3695 rhaas@postgresql.org 4282 [ - + - - ]: 37 : if (*fd < 0 && errno == ENOENT)
4283 : : {
3695 rhaas@postgresql.org 4284 :UBC 0 : *fd = -1;
4285 : 0 : (*segno)++;
4286 : 0 : continue;
4287 : : }
3695 rhaas@postgresql.org 4288 [ - + ]:CBC 37 : else if (*fd < 0)
3695 rhaas@postgresql.org 4289 [ # # ]:UBC 0 : ereport(ERROR,
4290 : : (errcode_for_file_access(),
4291 : : errmsg("could not open file \"%s\": %m",
4292 : : path)));
4293 : : }
4294 : :
4295 : : /*
4296 : : * Read the statically sized part of a change which has information
4297 : : * about the total size. If we couldn't read a record, we're at the
4298 : : * end of this file.
4299 : : */
3628 rhaas@postgresql.org 4300 :CBC 173530 : ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
1583 akapila@postgresql.o 4301 : 173530 : readBytes = FileRead(file->vfd, rb->outbuf,
4302 : : sizeof(ReorderBufferDiskChange),
4303 : : file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4304 : :
4305 : : /* eof */
3695 rhaas@postgresql.org 4306 [ + + ]: 173530 : if (readBytes == 0)
4307 : : {
1583 akapila@postgresql.o 4308 : 37 : FileClose(*fd);
3695 rhaas@postgresql.org 4309 : 37 : *fd = -1;
4310 : 37 : (*segno)++;
4311 : 37 : continue;
4312 : : }
4313 [ - + ]: 173493 : else if (readBytes < 0)
3695 rhaas@postgresql.org 4314 [ # # ]:UBC 0 : ereport(ERROR,
4315 : : (errcode_for_file_access(),
4316 : : errmsg("could not read from reorderbuffer spill file: %m")));
3695 rhaas@postgresql.org 4317 [ - + ]:CBC 173493 : else if (readBytes != sizeof(ReorderBufferDiskChange))
3695 rhaas@postgresql.org 4318 [ # # ]:UBC 0 : ereport(ERROR,
4319 : : (errcode_for_file_access(),
4320 : : errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4321 : : readBytes,
4322 : : (uint32) sizeof(ReorderBufferDiskChange))));
4323 : :
1583 akapila@postgresql.o 4324 :CBC 173493 : file->curOffset += readBytes;
4325 : :
3695 rhaas@postgresql.org 4326 : 173493 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4327 : :
4328 : 173493 : ReorderBufferSerializeReserve(rb,
2489 tgl@sss.pgh.pa.us 4329 : 173493 : sizeof(ReorderBufferDiskChange) + ondisk->size);
3695 rhaas@postgresql.org 4330 : 173493 : ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4331 : :
1583 akapila@postgresql.o 4332 : 346986 : readBytes = FileRead(file->vfd,
4333 : 173493 : rb->outbuf + sizeof(ReorderBufferDiskChange),
4334 : 173493 : ondisk->size - sizeof(ReorderBufferDiskChange),
4335 : : file->curOffset,
4336 : : WAIT_EVENT_REORDER_BUFFER_READ);
4337 : :
3695 rhaas@postgresql.org 4338 [ - + ]: 173493 : if (readBytes < 0)
3695 rhaas@postgresql.org 4339 [ # # ]:UBC 0 : ereport(ERROR,
4340 : : (errcode_for_file_access(),
4341 : : errmsg("could not read from reorderbuffer spill file: %m")));
3695 rhaas@postgresql.org 4342 [ - + ]:CBC 173493 : else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
3695 rhaas@postgresql.org 4343 [ # # ]:UBC 0 : ereport(ERROR,
4344 : : (errcode_for_file_access(),
4345 : : errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4346 : : readBytes,
4347 : : (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4348 : :
1583 akapila@postgresql.o 4349 :CBC 173493 : file->curOffset += readBytes;
4350 : :
4351 : : /*
4352 : : * ok, read a full change from disk, now restore it into proper
4353 : : * in-memory format
4354 : : */
3695 rhaas@postgresql.org 4355 : 173493 : ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4356 : 173493 : restored++;
4357 : : }
4358 : :
4359 : 99 : return restored;
4360 : : }
4361 : :
4362 : : /*
4363 : : * Convert change from its on-disk format to in-memory format and queue it onto
4364 : : * the TXN's ->changes list.
4365 : : *
4366 : : * Note: although "data" is declared char*, at entry it points to a
4367 : : * maxalign'd buffer, making it safe in most of this function to assume
4368 : : * that the pointed-to data is suitably aligned for direct access.
4369 : : */
4370 : : static void
4371 : 173493 : ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
4372 : : char *data)
4373 : : {
4374 : : ReorderBufferDiskChange *ondisk;
4375 : : ReorderBufferChange *change;
4376 : :
4377 : 173493 : ondisk = (ReorderBufferDiskChange *) data;
4378 : :
4379 : 173493 : change = ReorderBufferGetChange(rb);
4380 : :
4381 : : /* copy static part */
4382 : 173493 : memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4383 : :
4384 : 173493 : data += sizeof(ReorderBufferDiskChange);
4385 : :
4386 : : /* restore individual stuff */
3691 tgl@sss.pgh.pa.us 4387 [ + + + + : 173493 : switch (change->action)
- + - ]
4388 : : {
4389 : : /* fall through these, they're all similar enough */
4390 : 171598 : case REORDER_BUFFER_CHANGE_INSERT:
4391 : : case REORDER_BUFFER_CHANGE_UPDATE:
4392 : : case REORDER_BUFFER_CHANGE_DELETE:
4393 : : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2962 andres@anarazel.de 4394 [ + + ]: 171598 : if (change->data.tp.oldtuple)
4395 : : {
2922 tgl@sss.pgh.pa.us 4396 : 5006 : uint32 tuplelen = ((HeapTuple) data)->t_len;
4397 : :
2962 andres@anarazel.de 4398 : 5006 : change->data.tp.oldtuple =
4399 : 5006 : ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4400 : :
4401 : : /* restore ->tuple */
76 msawada@postgresql.o 4402 :GNC 5006 : memcpy(change->data.tp.oldtuple, data,
4403 : : sizeof(HeapTupleData));
2962 andres@anarazel.de 4404 :CBC 5006 : data += sizeof(HeapTupleData);
4405 : :
4406 : : /* reset t_data pointer into the new tuplebuf */
76 msawada@postgresql.o 4407 :GNC 5006 : change->data.tp.oldtuple->t_data =
4408 : 5006 : (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4409 : :
4410 : : /* restore tuple data itself */
4411 : 5006 : memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
2962 andres@anarazel.de 4412 :CBC 5006 : data += tuplelen;
4413 : : }
4414 : :
4415 [ + + ]: 171598 : if (change->data.tp.newtuple)
4416 : : {
4417 : : /* here, data might not be suitably aligned! */
4418 : : uint32 tuplelen;
4419 : :
2922 tgl@sss.pgh.pa.us 4420 : 161378 : memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4421 : : sizeof(uint32));
4422 : :
2962 andres@anarazel.de 4423 : 161378 : change->data.tp.newtuple =
4424 : 161378 : ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4425 : :
4426 : : /* restore ->tuple */
76 msawada@postgresql.o 4427 :GNC 161378 : memcpy(change->data.tp.newtuple, data,
4428 : : sizeof(HeapTupleData));
2962 andres@anarazel.de 4429 :CBC 161378 : data += sizeof(HeapTupleData);
4430 : :
4431 : : /* reset t_data pointer into the new tuplebuf */
76 msawada@postgresql.o 4432 :GNC 161378 : change->data.tp.newtuple->t_data =
4433 : 161378 : (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4434 : :
4435 : : /* restore tuple data itself */
4436 : 161378 : memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
2962 andres@anarazel.de 4437 :CBC 161378 : data += tuplelen;
4438 : : }
4439 : :
3695 rhaas@postgresql.org 4440 : 171598 : break;
2930 simon@2ndQuadrant.co 4441 : 1 : case REORDER_BUFFER_CHANGE_MESSAGE:
4442 : : {
4443 : : Size prefix_size;
4444 : :
4445 : : /* read prefix */
4446 : 1 : memcpy(&prefix_size, data, sizeof(Size));
4447 : 1 : data += sizeof(Size);
4448 : 1 : change->data.msg.prefix = MemoryContextAlloc(rb->context,
4449 : : prefix_size);
4450 : 1 : memcpy(change->data.msg.prefix, data, prefix_size);
2866 rhaas@postgresql.org 4451 [ - + ]: 1 : Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2930 simon@2ndQuadrant.co 4452 : 1 : data += prefix_size;
4453 : :
4454 : : /* read the message */
4455 : 1 : memcpy(&change->data.msg.message_size, data, sizeof(Size));
4456 : 1 : data += sizeof(Size);
4457 : 1 : change->data.msg.message = MemoryContextAlloc(rb->context,
4458 : : change->data.msg.message_size);
4459 : 1 : memcpy(change->data.msg.message, data,
4460 : : change->data.msg.message_size);
4461 : 1 : data += change->data.msg.message_size;
4462 : :
1277 akapila@postgresql.o 4463 : 1 : break;
4464 : : }
4465 : 19 : case REORDER_BUFFER_CHANGE_INVALIDATION:
4466 : : {
4467 : 19 : Size inval_size = sizeof(SharedInvalidationMessage) *
331 tgl@sss.pgh.pa.us 4468 : 19 : change->data.inval.ninvalidations;
4469 : :
1277 akapila@postgresql.o 4470 : 19 : change->data.inval.invalidations =
4471 : 19 : MemoryContextAlloc(rb->context, inval_size);
4472 : :
4473 : : /* read the message */
4474 : 19 : memcpy(change->data.inval.invalidations, data, inval_size);
4475 : :
2930 simon@2ndQuadrant.co 4476 : 19 : break;
4477 : : }
3695 rhaas@postgresql.org 4478 : 2 : case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4479 : : {
4480 : : Snapshot oldsnap;
4481 : : Snapshot newsnap;
4482 : : Size size;
4483 : :
3691 tgl@sss.pgh.pa.us 4484 : 2 : oldsnap = (Snapshot) data;
4485 : :
4486 : 2 : size = sizeof(SnapshotData) +
4487 : 2 : sizeof(TransactionId) * oldsnap->xcnt +
4488 : 2 : sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4489 : :
4490 : 2 : change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4491 : :
4492 : 2 : newsnap = change->data.snapshot;
4493 : :
4494 : 2 : memcpy(newsnap, data, size);
4495 : 2 : newsnap->xip = (TransactionId *)
4496 : : (((char *) newsnap) + sizeof(SnapshotData));
4497 : 2 : newsnap->subxip = newsnap->xip + newsnap->xcnt;
4498 : 2 : newsnap->copied = true;
3695 rhaas@postgresql.org 4499 : 2 : break;
4500 : : }
4501 : : /* the base struct contains all the data, easy peasy */
2199 peter_e@gmx.net 4502 :UBC 0 : case REORDER_BUFFER_CHANGE_TRUNCATE:
4503 : : {
4504 : : Oid *relids;
4505 : :
2050 tomas.vondra@postgre 4506 : 0 : relids = ReorderBufferGetRelids(rb,
4507 : 0 : change->data.truncate.nrelids);
4508 : 0 : memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4509 : 0 : change->data.truncate.relids = relids;
4510 : :
4511 : 0 : break;
4512 : : }
3264 andres@anarazel.de 4513 :CBC 1873 : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4514 : : case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4515 : : case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4516 : : case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
3695 rhaas@postgresql.org 4517 : 1873 : break;
4518 : : }
4519 : :
4520 : 173493 : dlist_push_tail(&txn->changes, &change->node);
4521 : 173493 : txn->nentries_mem++;
4522 : :
4523 : : /*
4524 : : * Update memory accounting for the restored change. We need to do this
4525 : : * although we don't check the memory limit when restoring the changes in
4526 : : * this branch (we only do that when initially queueing the changes after
4527 : : * decoding), because we will release the changes later, and that will
4528 : : * update the accounting too (subtracting the size from the counters). And
4529 : : * we don't want to underflow there.
4530 : : */
11 msawada@postgresql.o 4531 :GNC 173493 : ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4532 : : ReorderBufferChangeSize(change));
3695 rhaas@postgresql.org 4533 :CBC 173493 : }
4534 : :
4535 : : /*
4536 : : * Remove all on-disk stored for the passed in transaction.
4537 : : */
4538 : : static void
4539 : 275 : ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
4540 : : {
4541 : : XLogSegNo first;
4542 : : XLogSegNo cur;
4543 : : XLogSegNo last;
4544 : :
4545 [ - + ]: 275 : Assert(txn->first_lsn != InvalidXLogRecPtr);
4546 [ - + ]: 275 : Assert(txn->final_lsn != InvalidXLogRecPtr);
4547 : :
2399 andres@anarazel.de 4548 : 275 : XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4549 : 275 : XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4550 : :
4551 : : /* iterate over all possible filenames, and delete them */
3695 rhaas@postgresql.org 4552 [ + + ]: 554 : for (cur = first; cur <= last; cur++)
4553 : : {
4554 : : char path[MAXPGPATH];
4555 : :
2231 alvherre@alvh.no-ip. 4556 : 279 : ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
3695 rhaas@postgresql.org 4557 [ + + - + ]: 279 : if (unlink(path) != 0 && errno != ENOENT)
3695 rhaas@postgresql.org 4558 [ # # ]:UBC 0 : ereport(ERROR,
4559 : : (errcode_for_file_access(),
4560 : : errmsg("could not remove file \"%s\": %m", path)));
4561 : : }
3695 rhaas@postgresql.org 4562 :CBC 275 : }
4563 : :
4564 : : /*
4565 : : * Remove any leftover serialized reorder buffers from a slot directory after a
4566 : : * prior crash or decoding session exit.
4567 : : */
4568 : : static void
2231 alvherre@alvh.no-ip. 4569 : 1774 : ReorderBufferCleanupSerializedTXNs(const char *slotname)
4570 : : {
4571 : : DIR *spill_dir;
4572 : : struct dirent *spill_de;
4573 : : struct stat statbuf;
4574 : : char path[MAXPGPATH * 2 + 12];
4575 : :
4576 : 1774 : sprintf(path, "pg_replslot/%s", slotname);
4577 : :
4578 : : /* we're only handling directories here, skip if it's not ours */
4579 [ + - - + ]: 1774 : if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2231 alvherre@alvh.no-ip. 4580 :UBC 0 : return;
4581 : :
2231 alvherre@alvh.no-ip. 4582 :CBC 1774 : spill_dir = AllocateDir(path);
4583 [ + + ]: 8870 : while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4584 : : {
4585 : : /* only look at names that can be ours */
4586 [ - + ]: 5322 : if (strncmp(spill_de->d_name, "xid", 3) == 0)
4587 : : {
2231 alvherre@alvh.no-ip. 4588 :UBC 0 : snprintf(path, sizeof(path),
4589 : : "pg_replslot/%s/%s", slotname,
4590 : 0 : spill_de->d_name);
4591 : :
4592 [ # # ]: 0 : if (unlink(path) != 0)
4593 [ # # ]: 0 : ereport(ERROR,
4594 : : (errcode_for_file_access(),
4595 : : errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4596 : : path, slotname)));
4597 : : }
4598 : : }
2231 alvherre@alvh.no-ip. 4599 :CBC 1774 : FreeDir(spill_dir);
4600 : : }
4601 : :
4602 : : /*
4603 : : * Given a replication slot, transaction ID and segment number, fill in the
4604 : : * corresponding spill file into 'path', which is a caller-owned buffer of size
4605 : : * at least MAXPGPATH.
4606 : : */
4607 : : static void
4608 : 3590 : ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
4609 : : XLogSegNo segno)
4610 : : {
4611 : : XLogRecPtr recptr;
4612 : :
2106 4613 : 3590 : XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4614 : :
2059 jdavis@postgresql.or 4615 : 3590 : snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
2180 tgl@sss.pgh.pa.us 4616 : 3590 : NameStr(MyReplicationSlot->data.name),
1146 peter@eisentraut.org 4617 : 3590 : xid, LSN_FORMAT_ARGS(recptr));
2231 alvherre@alvh.no-ip. 4618 : 3590 : }
4619 : :
4620 : : /*
4621 : : * Delete all data spilled to disk after we've restarted/crashed. It will be
4622 : : * recreated when the respective slots are reused.
4623 : : */
4624 : : void
3695 rhaas@postgresql.org 4625 : 823 : StartupReorderBuffer(void)
4626 : : {
4627 : : DIR *logical_dir;
4628 : : struct dirent *logical_de;
4629 : :
4630 : 823 : logical_dir = AllocateDir("pg_replslot");
4631 [ + + ]: 2528 : while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4632 : : {
4633 [ + + ]: 1705 : if (strcmp(logical_de->d_name, ".") == 0 ||
4634 [ + + ]: 882 : strcmp(logical_de->d_name, "..") == 0)
4635 : 1646 : continue;
4636 : :
4637 : : /* if it cannot be a slot, skip the directory */
4638 [ - + ]: 59 : if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
3695 rhaas@postgresql.org 4639 :UBC 0 : continue;
4640 : :
4641 : : /*
4642 : : * ok, has to be a surviving logical slot, iterate and delete
4643 : : * everything starting with xid-*
4644 : : */
2231 alvherre@alvh.no-ip. 4645 :CBC 59 : ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4646 : : }
3695 rhaas@postgresql.org 4647 : 823 : FreeDir(logical_dir);
4648 : 823 : }
4649 : :
4650 : : /* ---------------------------------------
4651 : : * toast reassembly support
4652 : : * ---------------------------------------
4653 : : */
4654 : :
4655 : : /*
4656 : : * Initialize per tuple toast reconstruction support.
4657 : : */
4658 : : static void
4659 : 33 : ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
4660 : : {
4661 : : HASHCTL hash_ctl;
4662 : :
4663 [ - + ]: 33 : Assert(txn->toast_hash == NULL);
4664 : :
4665 : 33 : hash_ctl.keysize = sizeof(Oid);
4666 : 33 : hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4667 : 33 : hash_ctl.hcxt = rb->context;
4668 : 33 : txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4669 : : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4670 : 33 : }
4671 : :
4672 : : /*
4673 : : * Per toast-chunk handling for toast reconstruction
4674 : : *
4675 : : * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4676 : : * toasted Datum comes along.
4677 : : */
4678 : : static void
4679 : 1728 : ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
4680 : : Relation relation, ReorderBufferChange *change)
4681 : : {
4682 : : ReorderBufferToastEnt *ent;
4683 : : HeapTuple newtup;
4684 : : bool found;
4685 : : int32 chunksize;
4686 : : bool isnull;
4687 : : Pointer chunk;
4688 : 1728 : TupleDesc desc = RelationGetDescr(relation);
4689 : : Oid chunk_id;
4690 : : int32 chunk_seq;
4691 : :
4692 [ + + ]: 1728 : if (txn->toast_hash == NULL)
4693 : 33 : ReorderBufferToastInitHash(rb, txn);
4694 : :
4695 [ - + ]: 1728 : Assert(IsToastRelation(relation));
4696 : :
3691 tgl@sss.pgh.pa.us 4697 : 1728 : newtup = change->data.tp.newtuple;
76 msawada@postgresql.o 4698 :GNC 1728 : chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
3695 rhaas@postgresql.org 4699 [ - + ]:CBC 1728 : Assert(!isnull);
76 msawada@postgresql.o 4700 :GNC 1728 : chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
3695 rhaas@postgresql.org 4701 [ - + ]:CBC 1728 : Assert(!isnull);
4702 : :
4703 : : ent = (ReorderBufferToastEnt *)
433 peter@eisentraut.org 4704 : 1728 : hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
4705 : :
3695 rhaas@postgresql.org 4706 [ + + ]: 1728 : if (!found)
4707 : : {
4708 [ - + ]: 47 : Assert(ent->chunk_id == chunk_id);
4709 : 47 : ent->num_chunks = 0;
4710 : 47 : ent->last_chunk_seq = 0;
4711 : 47 : ent->size = 0;
4712 : 47 : ent->reconstructed = NULL;
4713 : 47 : dlist_init(&ent->chunks);
4714 : :
4715 [ - + ]: 47 : if (chunk_seq != 0)
3695 rhaas@postgresql.org 4716 [ # # ]:UBC 0 : elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4717 : : chunk_seq, chunk_id);
4718 : : }
3695 rhaas@postgresql.org 4719 [ + - - + ]:CBC 1681 : else if (found && chunk_seq != ent->last_chunk_seq + 1)
3695 rhaas@postgresql.org 4720 [ # # ]:UBC 0 : elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4721 : : chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4722 : :
76 msawada@postgresql.o 4723 :GNC 1728 : chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
3695 rhaas@postgresql.org 4724 [ - + ]:CBC 1728 : Assert(!isnull);
4725 : :
4726 : : /* calculate size so we can allocate the right size at once later */
4727 [ + - ]: 1728 : if (!VARATT_IS_EXTENDED(chunk))
4728 : 1728 : chunksize = VARSIZE(chunk) - VARHDRSZ;
3695 rhaas@postgresql.org 4729 [ # # ]:UBC 0 : else if (VARATT_IS_SHORT(chunk))
4730 : : /* could happen due to heap_form_tuple doing its thing */
4731 : 0 : chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4732 : : else
4733 [ # # ]: 0 : elog(ERROR, "unexpected type of toast chunk");
4734 : :
3695 rhaas@postgresql.org 4735 :CBC 1728 : ent->size += chunksize;
4736 : 1728 : ent->last_chunk_seq = chunk_seq;
4737 : 1728 : ent->num_chunks++;
4738 : 1728 : dlist_push_tail(&ent->chunks, &change->node);
4739 : 1728 : }
4740 : :
4741 : : /*
4742 : : * Rejigger change->newtuple to point to in-memory toast tuples instead of
4743 : : * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
4744 : : *
4745 : : * We cannot replace unchanged toast tuples though, so those will still point
4746 : : * to on-disk toast data.
4747 : : *
4748 : : * While updating the existing change with detoasted tuple data, we need to
4749 : : * update the memory accounting info, because the change size will differ.
4750 : : * Otherwise the accounting may get out of sync, triggering serialization
4751 : : * at unexpected times.
4752 : : *
4753 : : * We simply subtract size of the change before rejiggering the tuple, and
4754 : : * then add the new size. This makes it look like the change was removed
4755 : : * and then added back, except it only tweaks the accounting info.
4756 : : *
4757 : : * In particular it can't trigger serialization, which would be pointless
4758 : : * anyway as it happens during commit processing right before handing
4759 : : * the change to the output plugin.
4760 : : */
4761 : : static void
4762 : 343076 : ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
4763 : : Relation relation, ReorderBufferChange *change)
4764 : : {
4765 : : TupleDesc desc;
4766 : : int natt;
4767 : : Datum *attrs;
4768 : : bool *isnull;
4769 : : bool *free;
4770 : : HeapTuple tmphtup;
4771 : : Relation toast_rel;
4772 : : TupleDesc toast_desc;
4773 : : MemoryContext oldcontext;
4774 : : HeapTuple newtup;
4775 : : Size old_size;
4776 : :
4777 : : /* no toast tuples changed */
4778 [ + + ]: 343076 : if (txn->toast_hash == NULL)
4779 : 342831 : return;
4780 : :
4781 : : /*
4782 : : * We're going to modify the size of the change. So, to make sure the
4783 : : * accounting is correct we record the current change size and then after
4784 : : * re-computing the change we'll subtract the recorded size and then
4785 : : * re-add the new change size at the end. We don't immediately subtract
4786 : : * the old size because if there is any error before we add the new size,
4787 : : * we will release the changes and that will update the accounting info
4788 : : * (subtracting the size from the counters). And we don't want to
4789 : : * underflow there.
4790 : : */
944 akapila@postgresql.o 4791 : 245 : old_size = ReorderBufferChangeSize(change);
4792 : :
3695 rhaas@postgresql.org 4793 : 245 : oldcontext = MemoryContextSwitchTo(rb->context);
4794 : :
4795 : : /* we should only have toast tuples in an INSERT or UPDATE */
3691 tgl@sss.pgh.pa.us 4796 [ - + ]: 245 : Assert(change->data.tp.newtuple);
4797 : :
3695 rhaas@postgresql.org 4798 : 245 : desc = RelationGetDescr(relation);
4799 : :
4800 : 245 : toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
1680 tgl@sss.pgh.pa.us 4801 [ - + ]: 245 : if (!RelationIsValid(toast_rel))
935 akapila@postgresql.o 4802 [ # # ]:UBC 0 : elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4803 : : relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4804 : :
3695 rhaas@postgresql.org 4805 :CBC 245 : toast_desc = RelationGetDescr(toast_rel);
4806 : :
4807 : : /* should we allocate from stack instead? */
4808 : 245 : attrs = palloc0(sizeof(Datum) * desc->natts);
4809 : 245 : isnull = palloc0(sizeof(bool) * desc->natts);
4810 : 245 : free = palloc0(sizeof(bool) * desc->natts);
4811 : :
3691 tgl@sss.pgh.pa.us 4812 : 245 : newtup = change->data.tp.newtuple;
4813 : :
76 msawada@postgresql.o 4814 :GNC 245 : heap_deform_tuple(newtup, desc, attrs, isnull);
4815 : :
3695 rhaas@postgresql.org 4816 [ + + ]:CBC 755 : for (natt = 0; natt < desc->natts; natt++)
4817 : : {
2429 andres@anarazel.de 4818 : 510 : Form_pg_attribute attr = TupleDescAttr(desc, natt);
4819 : : ReorderBufferToastEnt *ent;
4820 : : struct varlena *varlena;
4821 : :
4822 : : /* va_rawsize is the size of the original datum -- including header */
4823 : : struct varatt_external toast_pointer;
4824 : : struct varatt_indirect redirect_pointer;
3695 rhaas@postgresql.org 4825 : 510 : struct varlena *new_datum = NULL;
4826 : : struct varlena *reconstructed;
4827 : : dlist_iter it;
4828 : 510 : Size data_done = 0;
4829 : :
4830 : : /* system columns aren't toasted */
4831 [ - + ]: 510 : if (attr->attnum < 0)
3695 rhaas@postgresql.org 4832 :UBC 0 : continue;
4833 : :
3695 rhaas@postgresql.org 4834 [ - + ]:CBC 510 : if (attr->attisdropped)
3695 rhaas@postgresql.org 4835 :UBC 0 : continue;
4836 : :
4837 : : /* not a varlena datatype */
3695 rhaas@postgresql.org 4838 [ + + ]:CBC 510 : if (attr->attlen != -1)
4839 : 241 : continue;
4840 : :
4841 : : /* no data */
4842 [ + + ]: 269 : if (isnull[natt])
4843 : 12 : continue;
4844 : :
4845 : : /* ok, we know we have a toast datum */
4846 : 257 : varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4847 : :
4848 : : /* no need to do anything if the tuple isn't external */
4849 [ + + ]: 257 : if (!VARATT_IS_EXTERNAL(varlena))
4850 : 202 : continue;
4851 : :
4852 [ - + + - : 55 : VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
+ - - + -
+ ]
4853 : :
4854 : : /*
4855 : : * Check whether the toast tuple changed, replace if so.
4856 : : */
4857 : : ent = (ReorderBufferToastEnt *)
4858 : 55 : hash_search(txn->toast_hash,
4859 : : &toast_pointer.va_valueid,
4860 : : HASH_FIND,
4861 : : NULL);
4862 [ + + ]: 55 : if (ent == NULL)
4863 : 8 : continue;
4864 : :
4865 : : new_datum =
4866 : 47 : (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4867 : :
4868 : 47 : free[natt] = true;
4869 : :
4870 : 47 : reconstructed = palloc0(toast_pointer.va_rawsize);
4871 : :
4872 : 47 : ent->reconstructed = reconstructed;
4873 : :
4874 : : /* stitch toast tuple back together from its parts */
4875 [ + - + + ]: 1775 : dlist_foreach(it, &ent->chunks)
4876 : : {
4877 : : bool cisnull;
4878 : : ReorderBufferChange *cchange;
4879 : : HeapTuple ctup;
4880 : : Pointer chunk;
4881 : :
3691 tgl@sss.pgh.pa.us 4882 : 1728 : cchange = dlist_container(ReorderBufferChange, node, it.cur);
4883 : 1728 : ctup = cchange->data.tp.newtuple;
76 msawada@postgresql.o 4884 :GNC 1728 : chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
4885 : :
227 michael@paquier.xyz 4886 [ - + ]: 1728 : Assert(!cisnull);
3695 rhaas@postgresql.org 4887 [ - + ]:CBC 1728 : Assert(!VARATT_IS_EXTERNAL(chunk));
4888 [ - + ]: 1728 : Assert(!VARATT_IS_SHORT(chunk));
4889 : :
4890 : 1728 : memcpy(VARDATA(reconstructed) + data_done,
4891 : 1728 : VARDATA(chunk),
4892 : 1728 : VARSIZE(chunk) - VARHDRSZ);
4893 : 1728 : data_done += VARSIZE(chunk) - VARHDRSZ;
4894 : : }
1122 4895 [ - + ]: 47 : Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4896 : :
4897 : : /* make sure its marked as compressed or not */
3695 4898 [ + + ]: 47 : if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4899 : 5 : SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4900 : : else
4901 : 42 : SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4902 : :
4903 : 47 : memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4904 : 47 : redirect_pointer.pointer = reconstructed;
4905 : :
4906 : 47 : SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
4907 : 47 : memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4908 : : sizeof(redirect_pointer));
4909 : :
4910 : 47 : attrs[natt] = PointerGetDatum(new_datum);
4911 : : }
4912 : :
4913 : : /*
4914 : : * Build tuple in separate memory & copy tuple back into the tuplebuf
4915 : : * passed to the output plugin. We can't directly heap_fill_tuple() into
4916 : : * the tuplebuf because attrs[] will point back into the current content.
4917 : : */
3691 tgl@sss.pgh.pa.us 4918 : 245 : tmphtup = heap_form_tuple(desc, attrs, isnull);
76 msawada@postgresql.o 4919 [ - + ]:GNC 245 : Assert(newtup->t_len <= MaxHeapTupleSize);
4920 [ - + ]: 245 : Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
4921 : :
4922 : 245 : memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
4923 : 245 : newtup->t_len = tmphtup->t_len;
4924 : :
4925 : : /*
4926 : : * free resources we won't further need, more persistent stuff will be
4927 : : * free'd in ReorderBufferToastReset().
4928 : : */
3695 rhaas@postgresql.org 4929 :CBC 245 : RelationClose(toast_rel);
3691 tgl@sss.pgh.pa.us 4930 : 245 : pfree(tmphtup);
3695 rhaas@postgresql.org 4931 [ + + ]: 755 : for (natt = 0; natt < desc->natts; natt++)
4932 : : {
4933 [ + + ]: 510 : if (free[natt])
4934 : 47 : pfree(DatumGetPointer(attrs[natt]));
4935 : : }
4936 : 245 : pfree(attrs);
4937 : 245 : pfree(free);
4938 : 245 : pfree(isnull);
4939 : :
4940 : 245 : MemoryContextSwitchTo(oldcontext);
4941 : :
4942 : : /* subtract the old change size */
11 msawada@postgresql.o 4943 :GNC 245 : ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
4944 : : /* now add the change back, with the correct size */
4945 : 245 : ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4946 : : ReorderBufferChangeSize(change));
4947 : : }
4948 : :
4949 : : /*
4950 : : * Free all resources allocated for toast reconstruction.
4951 : : */
4952 : : static void
3695 rhaas@postgresql.org 4953 :CBC 346380 : ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
4954 : : {
4955 : : HASH_SEQ_STATUS hstat;
4956 : : ReorderBufferToastEnt *ent;
4957 : :
4958 [ + + ]: 346380 : if (txn->toast_hash == NULL)
4959 : 346347 : return;
4960 : :
4961 : : /* sequentially walk over the hash and free everything */
4962 : 33 : hash_seq_init(&hstat, txn->toast_hash);
4963 [ + + ]: 80 : while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4964 : : {
4965 : : dlist_mutable_iter it;
4966 : :
4967 [ + - ]: 47 : if (ent->reconstructed != NULL)
4968 : 47 : pfree(ent->reconstructed);
4969 : :
4970 [ + - + + ]: 1775 : dlist_foreach_modify(it, &ent->chunks)
4971 : : {
4972 : 1728 : ReorderBufferChange *change =
331 tgl@sss.pgh.pa.us 4973 : 1728 : dlist_container(ReorderBufferChange, node, it.cur);
4974 : :
3695 rhaas@postgresql.org 4975 : 1728 : dlist_delete(&change->node);
1345 akapila@postgresql.o 4976 : 1728 : ReorderBufferReturnChange(rb, change, true);
4977 : : }
4978 : : }
4979 : :
3695 rhaas@postgresql.org 4980 : 33 : hash_destroy(txn->toast_hash);
4981 : 33 : txn->toast_hash = NULL;
4982 : : }
4983 : :
4984 : :
4985 : : /* ---------------------------------------
4986 : : * Visibility support for logical decoding
4987 : : *
4988 : : *
4989 : : * Lookup actual cmin/cmax values when using decoding snapshot. We can't
4990 : : * always rely on stored cmin/cmax values because of two scenarios:
4991 : : *
4992 : : * * A tuple got changed multiple times during a single transaction and thus
4993 : : * has got a combo CID. Combo CIDs are only valid for the duration of a
4994 : : * single transaction.
4995 : : * * A tuple with a cmin but no cmax (and thus no combo CID) got
4996 : : * deleted/updated in another transaction than the one which created it
4997 : : * which we are looking at right now. As only one of cmin, cmax or combo CID
4998 : : * is actually stored in the heap we don't have access to the value we
4999 : : * need anymore.
5000 : : *
5001 : : * To resolve those problems we have a per-transaction hash of (cmin,
5002 : : * cmax) tuples keyed by (relfilelocator, ctid) which contains the actual
5003 : : * (cmin, cmax) values. That also takes care of combo CIDs by simply
5004 : : * not caring about them at all. As we have the real cmin/cmax values
5005 : : * combo CIDs aren't interesting.
5006 : : *
5007 : : * As we only care about catalog tuples here the overhead of this
5008 : : * hashtable should be acceptable.
5009 : : *
5010 : : * Heap rewrites complicate this a bit, check rewriteheap.c for
5011 : : * details.
5012 : : * -------------------------------------------------------------------------
5013 : : */
5014 : :
5015 : : /* struct for sorting mapping files by LSN efficiently */
5016 : : typedef struct RewriteMappingFile
5017 : : {
5018 : : XLogRecPtr lsn;
5019 : : char fname[MAXPGPATH];
5020 : : } RewriteMappingFile;
5021 : :
#ifdef NOT_USED
/*
 * Debugging aid: emit one DEBUG3 message per entry of 'tuplecid_data',
 * showing its key (relfilelocator and tid) and its cmin/cmax.
 */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS scan;
	ReorderBufferTupleCidEnt *entry;

	hash_seq_init(&scan, tuplecid_data);
	for (;;)
	{
		entry = (ReorderBufferTupleCidEnt *) hash_seq_search(&scan);
		if (entry == NULL)
			break;

		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 entry->key.rlocator.dbOid,
			 entry->key.rlocator.spcOid,
			 entry->key.rlocator.relNumber,
			 ItemPointerGetBlockNumber(&entry->key.tid),
			 ItemPointerGetOffsetNumber(&entry->key.tid),
			 entry->cmin,
			 entry->cmax);
	}
}
#endif
5044 : :
5045 : : /*
5046 : : * Apply a single mapping file to tuplecid_data.
5047 : : *
5048 : : * The mapping file has to have been verified to be a) committed b) for our
5049 : : * transaction c) applied in LSN order.
5050 : : */
5051 : : static void
5052 : 22 : ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
5053 : : {
5054 : : char path[MAXPGPATH];
5055 : : int fd;
5056 : : int readBytes;
5057 : : LogicalRewriteMappingData map;
5058 : :
3574 andres@anarazel.de 5059 : 22 : sprintf(path, "pg_logical/mappings/%s", fname);
2395 peter_e@gmx.net 5060 : 22 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3695 rhaas@postgresql.org 5061 [ + - ]: 22 : if (fd < 0)
3695 rhaas@postgresql.org 5062 [ # # ]:UBC 0 : ereport(ERROR,
5063 : : (errcode_for_file_access(),
5064 : : errmsg("could not open file \"%s\": %m", path)));
5065 : :
5066 : : while (true)
3695 rhaas@postgresql.org 5067 :CBC 119 : {
5068 : : ReorderBufferTupleCidKey key;
5069 : : ReorderBufferTupleCidEnt *ent;
5070 : : ReorderBufferTupleCidEnt *new_ent;
5071 : : bool found;
5072 : :
5073 : : /* be careful about padding */
5074 : 141 : memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5075 : :
5076 : : /* read all mappings till the end of the file */
2584 5077 : 141 : pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
3695 5078 : 141 : readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
2584 5079 : 141 : pgstat_report_wait_end();
5080 : :
3695 5081 [ - + ]: 141 : if (readBytes < 0)
3695 rhaas@postgresql.org 5082 [ # # ]:UBC 0 : ereport(ERROR,
5083 : : (errcode_for_file_access(),
5084 : : errmsg("could not read file \"%s\": %m",
5085 : : path)));
3631 bruce@momjian.us 5086 [ + + ]:CBC 141 : else if (readBytes == 0) /* EOF */
3695 rhaas@postgresql.org 5087 : 22 : break;
5088 [ - + ]: 119 : else if (readBytes != sizeof(LogicalRewriteMappingData))
3695 rhaas@postgresql.org 5089 [ # # ]:UBC 0 : ereport(ERROR,
5090 : : (errcode_for_file_access(),
5091 : : errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5092 : : path, readBytes,
5093 : : (int32) sizeof(LogicalRewriteMappingData))));
5094 : :
648 rhaas@postgresql.org 5095 :CBC 119 : key.rlocator = map.old_locator;
3695 5096 : 119 : ItemPointerCopy(&map.old_tid,
5097 : : &key.tid);
5098 : :
5099 : :
5100 : : ent = (ReorderBufferTupleCidEnt *)
433 peter@eisentraut.org 5101 : 119 : hash_search(tuplecid_data, &key, HASH_FIND, NULL);
5102 : :
5103 : : /* no existing mapping, no need to update */
3695 rhaas@postgresql.org 5104 [ - + ]: 119 : if (!ent)
3695 rhaas@postgresql.org 5105 :UBC 0 : continue;
5106 : :
648 rhaas@postgresql.org 5107 :CBC 119 : key.rlocator = map.new_locator;
3695 5108 : 119 : ItemPointerCopy(&map.new_tid,
5109 : : &key.tid);
5110 : :
5111 : : new_ent = (ReorderBufferTupleCidEnt *)
433 peter@eisentraut.org 5112 : 119 : hash_search(tuplecid_data, &key, HASH_ENTER, &found);
5113 : :
3695 rhaas@postgresql.org 5114 [ + + ]: 119 : if (found)
5115 : : {
5116 : : /*
5117 : : * Make sure the existing mapping makes sense. We sometime update
5118 : : * old records that did not yet have a cmax (e.g. pg_class' own
5119 : : * entry while rewriting it) during rewrites, so allow that.
5120 : : */
5121 [ + - - + ]: 6 : Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5122 [ - + - - ]: 6 : Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5123 : : }
5124 : : else
5125 : : {
5126 : : /* update mapping */
5127 : 113 : new_ent->cmin = ent->cmin;
5128 : 113 : new_ent->cmax = ent->cmax;
5129 : 113 : new_ent->combocid = ent->combocid;
5130 : : }
5131 : : }
5132 : :
1744 peter@eisentraut.org 5133 [ - + ]: 22 : if (CloseTransientFile(fd) != 0)
1863 michael@paquier.xyz 5134 [ # # ]:UBC 0 : ereport(ERROR,
5135 : : (errcode_for_file_access(),
5136 : : errmsg("could not close file \"%s\": %m", path)));
3695 rhaas@postgresql.org 5137 :CBC 22 : }
5138 : :
5139 : :
5140 : : /*
5141 : : * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
5142 : : */
5143 : : static bool
5144 : 290 : TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
5145 : : {
5146 : 290 : return bsearch(&xid, xip, num,
5147 : 290 : sizeof(TransactionId), xidComparator) != NULL;
5148 : : }
5149 : :
5150 : : /*
5151 : : * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
5152 : : */
5153 : : static int
1734 tgl@sss.pgh.pa.us 5154 : 17 : file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
5155 : : {
5156 : 17 : RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p);
5157 : 17 : RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p);
5158 : :
58 nathan@postgresql.or 5159 :GNC 17 : return pg_cmp_u64(a->lsn, b->lsn);
5160 : : }
5161 : :
5162 : : /*
5163 : : * Apply any existing logical remapping files if there are any targeted at our
5164 : : * transaction for relid.
5165 : : */
5166 : : static void
3695 rhaas@postgresql.org 5167 :CBC 5 : UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
5168 : : {
5169 : : DIR *mapping_dir;
5170 : : struct dirent *mapping_de;
5171 : 5 : List *files = NIL;
5172 : : ListCell *file;
5173 [ + - ]: 5 : Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5174 : :
3574 andres@anarazel.de 5175 : 5 : mapping_dir = AllocateDir("pg_logical/mappings");
5176 [ + + ]: 460 : while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
5177 : : {
5178 : : Oid f_dboid;
5179 : : Oid f_relid;
5180 : : TransactionId f_mapped_xid;
5181 : : TransactionId f_create_xid;
5182 : : XLogRecPtr f_lsn;
5183 : : uint32 f_hi,
5184 : : f_lo;
5185 : : RewriteMappingFile *f;
5186 : :
3695 rhaas@postgresql.org 5187 [ + + ]: 455 : if (strcmp(mapping_de->d_name, ".") == 0 ||
5188 [ + + ]: 450 : strcmp(mapping_de->d_name, "..") == 0)
5189 : 433 : continue;
5190 : :
5191 : : /* Ignore files that aren't ours */
5192 [ - + ]: 445 : if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3695 rhaas@postgresql.org 5193 :UBC 0 : continue;
5194 : :
3695 rhaas@postgresql.org 5195 [ - + ]:CBC 445 : if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5196 : : &f_dboid, &f_relid, &f_hi, &f_lo,
5197 : : &f_mapped_xid, &f_create_xid) != 6)
3637 tgl@sss.pgh.pa.us 5198 [ # # ]:UBC 0 : elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5199 : :
3695 rhaas@postgresql.org 5200 :CBC 445 : f_lsn = ((uint64) f_hi) << 32 | f_lo;
5201 : :
5202 : : /* mapping for another database */
5203 [ - + ]: 445 : if (f_dboid != dboid)
3695 rhaas@postgresql.org 5204 :UBC 0 : continue;
5205 : :
5206 : : /* mapping for another relation */
3695 rhaas@postgresql.org 5207 [ + + ]:CBC 445 : if (f_relid != relid)
5208 : 45 : continue;
5209 : :
5210 : : /* did the creating transaction abort? */
5211 [ + + ]: 400 : if (!TransactionIdDidCommit(f_create_xid))
5212 : 110 : continue;
5213 : :
5214 : : /* not for our transaction */
5215 [ + + ]: 290 : if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5216 : 268 : continue;
5217 : :
5218 : : /* ok, relevant, queue for apply */
5219 : 22 : f = palloc(sizeof(RewriteMappingFile));
5220 : 22 : f->lsn = f_lsn;
5221 : 22 : strcpy(f->fname, mapping_de->d_name);
5222 : 22 : files = lappend(files, f);
5223 : : }
5224 : 5 : FreeDir(mapping_dir);
5225 : :
5226 : : /* sort files so we apply them in LSN order */
1734 tgl@sss.pgh.pa.us 5227 : 5 : list_sort(files, file_sort_by_lsn);
5228 : :
5229 [ + - + + : 27 : foreach(file, files)
+ + ]
5230 : : {
5231 : 22 : RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file);
5232 : :
3637 5233 [ - + ]: 22 : elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5234 : : snapshot->subxip[0]);
3695 rhaas@postgresql.org 5235 : 22 : ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
5236 : 22 : pfree(f);
5237 : : }
5238 : 5 : }
5239 : :
5240 : : /*
5241 : : * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
5242 : : * combo CIDs.
5243 : : */
5244 : : bool
5245 : 790 : ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
5246 : : Snapshot snapshot,
5247 : : HeapTuple htup, Buffer buffer,
5248 : : CommandId *cmin, CommandId *cmax)
5249 : : {
5250 : : ReorderBufferTupleCidKey key;
5251 : : ReorderBufferTupleCidEnt *ent;
5252 : : ForkNumber forkno;
5253 : : BlockNumber blockno;
3631 bruce@momjian.us 5254 : 790 : bool updated_mapping = false;
5255 : :
5256 : : /*
5257 : : * Return unresolved if tuplecid_data is not valid. That's because when
5258 : : * streaming in-progress transactions we may run into tuples with the CID
5259 : : * before actually decoding them. Think e.g. about INSERT followed by
5260 : : * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5261 : : * INSERT. So in such cases, we assume the CID is from the future
5262 : : * command.
5263 : : */
1345 akapila@postgresql.o 5264 [ + + ]: 790 : if (tuplecid_data == NULL)
5265 : 9 : return false;
5266 : :
5267 : : /* be careful about padding */
3695 rhaas@postgresql.org 5268 : 781 : memset(&key, 0, sizeof(key));
5269 : :
5270 [ - + ]: 781 : Assert(!BufferIsLocal(buffer));
5271 : :
5272 : : /*
5273 : : * get relfilelocator from the buffer, no convenient way to access it
5274 : : * other than that.
5275 : : */
648 5276 : 781 : BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5277 : :
5278 : : /* tuples can only be in the main fork */
3695 5279 [ - + ]: 781 : Assert(forkno == MAIN_FORKNUM);
5280 [ - + ]: 781 : Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5281 : :
5282 : 781 : ItemPointerCopy(&htup->t_self,
5283 : : &key.tid);
5284 : :
5285 : 786 : restart:
5286 : : ent = (ReorderBufferTupleCidEnt *)
433 peter@eisentraut.org 5287 : 786 : hash_search(tuplecid_data, &key, HASH_FIND, NULL);
5288 : :
5289 : : /*
5290 : : * failed to find a mapping, check whether the table was rewritten and
5291 : : * apply mapping if so, but only do that once - there can be no new
5292 : : * mappings while we are in here since we have to hold a lock on the
5293 : : * relation.
5294 : : */
3695 rhaas@postgresql.org 5295 [ + + + - ]: 786 : if (ent == NULL && !updated_mapping)
5296 : : {
5297 : 5 : UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
5298 : : /* now check but don't update for a mapping again */
5299 : 5 : updated_mapping = true;
5300 : 5 : goto restart;
5301 : : }
5302 [ - + ]: 781 : else if (ent == NULL)
3695 rhaas@postgresql.org 5303 :UBC 0 : return false;
5304 : :
3695 rhaas@postgresql.org 5305 [ + - ]:CBC 781 : if (cmin)
5306 : 781 : *cmin = ent->cmin;
5307 [ + - ]: 781 : if (cmax)
5308 : 781 : *cmax = ent->cmax;
5309 : 781 : return true;
5310 : : }
|