Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slotfuncs.c
4 : * Support functions for replication slots
5 : *
6 : * Copyright (c) 2012-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/slotfuncs.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/htup_details.h"
16 : #include "access/xlog_internal.h"
17 : #include "access/xlogrecovery.h"
18 : #include "access/xlogutils.h"
19 : #include "funcapi.h"
20 : #include "miscadmin.h"
21 : #include "replication/decode.h"
22 : #include "replication/logical.h"
23 : #include "replication/slot.h"
24 : #include "utils/builtins.h"
25 : #include "utils/inval.h"
26 : #include "utils/pg_lsn.h"
27 : #include "utils/resowner.h"
28 :
29 : /*
30 : * Helper function for creating a new physical replication slot with
31 : * given arguments. Note that this function doesn't release the created
32 : * slot.
33 : *
34 : * If restart_lsn is a valid value, we use it without WAL reservation
35 : * routine. So the caller must guarantee that WAL is available.
36 : */
37 : static void
1465 alvherre 38 CBC 29 : create_physical_replication_slot(char *name, bool immediately_reserve,
39 : bool temporary, XLogRecPtr restart_lsn)
40 : {
41 29 : Assert(!MyReplicationSlot);
42 :
43 : /* acquire replication slot, this will check for conflicting names */
44 29 : ReplicationSlotCreate(name, false,
45 : temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
46 :
47 29 : if (immediately_reserve)
48 : {
49 : /* Reserve WAL as the user asked for it */
50 15 : if (XLogRecPtrIsInvalid(restart_lsn))
51 11 : ReplicationSlotReserveWal();
52 : else
53 4 : MyReplicationSlot->data.restart_lsn = restart_lsn;
54 :
55 : /* Write this slot to disk */
56 15 : ReplicationSlotMarkDirty();
57 15 : ReplicationSlotSave();
58 : }
59 29 : }
60 :
61 : /*
62 : * SQL function for creating a new physical (streaming replication)
63 : * replication slot.
64 : */
65 : Datum
3355 rhaas 66 25 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
67 : {
68 25 : Name name = PG_GETARG_NAME(0);
2495 69 25 : bool immediately_reserve = PG_GETARG_BOOL(1);
2313 peter_e 70 25 : bool temporary = PG_GETARG_BOOL(2);
71 : Datum values[2];
72 : bool nulls[2];
73 : TupleDesc tupdesc;
74 : HeapTuple tuple;
75 : Datum result;
76 :
3355 rhaas 77 25 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3355 rhaas 78 UBC 0 : elog(ERROR, "return type must be a row type");
79 :
572 michael 80 CBC 25 : CheckSlotPermissions();
81 :
3223 andres 82 25 : CheckSlotRequirements();
83 :
1465 alvherre 84 25 : create_physical_replication_slot(NameStr(*name),
85 : immediately_reserve,
86 : temporary,
87 : InvalidXLogRecPtr);
88 :
3324 rhaas 89 25 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
3355 90 25 : nulls[0] = false;
91 :
2798 andres 92 25 : if (immediately_reserve)
93 : {
94 11 : values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
95 11 : nulls[1] = false;
96 : }
97 : else
98 14 : nulls[1] = true;
99 :
3355 rhaas 100 25 : tuple = heap_form_tuple(tupdesc, values, nulls);
101 25 : result = HeapTupleGetDatum(tuple);
102 :
103 25 : ReplicationSlotRelease();
104 :
105 25 : PG_RETURN_DATUM(result);
106 : }
107 :
108 :
109 : /*
110 : * Helper function for creating a new logical replication slot with
111 : * given arguments. Note that this function doesn't release the created
112 : * slot.
113 : *
114 : * When find_startpoint is false, the slot's confirmed_flush is not set; it's
115 : * caller's responsibility to ensure it's set to something sensible.
116 : */
117 : static void
1465 alvherre 118 101 : create_logical_replication_slot(char *name, char *plugin,
119 : bool temporary, bool two_phase,
120 : XLogRecPtr restart_lsn,
121 : bool find_startpoint)
122 : {
3324 rhaas 123 101 : LogicalDecodingContext *ctx = NULL;
124 :
3223 andres 125 101 : Assert(!MyReplicationSlot);
126 :
127 : /*
128 : * Acquire a logical decoding slot, this will check for conflicting names.
129 : * Initially create persistent slot as ephemeral - that allows us to
130 : * nicely handle errors during initialization because it'll get dropped if
131 : * this transaction fails. We'll make it persistent at the end. Temporary
132 : * slots can be created as temporary from beginning as they get dropped on
133 : * error as well.
134 : */
1465 alvherre 135 101 : ReplicationSlotCreate(name, true,
136 : temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
137 :
138 : /*
139 : * Create logical decoding context to find start point or, if we don't
140 : * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
141 : *
142 : * Note: when !find_startpoint this is still important, because it's at
143 : * this point that the output plugin is validated.
144 : */
145 97 : ctx = CreateInitDecodingContext(plugin, NIL,
146 : false, /* just catalogs is OK */
147 : restart_lsn,
699 tmunro 148 97 : XL_ROUTINE(.page_read = read_local_xlog_page,
149 : .segment_open = wal_segment_open,
150 : .segment_close = wal_segment_close),
151 : NULL, NULL, NULL);
152 :
153 : /*
154 : * If caller needs us to determine the decoding start point, do so now.
155 : * This might take a while.
156 : */
1118 alvherre 157 94 : if (find_startpoint)
158 88 : DecodingContextFindStartpoint(ctx);
159 :
160 : /* don't need the decoding context anymore */
3324 rhaas 161 92 : FreeDecodingContext(ctx);
1465 alvherre 162 92 : }
163 :
164 : /*
165 : * SQL function for creating a new logical replication slot.
166 : */
167 : Datum
168 95 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
169 : {
170 95 : Name name = PG_GETARG_NAME(0);
171 95 : Name plugin = PG_GETARG_NAME(1);
172 95 : bool temporary = PG_GETARG_BOOL(2);
767 akapila 173 95 : bool two_phase = PG_GETARG_BOOL(3);
174 : Datum result;
175 : TupleDesc tupdesc;
176 : HeapTuple tuple;
177 : Datum values[2];
178 : bool nulls[2];
179 :
1465 alvherre 180 95 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1465 alvherre 181 UBC 0 : elog(ERROR, "return type must be a row type");
182 :
572 michael 183 CBC 95 : CheckSlotPermissions();
184 :
1465 alvherre 185 94 : CheckLogicalDecodingRequirements();
186 :
187 94 : create_logical_replication_slot(NameStr(*name),
188 94 : NameStr(*plugin),
189 : temporary,
190 : two_phase,
191 : InvalidXLogRecPtr,
192 : true);
193 :
194 86 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
195 86 : values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
196 :
3324 rhaas 197 86 : memset(nulls, 0, sizeof(nulls));
198 :
199 86 : tuple = heap_form_tuple(tupdesc, values, nulls);
200 86 : result = HeapTupleGetDatum(tuple);
201 :
202 : /* ok, slot is now fully created, mark it as persistent if needed */
2313 peter_e 203 86 : if (!temporary)
204 81 : ReplicationSlotPersist();
3324 rhaas 205 86 : ReplicationSlotRelease();
206 :
207 86 : PG_RETURN_DATUM(result);
208 : }
209 :
210 :
211 : /*
212 : * SQL function for dropping a replication slot.
213 : */
214 : Datum
3355 215 109 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
216 : {
217 109 : Name name = PG_GETARG_NAME(0);
218 :
572 michael 219 109 : CheckSlotPermissions();
220 :
3355 rhaas 221 107 : CheckSlotRequirements();
222 :
2046 alvherre 223 107 : ReplicationSlotDrop(NameStr(*name), true);
224 :
3355 rhaas 225 102 : PG_RETURN_VOID();
226 : }
227 :
228 : /*
229 : * pg_get_replication_slots - SQL SRF showing all replication slots
230 : * that currently exist on the database cluster.
231 : */
232 : Datum
3355 rhaas 233 GIC 171 : pg_get_replication_slots(PG_FUNCTION_ARGS)
3355 rhaas 234 ECB : {
235 : #define PG_GET_REPLICATION_SLOTS_COLS 15
3355 rhaas 236 GIC 171 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1006 alvherre 237 ECB : XLogRecPtr currlsn;
238 : int slotno;
239 :
240 : /*
241 : * We don't require any special permission to see this function's data
242 : * because nothing should be sensitive. The most critical being the slot
243 : * name, which shouldn't contain anything particularly sensitive.
244 : */
245 :
173 michael 246 GIC 171 : InitMaterializedSRF(fcinfo, 0);
3355 rhaas 247 ECB :
1006 alvherre 248 GIC 171 : currlsn = GetXLogWriteRecPtr();
1006 alvherre 249 ECB :
2084 alvherre 250 GIC 171 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3355 rhaas 251 CBC 1113 : for (slotno = 0; slotno < max_replication_slots; slotno++)
3355 rhaas 252 ECB : {
3355 rhaas 253 GIC 942 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
1040 tgl 254 ECB : ReplicationSlot slot_contents;
255 : Datum values[PG_GET_REPLICATION_SLOTS_COLS];
256 : bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
257 : WALAvailability walstate;
258 : int i;
259 :
3355 rhaas 260 GIC 942 : if (!slot->in_use)
3355 rhaas 261 CBC 623 : continue;
2084 alvherre 262 ECB :
263 : /* Copy slot contents while holding spinlock, then examine at leisure */
2084 alvherre 264 GIC 319 : SpinLockAcquire(&slot->mutex);
1040 tgl 265 CBC 319 : slot_contents = *slot;
3355 rhaas 266 319 : SpinLockRelease(&slot->mutex);
3355 rhaas 267 ECB :
1040 tgl 268 GIC 319 : memset(values, 0, sizeof(values));
3355 rhaas 269 CBC 319 : memset(nulls, 0, sizeof(nulls));
3355 rhaas 270 ECB :
3355 rhaas 271 GIC 319 : i = 0;
1040 tgl 272 CBC 319 : values[i++] = NameGetDatum(&slot_contents.data.name);
3324 rhaas 273 ECB :
1040 tgl 274 GIC 319 : if (slot_contents.data.database == InvalidOid)
3324 rhaas 275 CBC 124 : nulls[i++] = true;
3324 rhaas 276 ECB : else
1040 tgl 277 GIC 195 : values[i++] = NameGetDatum(&slot_contents.data.plugin);
3324 rhaas 278 ECB :
1040 tgl 279 GIC 319 : if (slot_contents.data.database == InvalidOid)
3355 rhaas 280 CBC 124 : values[i++] = CStringGetTextDatum("physical");
3355 rhaas 281 ECB : else
3355 rhaas 282 GIC 195 : values[i++] = CStringGetTextDatum("logical");
3324 rhaas 283 ECB :
1040 tgl 284 GIC 319 : if (slot_contents.data.database == InvalidOid)
3324 rhaas 285 CBC 124 : nulls[i++] = true;
3324 rhaas 286 ECB : else
1040 tgl 287 GIC 195 : values[i++] = ObjectIdGetDatum(slot_contents.data.database);
3324 rhaas 288 ECB :
1040 tgl 289 GIC 319 : values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
1040 tgl 290 CBC 319 : values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
2910 andres 291 ECB :
1040 tgl 292 GIC 319 : if (slot_contents.active_pid != 0)
1040 tgl 293 CBC 132 : values[i++] = Int32GetDatum(slot_contents.active_pid);
2910 andres 294 ECB : else
2910 andres 295 GIC 187 : nulls[i++] = true;
3324 rhaas 296 ECB :
1040 tgl 297 GIC 319 : if (slot_contents.data.xmin != InvalidTransactionId)
1040 tgl 298 CBC 16 : values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
3355 rhaas 299 ECB : else
3355 rhaas 300 GIC 303 : nulls[i++] = true;
3324 rhaas 301 ECB :
1040 tgl 302 GIC 319 : if (slot_contents.data.catalog_xmin != InvalidTransactionId)
1040 tgl 303 CBC 199 : values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
3324 rhaas 304 ECB : else
3324 rhaas 305 GIC 120 : nulls[i++] = true;
3324 rhaas 306 ECB :
1040 tgl 307 GIC 319 : if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
1040 tgl 308 CBC 306 : values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
3355 rhaas 309 ECB : else
3355 rhaas 310 GIC 13 : nulls[i++] = true;
3355 rhaas 311 ECB :
1040 tgl 312 GIC 319 : if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
1040 tgl 313 CBC 173 : values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
2799 andres 314 ECB : else
2799 andres 315 GIC 146 : nulls[i++] = true;
2799 andres 316 ECB :
317 : /*
318 : * If the slot has not been invalidated, test availability from
319 : * restart_lsn.
320 : */
2 andres 321 GNC 319 : if (slot_contents.data.invalidated != RS_INVAL_NONE)
1017 alvherre 322 GIC 14 : walstate = WALAVAIL_REMOVED;
1017 alvherre 323 ECB : else
1017 alvherre 324 GIC 305 : walstate = GetWALAvailability(slot_contents.data.restart_lsn);
1097 alvherre 325 ECB :
1097 alvherre 326 GIC 319 : switch (walstate)
1097 alvherre 327 ECB : {
1097 alvherre 328 CBC 11 : case WALAVAIL_INVALID_LSN:
329 11 : nulls[i++] = true;
1097 alvherre 330 GIC 11 : break;
1097 alvherre 331 ECB :
1097 alvherre 332 CBC 291 : case WALAVAIL_RESERVED:
333 291 : values[i++] = CStringGetTextDatum("reserved");
1097 alvherre 334 GIC 291 : break;
1097 alvherre 335 ECB :
1019 alvherre 336 CBC 2 : case WALAVAIL_EXTENDED:
337 2 : values[i++] = CStringGetTextDatum("extended");
1019 alvherre 338 GIC 2 : break;
1019 alvherre 339 ECB :
1019 alvherre 340 CBC 1 : case WALAVAIL_UNRESERVED:
341 1 : values[i++] = CStringGetTextDatum("unreserved");
1019 alvherre 342 GIC 1 : break;
1019 alvherre 343 ECB :
1097 alvherre 344 GIC 14 : case WALAVAIL_REMOVED:
345 :
346 : /*
347 : * If we read the restart_lsn long enough ago, maybe that file
348 : * has been removed by now. However, the walsender could have
349 : * moved forward enough that it jumped to another file after
350 : * we looked. If checkpointer signalled the process to
351 : * termination, then it's definitely lost; but if a process is
352 : * still alive, then "unreserved" seems more appropriate.
353 : *
354 : * If we do change it, save the state for safe_wal_size below.
1019 alvherre 355 ECB : */
1019 alvherre 356 GIC 14 : if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
357 : {
358 : int pid;
1019 alvherre 359 ECB :
1019 alvherre 360 CBC 12 : SpinLockAcquire(&slot->mutex);
361 12 : pid = slot->active_pid;
1006 362 12 : slot_contents.data.restart_lsn = slot->data.restart_lsn;
1019 363 12 : SpinLockRelease(&slot->mutex);
1019 alvherre 364 GIC 12 : if (pid != 0)
1019 alvherre 365 EUB : {
1019 alvherre 366 UBC 0 : values[i++] = CStringGetTextDatum("unreserved");
1006 367 0 : walstate = WALAVAIL_UNRESERVED;
1019 alvherre 368 UIC 0 : break;
369 : }
1019 alvherre 370 ECB : }
1097 alvherre 371 CBC 14 : values[i++] = CStringGetTextDatum("lost");
1097 alvherre 372 GIC 14 : break;
373 : }
374 :
375 : /*
376 : * safe_wal_size is only computed for slots that have not been lost,
377 : * and only if there's a configured maximum size.
1006 alvherre 378 ECB : */
1006 alvherre 379 CBC 319 : if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
1006 alvherre 380 GIC 314 : nulls[i++] = true;
381 : else
382 : {
383 : XLogSegNo targetSeg;
384 : uint64 slotKeepSegs;
385 : uint64 keepSegs;
386 : XLogSegNo failSeg;
387 : XLogRecPtr failLSN;
1097 alvherre 388 ECB :
1006 alvherre 389 GIC 5 : XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
390 :
494 michael 391 ECB : /* determine how many segments can be kept by slots */
993 fujii 392 GIC 5 : slotKeepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
993 fujii 393 ECB : /* ditto for wal_keep_size */
993 fujii 394 GIC 5 : keepSegs = XLogMBVarToSegs(wal_keep_size_mb, wal_segment_size);
395 :
1006 alvherre 396 ECB : /* if currpos reaches failLSN, we lose our segment */
993 fujii 397 CBC 5 : failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
1006 alvherre 398 GIC 5 : XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
1006 alvherre 399 ECB :
1006 alvherre 400 GIC 5 : values[i++] = Int64GetDatum(failLSN - currlsn);
401 : }
1097 alvherre 402 ECB :
767 akapila 403 GIC 319 : values[i++] = BoolGetDatum(slot_contents.data.two_phase);
767 akapila 404 ECB :
2 andres 405 GNC 319 : if (slot_contents.data.database == InvalidOid)
406 124 : nulls[i++] = true;
407 : else
408 : {
409 195 : if (slot_contents.data.invalidated != RS_INVAL_NONE)
410 12 : values[i++] = BoolGetDatum(true);
411 : else
412 183 : values[i++] = BoolGetDatum(false);
413 : }
414 :
1040 tgl 415 CBC 319 : Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
416 :
398 michael 417 GIC 319 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
398 michael 418 ECB : values, nulls);
3355 rhaas 419 : }
420 :
2084 alvherre 421 CBC 171 : LWLockRelease(ReplicationSlotControlLock);
422 :
3355 rhaas 423 GIC 171 : return (Datum) 0;
3355 rhaas 424 ECB : }
425 :
1908 simon 426 : /*
427 : * Helper function for advancing our physical replication slot forward.
428 : *
429 : * The LSN position to move to is compared simply to the slot's restart_lsn,
1725 alvherre 430 : * knowing that any position older than that would be removed by successive
431 : * checkpoints.
1908 simon 432 : */
433 : static XLogRecPtr
1763 michael 434 GIC 1 : pg_physical_replication_slot_advance(XLogRecPtr moveto)
435 : {
436 1 : XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
437 1 : XLogRecPtr retlsn = startlsn;
438 :
1097 alvherre 439 1 : Assert(moveto != InvalidXLogRecPtr);
440 :
1763 michael 441 1 : if (startlsn < moveto)
442 : {
1763 michael 443 CBC 1 : SpinLockAcquire(&MyReplicationSlot->mutex);
1908 simon 444 GIC 1 : MyReplicationSlot->data.restart_lsn = moveto;
1763 michael 445 CBC 1 : SpinLockRelease(&MyReplicationSlot->mutex);
1908 simon 446 1 : retlsn = moveto;
447 :
1165 michael 448 ECB : /*
449 : * Dirty the slot so as it is written out at the next checkpoint. Note
1060 tgl 450 : * that the LSN position advanced may still be lost in the event of a
451 : * crash, but this makes the data consistent after a clean shutdown.
1165 michael 452 : */
1165 michael 453 CBC 1 : ReplicationSlotMarkDirty();
1908 simon 454 ECB : }
455 :
1908 simon 456 GIC 1 : return retlsn;
457 : }
458 :
459 : /*
460 : * Helper function for advancing our logical replication slot forward.
461 : *
1200 michael 462 ECB : * The slot's restart_lsn is used as start point for reading records, while
463 : * confirmed_flush is used as base point for the decoding context.
464 : *
1725 alvherre 465 : * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
466 : * because we need to digest WAL to advance restart_lsn allowing to recycle
467 : * WAL and removal of old catalog tuples. As decoding is done in fast_forward
468 : * mode, no changes are generated anyway.
469 : */
470 : static XLogRecPtr
1763 michael 471 GIC 3 : pg_logical_replication_slot_advance(XLogRecPtr moveto)
472 : {
473 : LogicalDecodingContext *ctx;
1809 tgl 474 3 : ResourceOwner old_resowner = CurrentResourceOwner;
475 : XLogRecPtr retlsn;
476 :
1097 alvherre 477 3 : Assert(moveto != InvalidXLogRecPtr);
478 :
1908 simon 479 3 : PG_TRY();
1908 simon 480 ECB : {
481 : /*
482 : * Create our decoding context in fast_forward mode, passing start_lsn
1725 alvherre 483 : * as InvalidXLogRecPtr, so that we start processing from my slot's
484 : * confirmed_flush.
485 : */
1908 simon 486 CBC 6 : ctx = CreateDecodingContext(InvalidXLogRecPtr,
487 : NIL,
1725 alvherre 488 ECB : true, /* fast_forward */
699 tmunro 489 GIC 3 : XL_ROUTINE(.page_read = read_local_xlog_page,
490 : .segment_open = wal_segment_open,
491 : .segment_close = wal_segment_close),
492 : NULL, NULL, NULL);
493 :
494 : /*
1725 alvherre 495 ECB : * Start reading at the slot's restart_lsn, which we know to point to
496 : * a valid record.
497 : */
1169 heikki.linnakangas 498 CBC 3 : XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
499 :
500 : /* invalidate non-timetravel entries */
1908 simon 501 GIC 3 : InvalidateSystemCaches();
502 :
503 : /* Decode at least one record, until we run out of records */
1169 heikki.linnakangas 504 91 : while (ctx->reader->EndRecPtr < moveto)
505 : {
1908 simon 506 91 : char *errm = NULL;
1725 alvherre 507 ECB : XLogRecord *record;
508 :
509 : /*
510 : * Read records. No changes are generated in fast_forward mode,
511 : * but snapbuilder/slot statuses are updated properly.
512 : */
699 tmunro 513 CBC 91 : record = XLogReadRecord(ctx->reader, &errm);
1908 simon 514 GIC 91 : if (errm)
515 michael 515 LBC 0 : elog(ERROR, "could not find record while advancing replication slot: %s",
516 : errm);
517 :
518 : /*
519 : * Process the record. Storage-level changes are ignored in
520 : * fast_forward mode, but other modules (such as snapbuilder)
521 : * might still have critical updates to do.
1908 simon 522 ECB : */
1725 alvherre 523 CBC 91 : if (record)
1908 simon 524 GBC 91 : LogicalDecodingProcessRecord(ctx, ctx->reader);
525 :
526 : /* Stop once the requested target has been reached */
1908 simon 527 GIC 91 : if (moveto <= ctx->reader->EndRecPtr)
528 3 : break;
529 :
530 88 : CHECK_FOR_INTERRUPTS();
531 : }
1908 simon 532 ECB :
1726 tgl 533 : /*
534 : * Logical decoding could have clobbered CurrentResourceOwner during
535 : * transaction management, so restore the executor's value. (This is
536 : * a kluge, but it's not worth cleaning up right now.)
537 : */
1908 simon 538 GIC 3 : CurrentResourceOwner = old_resowner;
1908 simon 539 ECB :
1908 simon 540 GIC 3 : if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
541 : {
542 3 : LogicalConfirmReceivedLocation(moveto);
543 :
544 : /*
545 : * If only the confirmed_flush LSN has changed the slot won't get
546 : * marked as dirty by the above. Callers on the walsender
1908 simon 547 ECB : * interface are expected to keep track of their own progress and
548 : * don't need it written out. But SQL-interface users cannot
549 : * specify their own start positions and it's harder for them to
550 : * keep track of their progress, so we should make more of an
551 : * effort to save it for them.
552 : *
553 : * Dirty the slot so it is written out at the next checkpoint. The
554 : * LSN position advanced to may still be lost on a crash but this
555 : * makes the data consistent after a clean shutdown.
556 : */
1908 simon 557 GIC 3 : ReplicationSlotMarkDirty();
558 : }
559 :
560 3 : retlsn = MyReplicationSlot->data.confirmed_flush;
561 :
562 : /* free context, call shutdown callback */
563 3 : FreeDecodingContext(ctx);
564 :
565 3 : InvalidateSystemCaches();
1908 simon 566 ECB : }
1908 simon 567 UIC 0 : PG_CATCH();
568 : {
1908 simon 569 ECB : /* clear all timetravel entries */
1908 simon 570 UIC 0 : InvalidateSystemCaches();
571 :
1908 simon 572 LBC 0 : PG_RE_THROW();
573 : }
1908 simon 574 CBC 3 : PG_END_TRY();
575 :
1908 simon 576 GBC 3 : return retlsn;
577 : }
578 :
1908 simon 579 EUB : /*
580 : * SQL function for moving the position in a replication slot.
581 : */
582 : Datum
1908 simon 583 CBC 6 : pg_replication_slot_advance(PG_FUNCTION_ARGS)
584 : {
585 6 : Name slotname = PG_GETARG_NAME(0);
1908 simon 586 GIC 6 : XLogRecPtr moveto = PG_GETARG_LSN(1);
587 : XLogRecPtr endlsn;
588 : XLogRecPtr minlsn;
589 : TupleDesc tupdesc;
590 : Datum values[2];
591 : bool nulls[2];
1908 simon 592 ECB : HeapTuple tuple;
593 : Datum result;
594 :
1908 simon 595 CBC 6 : Assert(!MyReplicationSlot);
596 :
572 michael 597 GIC 6 : CheckSlotPermissions();
598 :
1908 simon 599 6 : if (XLogRecPtrIsInvalid(moveto))
600 1 : ereport(ERROR,
601 : (errmsg("invalid target WAL LSN")));
602 :
603 : /* Build a tuple descriptor for our result type */
1908 simon 604 CBC 5 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1908 simon 605 UIC 0 : elog(ERROR, "return type must be a row type");
1908 simon 606 ECB :
607 : /*
608 : * We can't move slot past what's been flushed/replayed so clamp the
1906 609 : * target position accordingly.
610 : */
1908 simon 611 GIC 5 : if (!RecoveryInProgress())
520 rhaas 612 5 : moveto = Min(moveto, GetFlushRecPtr(NULL));
1908 simon 613 ECB : else
520 rhaas 614 UBC 0 : moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
615 :
616 : /* Acquire the slot so we "own" it */
667 alvherre 617 GIC 5 : ReplicationSlotAcquire(NameStr(*slotname), true);
618 :
619 : /* A slot whose restart_lsn has never been reserved cannot be advanced */
1733 michael 620 CBC 5 : if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
621 1 : ereport(ERROR,
622 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1076 alvherre 623 EUB : errmsg("replication slot \"%s\" cannot be advanced",
624 : NameStr(*slotname)),
625 : errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
1733 michael 626 ECB :
627 : /*
628 : * Check if the slot is not moving backwards. Physical slots rely simply
1763 629 : * on restart_lsn as a minimum point, while logical slots have confirmed
1200 630 : * consumption up to confirmed_flush, meaning that in both cases data
631 : * older than that is not available anymore.
632 : */
1763 michael 633 GIC 4 : if (OidIsValid(MyReplicationSlot->data.database))
634 3 : minlsn = MyReplicationSlot->data.confirmed_flush;
635 : else
636 1 : minlsn = MyReplicationSlot->data.restart_lsn;
637 :
638 4 : if (moveto < minlsn)
1908 simon 639 UIC 0 : ereport(ERROR,
640 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
641 : errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
775 peter 642 ECB : LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
1908 simon 643 :
644 : /* Do the actual slot update, depending on the slot type */
1908 simon 645 CBC 4 : if (OidIsValid(MyReplicationSlot->data.database))
1763 michael 646 GIC 3 : endlsn = pg_logical_replication_slot_advance(moveto);
1908 simon 647 ECB : else
1763 michael 648 GBC 1 : endlsn = pg_physical_replication_slot_advance(moveto);
649 :
1908 simon 650 GIC 4 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
651 4 : nulls[0] = false;
652 :
653 : /*
1025 michael 654 ECB : * Recompute the minimum LSN and xmin across all slots to adjust with the
655 : * advancing potentially done.
656 : */
1025 michael 657 CBC 4 : ReplicationSlotsComputeRequiredXmin(false);
1025 michael 658 GIC 4 : ReplicationSlotsComputeRequiredLSN();
1025 michael 659 ECB :
1908 simon 660 CBC 4 : ReplicationSlotRelease();
661 :
662 : /* Return the reached position. */
1908 simon 663 GIC 4 : values[1] = LSNGetDatum(endlsn);
664 4 : nulls[1] = false;
665 :
1908 simon 666 CBC 4 : tuple = heap_form_tuple(tupdesc, values, nulls);
667 4 : result = HeapTupleGetDatum(tuple);
668 :
669 4 : PG_RETURN_DATUM(result);
670 : }
671 :
1465 alvherre 672 ECB : /*
673 : * Helper function of copying a replication slot.
674 : */
675 : static Datum
1465 alvherre 676 CBC 14 : copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
677 : {
678 14 : Name src_name = PG_GETARG_NAME(0);
1465 alvherre 679 GIC 14 : Name dst_name = PG_GETARG_NAME(1);
680 14 : ReplicationSlot *src = NULL;
681 : ReplicationSlot first_slot_contents;
682 : ReplicationSlot second_slot_contents;
683 : XLogRecPtr src_restart_lsn;
684 : bool src_islogical;
1465 alvherre 685 ECB : bool temporary;
686 : char *plugin;
687 : Datum values[2];
688 : bool nulls[2];
689 : Datum result;
690 : TupleDesc tupdesc;
691 : HeapTuple tuple;
692 :
1465 alvherre 693 GIC 14 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1465 alvherre 694 UIC 0 : elog(ERROR, "return type must be a row type");
695 :
572 michael 696 GIC 14 : CheckSlotPermissions();
697 :
1465 alvherre 698 14 : if (logical_slot)
699 8 : CheckLogicalDecodingRequirements();
700 : else
701 6 : CheckSlotRequirements();
1465 alvherre 702 ECB :
1465 alvherre 703 GBC 14 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
704 :
1465 alvherre 705 ECB : /*
706 : * We need to prevent the source slot's reserved WAL from being removed,
707 : * but we don't want to lock that slot for very long, and it can advance
708 : * in the meantime. So obtain the source slot's data, and create a new
709 : * slot using its restart_lsn. Afterwards we lock the source slot again
710 : * and verify that the data we copied (name, type) has not changed
711 : * incompatibly. No inconvenient WAL removal can occur once the new slot
712 : * is created -- but since WAL removal could have occurred before we
713 : * managed to create the new slot, we advance the new slot's restart_lsn
714 : * to the source slot's updated restart_lsn the second time we lock it.
715 : */
1465 alvherre 716 GIC 15 : for (int i = 0; i < max_replication_slots; i++)
717 : {
718 15 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
719 :
720 15 : if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
721 : {
722 : /* Copy the slot contents while holding spinlock */
723 14 : SpinLockAcquire(&s->mutex);
1040 tgl 724 14 : first_slot_contents = *s;
1465 alvherre 725 CBC 14 : SpinLockRelease(&s->mutex);
1465 alvherre 726 GIC 14 : src = s;
1465 alvherre 727 CBC 14 : break;
728 : }
1465 alvherre 729 ECB : }
730 :
1465 alvherre 731 GIC 14 : LWLockRelease(ReplicationSlotControlLock);
1465 alvherre 732 ECB :
1465 alvherre 733 CBC 14 : if (src == NULL)
1465 alvherre 734 LBC 0 : ereport(ERROR,
1465 alvherre 735 ECB : (errcode(ERRCODE_UNDEFINED_OBJECT),
736 : errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
737 :
1040 tgl 738 GIC 14 : src_islogical = SlotIsLogical(&first_slot_contents);
739 14 : src_restart_lsn = first_slot_contents.data.restart_lsn;
1040 tgl 740 CBC 14 : temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
1040 tgl 741 GIC 14 : plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
1040 tgl 742 ECB :
1465 alvherre 743 EUB : /* Check type of replication slot */
1465 alvherre 744 GIC 14 : if (src_islogical != logical_slot)
745 2 : ereport(ERROR,
746 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1465 alvherre 747 ECB : src_islogical ?
748 : errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
749 : NameStr(*src_name)) :
750 : errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
751 : NameStr(*src_name))));
752 :
753 : /* Copying non-reserved slot doesn't make sense */
1465 alvherre 754 CBC 12 : if (XLogRecPtrIsInvalid(src_restart_lsn))
1465 alvherre 755 GIC 1 : ereport(ERROR,
756 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
757 : errmsg("cannot copy a replication slot that doesn't reserve WAL")));
758 :
759 : /* Overwrite params from optional arguments */
760 11 : if (PG_NARGS() >= 3)
761 6 : temporary = PG_GETARG_BOOL(2);
762 11 : if (PG_NARGS() >= 4)
1465 alvherre 763 ECB : {
1465 alvherre 764 CBC 4 : Assert(logical_slot);
1465 alvherre 765 GIC 4 : plugin = NameStr(*(PG_GETARG_NAME(3)));
766 : }
767 :
768 : /* Create new slot and acquire it */
1465 alvherre 769 CBC 11 : if (logical_slot)
1118 alvherre 770 ECB : {
771 : /*
772 : * We must not try to read WAL, since we haven't reserved it yet --
773 : * hence pass find_startpoint false. confirmed_flush will be set
774 : * below, by copying from the source slot.
775 : */
1465 alvherre 776 GIC 7 : create_logical_replication_slot(NameStr(*dst_name),
777 : plugin,
1465 alvherre 778 ECB : temporary,
779 : false,
780 : src_restart_lsn,
781 : false);
782 : }
783 : else
1465 alvherre 784 GIC 4 : create_physical_replication_slot(NameStr(*dst_name),
1465 alvherre 785 ECB : true,
786 : temporary,
787 : src_restart_lsn);
788 :
789 : /*
790 : * Update the destination slot to current values of the source slot;
791 : * recheck that the source slot is still the one we saw previously.
792 : */
793 : {
794 : TransactionId copy_effective_xmin;
795 : TransactionId copy_effective_catalog_xmin;
796 : TransactionId copy_xmin;
797 : TransactionId copy_catalog_xmin;
798 : XLogRecPtr copy_restart_lsn;
799 : XLogRecPtr copy_confirmed_flush;
800 : bool copy_islogical;
801 : char *copy_name;
802 :
803 : /* Copy data of source slot again */
1465 alvherre 804 GIC 10 : SpinLockAcquire(&src->mutex);
1040 tgl 805 10 : second_slot_contents = *src;
806 10 : SpinLockRelease(&src->mutex);
807 :
808 10 : copy_effective_xmin = second_slot_contents.effective_xmin;
809 10 : copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
810 :
811 10 : copy_xmin = second_slot_contents.data.xmin;
812 10 : copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
1040 tgl 813 CBC 10 : copy_restart_lsn = second_slot_contents.data.restart_lsn;
814 10 : copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
1465 alvherre 815 ECB :
816 : /* for existence check */
1040 tgl 817 CBC 10 : copy_name = NameStr(second_slot_contents.data.name);
818 10 : copy_islogical = SlotIsLogical(&second_slot_contents);
819 :
1465 alvherre 820 ECB : /*
1418 tgl 821 : * Check if the source slot still exists and is valid. We regard it as
822 : * invalid if the type of replication slot or name has been changed,
823 : * or the restart_lsn either is invalid or has gone backward. (The
824 : * restart_lsn could go backwards if the source slot is dropped and
825 : * copied from an older slot during installation.)
1465 alvherre 826 : *
827 : * Since erroring out will release and drop the destination slot we
828 : * don't need to release it here.
829 : */
1465 alvherre 830 GIC 10 : if (copy_restart_lsn < src_restart_lsn ||
831 10 : src_islogical != copy_islogical ||
832 10 : strcmp(copy_name, NameStr(*src_name)) != 0)
1465 alvherre 833 UIC 0 : ereport(ERROR,
834 : (errmsg("could not copy replication slot \"%s\"",
835 : NameStr(*src_name)),
836 : errdetail("The source replication slot was modified incompatibly during the copy operation.")));
837 :
838 : /* The source slot must have a consistent snapshot */
1118 alvherre 839 CBC 10 : if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
1118 alvherre 840 LBC 0 : ereport(ERROR,
1118 alvherre 841 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1118 alvherre 842 EUB : errmsg("cannot copy unfinished logical replication slot \"%s\"",
843 : NameStr(*src_name)),
844 : errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
845 :
846 : /* Install copied values again */
1465 alvherre 847 GIC 10 : SpinLockAcquire(&MyReplicationSlot->mutex);
1465 alvherre 848 CBC 10 : MyReplicationSlot->effective_xmin = copy_effective_xmin;
1465 alvherre 849 GBC 10 : MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
850 :
1465 alvherre 851 GIC 10 : MyReplicationSlot->data.xmin = copy_xmin;
852 10 : MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
853 10 : MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
1118 854 10 : MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
1465 855 10 : SpinLockRelease(&MyReplicationSlot->mutex);
1465 alvherre 856 ECB :
1465 alvherre 857 CBC 10 : ReplicationSlotMarkDirty();
858 10 : ReplicationSlotsComputeRequiredXmin(false);
1465 alvherre 859 GIC 10 : ReplicationSlotsComputeRequiredLSN();
1465 alvherre 860 CBC 10 : ReplicationSlotSave();
1465 alvherre 861 ECB :
862 : #ifdef USE_ASSERT_CHECKING
863 : /* Check that the restart_lsn is available */
864 : {
865 : XLogSegNo segno;
866 :
1465 alvherre 867 CBC 10 : XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
868 10 : Assert(XLogGetLastRemovedSegno() < segno);
1465 alvherre 869 ECB : }
870 : #endif
871 : }
872 :
873 : /* target slot fully created, mark as persistent if needed */
1465 alvherre 874 GIC 10 : if (logical_slot && !temporary)
875 3 : ReplicationSlotPersist();
1465 alvherre 876 ECB :
877 : /* All done. Set up the return values */
1465 alvherre 878 GIC 10 : values[0] = NameGetDatum(dst_name);
879 10 : nulls[0] = false;
880 10 : if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
881 : {
882 6 : values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
1465 alvherre 883 CBC 6 : nulls[1] = false;
1465 alvherre 884 ECB : }
885 : else
1465 alvherre 886 GIC 4 : nulls[1] = true;
1465 alvherre 887 ECB :
1465 alvherre 888 CBC 10 : tuple = heap_form_tuple(tupdesc, values, nulls);
889 10 : result = HeapTupleGetDatum(tuple);
890 :
891 10 : ReplicationSlotRelease();
1465 alvherre 892 ECB :
1465 alvherre 893 GIC 10 : PG_RETURN_DATUM(result);
894 : }
1465 alvherre 895 ECB :
896 : /* The wrappers below are all to appease opr_sanity */
897 : Datum
1465 alvherre 898 CBC 4 : pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
899 : {
900 4 : return copy_replication_slot(fcinfo, true);
901 : }
1465 alvherre 902 ECB :
903 : Datum
1465 alvherre 904 UIC 0 : pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
905 : {
906 0 : return copy_replication_slot(fcinfo, true);
1465 alvherre 907 ECB : }
908 :
909 : Datum
1465 alvherre 910 GIC 4 : pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
911 : {
912 4 : return copy_replication_slot(fcinfo, true);
1465 alvherre 913 EUB : }
914 :
915 : Datum
1465 alvherre 916 GIC 2 : pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
917 : {
918 2 : return copy_replication_slot(fcinfo, false);
1465 alvherre 919 ECB : }
920 :
921 : Datum
1465 alvherre 922 GIC 4 : pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
923 : {
924 4 : return copy_replication_slot(fcinfo, false);
1465 alvherre 925 ECB : }
|