Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * slotfuncs.c
4 : : * Support functions for replication slots
5 : : *
6 : : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/slotfuncs.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "access/htup_details.h"
16 : : #include "access/xlog_internal.h"
17 : : #include "access/xlogrecovery.h"
18 : : #include "access/xlogutils.h"
19 : : #include "funcapi.h"
20 : : #include "miscadmin.h"
21 : : #include "replication/decode.h"
22 : : #include "replication/logical.h"
23 : : #include "replication/slot.h"
24 : : #include "replication/slotsync.h"
25 : : #include "utils/builtins.h"
26 : : #include "utils/guc.h"
27 : : #include "utils/inval.h"
28 : : #include "utils/pg_lsn.h"
29 : : #include "utils/resowner.h"
30 : :
31 : : /*
32 : : * Helper function for creating a new physical replication slot with
33 : : * given arguments. Note that this function doesn't release the created
34 : : * slot.
35 : : *
36 : : * If restart_lsn is a valid value, we use it without WAL reservation
37 : : * routine. So the caller must guarantee that WAL is available.
38 : : */
39 : : static void
1836 alvherre@alvh.no-ip. 40 :CBC 36 : create_physical_replication_slot(char *name, bool immediately_reserve,
41 : : bool temporary, XLogRecPtr restart_lsn)
42 : : {
43 [ - + ]: 36 : Assert(!MyReplicationSlot);
44 : :
45 : : /* acquire replication slot, this will check for conflicting names */
46 [ + + ]: 36 : ReplicationSlotCreate(name, false,
47 : : temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
48 : : false, false);
49 : :
50 [ + + ]: 36 : if (immediately_reserve)
51 : : {
52 : : /* Reserve WAL as the user asked for it */
53 [ + + ]: 16 : if (XLogRecPtrIsInvalid(restart_lsn))
54 : 12 : ReplicationSlotReserveWal();
55 : : else
56 : 4 : MyReplicationSlot->data.restart_lsn = restart_lsn;
57 : :
58 : : /* Write this slot to disk */
59 : 16 : ReplicationSlotMarkDirty();
60 : 16 : ReplicationSlotSave();
61 : : }
62 : 36 : }
63 : :
64 : : /*
65 : : * SQL function for creating a new physical (streaming replication)
66 : : * replication slot.
67 : : */
68 : : Datum
3726 rhaas@postgresql.org 69 : 32 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
70 : : {
71 : 32 : Name name = PG_GETARG_NAME(0);
2866 72 : 32 : bool immediately_reserve = PG_GETARG_BOOL(1);
2684 peter_e@gmx.net 73 : 32 : bool temporary = PG_GETARG_BOOL(2);
74 : : Datum values[2];
75 : : bool nulls[2];
76 : : TupleDesc tupdesc;
77 : : HeapTuple tuple;
78 : : Datum result;
79 : :
3726 rhaas@postgresql.org 80 [ - + ]: 32 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3726 rhaas@postgresql.org 81 [ # # ]:UBC 0 : elog(ERROR, "return type must be a row type");
82 : :
943 michael@paquier.xyz 83 :CBC 32 : CheckSlotPermissions();
84 : :
3594 andres@anarazel.de 85 : 32 : CheckSlotRequirements();
86 : :
1836 alvherre@alvh.no-ip. 87 : 32 : create_physical_replication_slot(NameStr(*name),
88 : : immediately_reserve,
89 : : temporary,
90 : : InvalidXLogRecPtr);
91 : :
3695 rhaas@postgresql.org 92 : 32 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
3726 93 : 32 : nulls[0] = false;
94 : :
3169 andres@anarazel.de 95 [ + + ]: 32 : if (immediately_reserve)
96 : : {
97 : 12 : values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
98 : 12 : nulls[1] = false;
99 : : }
100 : : else
101 : 20 : nulls[1] = true;
102 : :
3726 rhaas@postgresql.org 103 : 32 : tuple = heap_form_tuple(tupdesc, values, nulls);
104 : 32 : result = HeapTupleGetDatum(tuple);
105 : :
106 : 32 : ReplicationSlotRelease();
107 : :
108 : 32 : PG_RETURN_DATUM(result);
109 : : }
110 : :
111 : :
112 : : /*
113 : : * Helper function for creating a new logical replication slot with
114 : : * given arguments. Note that this function doesn't release the created
115 : : * slot.
116 : : *
117 : : * When find_startpoint is false, the slot's confirmed_flush is not set; it's
118 : : * caller's responsibility to ensure it's set to something sensible.
119 : : */
120 : : static void
1836 alvherre@alvh.no-ip. 121 : 118 : create_logical_replication_slot(char *name, char *plugin,
122 : : bool temporary, bool two_phase,
123 : : bool failover,
124 : : XLogRecPtr restart_lsn,
125 : : bool find_startpoint)
126 : : {
3695 rhaas@postgresql.org 127 : 118 : LogicalDecodingContext *ctx = NULL;
128 : :
3594 andres@anarazel.de 129 [ - + ]: 118 : Assert(!MyReplicationSlot);
130 : :
131 : : /*
132 : : * Acquire a logical decoding slot, this will check for conflicting names.
133 : : * Initially create persistent slot as ephemeral - that allows us to
134 : : * nicely handle errors during initialization because it'll get dropped if
135 : : * this transaction fails. We'll make it persistent at the end. Temporary
136 : : * slots can be created as temporary from beginning as they get dropped on
137 : : * error as well.
138 : : */
1836 alvherre@alvh.no-ip. 139 [ + + ]: 118 : ReplicationSlotCreate(name, true,
140 : : temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
141 : : failover, false);
142 : :
143 : : /*
144 : : * Create logical decoding context to find start point or, if we don't
145 : : * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
146 : : *
147 : : * Note: when !find_startpoint this is still important, because it's at
148 : : * this point that the output plugin is validated.
149 : : */
150 : 113 : ctx = CreateInitDecodingContext(plugin, NIL,
151 : : false, /* just catalogs is OK */
152 : : restart_lsn,
1070 tmunro@postgresql.or 153 : 113 : XL_ROUTINE(.page_read = read_local_xlog_page,
154 : : .segment_open = wal_segment_open,
155 : : .segment_close = wal_segment_close),
156 : : NULL, NULL, NULL);
157 : :
158 : : /*
159 : : * If caller needs us to determine the decoding start point, do so now.
160 : : * This might take a while.
161 : : */
1489 alvherre@alvh.no-ip. 162 [ + + ]: 110 : if (find_startpoint)
163 : 104 : DecodingContextFindStartpoint(ctx);
164 : :
165 : : /* don't need the decoding context anymore */
3695 rhaas@postgresql.org 166 : 108 : FreeDecodingContext(ctx);
1836 alvherre@alvh.no-ip. 167 : 108 : }
168 : :
169 : : /*
170 : : * SQL function for creating a new logical replication slot.
171 : : */
172 : : Datum
173 : 112 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
174 : : {
175 : 112 : Name name = PG_GETARG_NAME(0);
176 : 112 : Name plugin = PG_GETARG_NAME(1);
177 : 112 : bool temporary = PG_GETARG_BOOL(2);
1138 akapila@postgresql.o 178 : 112 : bool two_phase = PG_GETARG_BOOL(3);
80 akapila@postgresql.o 179 :GNC 112 : bool failover = PG_GETARG_BOOL(4);
180 : : Datum result;
181 : : TupleDesc tupdesc;
182 : : HeapTuple tuple;
183 : : Datum values[2];
184 : : bool nulls[2];
185 : :
1836 alvherre@alvh.no-ip. 186 [ - + ]:CBC 112 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1836 alvherre@alvh.no-ip. 187 [ # # ]:UBC 0 : elog(ERROR, "return type must be a row type");
188 : :
943 michael@paquier.xyz 189 :CBC 112 : CheckSlotPermissions();
190 : :
1836 alvherre@alvh.no-ip. 191 : 111 : CheckLogicalDecodingRequirements();
192 : :
193 : 111 : create_logical_replication_slot(NameStr(*name),
194 : 111 : NameStr(*plugin),
195 : : temporary,
196 : : two_phase,
197 : : failover,
198 : : InvalidXLogRecPtr,
199 : : true);
200 : :
201 : 102 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
202 : 102 : values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
203 : :
3695 rhaas@postgresql.org 204 : 102 : memset(nulls, 0, sizeof(nulls));
205 : :
206 : 102 : tuple = heap_form_tuple(tupdesc, values, nulls);
207 : 102 : result = HeapTupleGetDatum(tuple);
208 : :
209 : : /* ok, slot is now fully created, mark it as persistent if needed */
2684 peter_e@gmx.net 210 [ + + ]: 102 : if (!temporary)
211 : 97 : ReplicationSlotPersist();
3695 rhaas@postgresql.org 212 : 102 : ReplicationSlotRelease();
213 : :
214 : 102 : PG_RETURN_DATUM(result);
215 : : }
216 : :
217 : :
218 : : /*
219 : : * SQL function for dropping a replication slot.
220 : : */
221 : : Datum
3726 222 : 122 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
223 : : {
224 : 122 : Name name = PG_GETARG_NAME(0);
225 : :
943 michael@paquier.xyz 226 : 122 : CheckSlotPermissions();
227 : :
3726 rhaas@postgresql.org 228 : 120 : CheckSlotRequirements();
229 : :
2417 alvherre@alvh.no-ip. 230 : 120 : ReplicationSlotDrop(NameStr(*name), true);
231 : :
3726 rhaas@postgresql.org 232 : 114 : PG_RETURN_VOID();
233 : : }
234 : :
235 : : /*
236 : : * pg_get_replication_slots - SQL SRF showing all replication slots
237 : : * that currently exist on the database cluster.
238 : : */
239 : : Datum
240 : 283 : pg_get_replication_slots(PG_FUNCTION_ARGS)
241 : : {
242 : : #define PG_GET_REPLICATION_SLOTS_COLS 19
243 : 283 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
244 : : XLogRecPtr currlsn;
245 : : int slotno;
246 : :
247 : : /*
248 : : * We don't require any special permission to see this function's data
249 : : * because nothing should be sensitive. The most critical being the slot
250 : : * name, which shouldn't contain anything particularly sensitive.
251 : : */
252 : :
544 michael@paquier.xyz 253 : 283 : InitMaterializedSRF(fcinfo, 0);
254 : :
1377 alvherre@alvh.no-ip. 255 : 283 : currlsn = GetXLogWriteRecPtr();
256 : :
2455 257 : 283 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3726 rhaas@postgresql.org 258 [ + + ]: 2276 : for (slotno = 0; slotno < max_replication_slots; slotno++)
259 : : {
260 : 1993 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
261 : : ReplicationSlot slot_contents;
262 : : Datum values[PG_GET_REPLICATION_SLOTS_COLS];
263 : : bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
264 : : WALAvailability walstate;
265 : : int i;
266 : : ReplicationSlotInvalidationCause cause;
267 : :
268 [ + + ]: 1993 : if (!slot->in_use)
269 : 1533 : continue;
270 : :
271 : : /* Copy slot contents while holding spinlock, then examine at leisure */
2455 alvherre@alvh.no-ip. 272 [ - + ]: 460 : SpinLockAcquire(&slot->mutex);
1411 tgl@sss.pgh.pa.us 273 : 460 : slot_contents = *slot;
3726 rhaas@postgresql.org 274 : 460 : SpinLockRelease(&slot->mutex);
275 : :
1411 tgl@sss.pgh.pa.us 276 : 460 : memset(values, 0, sizeof(values));
3726 rhaas@postgresql.org 277 : 460 : memset(nulls, 0, sizeof(nulls));
278 : :
279 : 460 : i = 0;
1411 tgl@sss.pgh.pa.us 280 : 460 : values[i++] = NameGetDatum(&slot_contents.data.name);
281 : :
282 [ + + ]: 460 : if (slot_contents.data.database == InvalidOid)
3695 rhaas@postgresql.org 283 : 142 : nulls[i++] = true;
284 : : else
1411 tgl@sss.pgh.pa.us 285 : 318 : values[i++] = NameGetDatum(&slot_contents.data.plugin);
286 : :
287 [ + + ]: 460 : if (slot_contents.data.database == InvalidOid)
3726 rhaas@postgresql.org 288 : 142 : values[i++] = CStringGetTextDatum("physical");
289 : : else
290 : 318 : values[i++] = CStringGetTextDatum("logical");
291 : :
1411 tgl@sss.pgh.pa.us 292 [ + + ]: 460 : if (slot_contents.data.database == InvalidOid)
3695 rhaas@postgresql.org 293 : 142 : nulls[i++] = true;
294 : : else
1411 tgl@sss.pgh.pa.us 295 : 318 : values[i++] = ObjectIdGetDatum(slot_contents.data.database);
296 : :
297 : 460 : values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
298 : 460 : values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
299 : :
300 [ + + ]: 460 : if (slot_contents.active_pid != 0)
301 : 157 : values[i++] = Int32GetDatum(slot_contents.active_pid);
302 : : else
3281 andres@anarazel.de 303 : 303 : nulls[i++] = true;
304 : :
1411 tgl@sss.pgh.pa.us 305 [ + + ]: 460 : if (slot_contents.data.xmin != InvalidTransactionId)
306 : 63 : values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
307 : : else
3726 rhaas@postgresql.org 308 : 397 : nulls[i++] = true;
309 : :
1411 tgl@sss.pgh.pa.us 310 [ + + ]: 460 : if (slot_contents.data.catalog_xmin != InvalidTransactionId)
311 : 347 : values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
312 : : else
3695 rhaas@postgresql.org 313 : 113 : nulls[i++] = true;
314 : :
1411 tgl@sss.pgh.pa.us 315 [ + + ]: 460 : if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
316 : 443 : values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
317 : : else
3726 rhaas@postgresql.org 318 : 17 : nulls[i++] = true;
319 : :
1411 tgl@sss.pgh.pa.us 320 [ + + ]: 460 : if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
321 : 295 : values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
322 : : else
3170 andres@anarazel.de 323 : 165 : nulls[i++] = true;
324 : :
325 : : /*
326 : : * If the slot has not been invalidated, test availability from
327 : : * restart_lsn.
328 : : */
373 329 [ + + ]: 460 : if (slot_contents.data.invalidated != RS_INVAL_NONE)
1388 alvherre@alvh.no-ip. 330 : 31 : walstate = WALAVAIL_REMOVED;
331 : : else
332 : 429 : walstate = GetWALAvailability(slot_contents.data.restart_lsn);
333 : :
1468 334 [ + + + + : 460 : switch (walstate)
+ - ]
335 : : {
336 : 14 : case WALAVAIL_INVALID_LSN:
337 : 14 : nulls[i++] = true;
338 : 14 : break;
339 : :
340 : 412 : case WALAVAIL_RESERVED:
341 : 412 : values[i++] = CStringGetTextDatum("reserved");
342 : 412 : break;
343 : :
1390 344 : 2 : case WALAVAIL_EXTENDED:
345 : 2 : values[i++] = CStringGetTextDatum("extended");
346 : 2 : break;
347 : :
348 : 1 : case WALAVAIL_UNRESERVED:
349 : 1 : values[i++] = CStringGetTextDatum("unreserved");
350 : 1 : break;
351 : :
1468 352 : 31 : case WALAVAIL_REMOVED:
353 : :
354 : : /*
355 : : * If we read the restart_lsn long enough ago, maybe that file
356 : : * has been removed by now. However, the walsender could have
357 : : * moved forward enough that it jumped to another file after
358 : : * we looked. If checkpointer signalled the process to
359 : : * termination, then it's definitely lost; but if a process is
360 : : * still alive, then "unreserved" seems more appropriate.
361 : : *
362 : : * If we do change it, save the state for safe_wal_size below.
363 : : */
1390 364 [ + + ]: 31 : if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
365 : : {
366 : : int pid;
367 : :
368 [ - + ]: 28 : SpinLockAcquire(&slot->mutex);
369 : 28 : pid = slot->active_pid;
1377 370 : 28 : slot_contents.data.restart_lsn = slot->data.restart_lsn;
1390 371 : 28 : SpinLockRelease(&slot->mutex);
372 [ - + ]: 28 : if (pid != 0)
373 : : {
1390 alvherre@alvh.no-ip. 374 :UBC 0 : values[i++] = CStringGetTextDatum("unreserved");
1377 375 : 0 : walstate = WALAVAIL_UNRESERVED;
1390 376 : 0 : break;
377 : : }
378 : : }
1468 alvherre@alvh.no-ip. 379 :CBC 31 : values[i++] = CStringGetTextDatum("lost");
380 : 31 : break;
381 : : }
382 : :
383 : : /*
384 : : * safe_wal_size is only computed for slots that have not been lost,
385 : : * and only if there's a configured maximum size.
386 : : */
1377 387 [ + + + + ]: 460 : if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
388 : 455 : nulls[i++] = true;
389 : : else
390 : : {
391 : : XLogSegNo targetSeg;
392 : : uint64 slotKeepSegs;
393 : : uint64 keepSegs;
394 : : XLogSegNo failSeg;
395 : : XLogRecPtr failLSN;
396 : :
397 : 5 : XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
398 : :
399 : : /* determine how many segments can be kept by slots */
1364 fujii@postgresql.org 400 : 5 : slotKeepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
401 : : /* ditto for wal_keep_size */
402 : 5 : keepSegs = XLogMBVarToSegs(wal_keep_size_mb, wal_segment_size);
403 : :
404 : : /* if currpos reaches failLSN, we lose our segment */
405 : 5 : failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
1377 alvherre@alvh.no-ip. 406 : 5 : XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
407 : :
408 : 5 : values[i++] = Int64GetDatum(failLSN - currlsn);
409 : : }
410 : :
1138 akapila@postgresql.o 411 : 460 : values[i++] = BoolGetDatum(slot_contents.data.two_phase);
412 : :
18 akapila@postgresql.o 413 [ + + ]:GNC 460 : if (slot_contents.inactive_since > 0)
414 : 316 : values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
415 : : else
20 416 : 144 : nulls[i++] = true;
417 : :
23 418 : 460 : cause = slot_contents.data.invalidated;
419 : :
420 [ + + ]: 460 : if (SlotIsPhysical(&slot_contents))
373 andres@anarazel.de 421 :CBC 142 : nulls[i++] = true;
422 : : else
423 : : {
424 : : /*
425 : : * rows_removed and wal_level_insufficient are the only two
426 : : * reasons for the logical slot's conflict with recovery.
427 : : */
23 akapila@postgresql.o 428 [ + + + + ]:GNC 318 : if (cause == RS_INVAL_HORIZON ||
429 : : cause == RS_INVAL_WAL_LEVEL)
23 akapila@postgresql.o 430 :CBC 28 : values[i++] = BoolGetDatum(true);
431 : : else
432 : 290 : values[i++] = BoolGetDatum(false);
433 : : }
434 : :
23 akapila@postgresql.o 435 [ + + ]:GNC 460 : if (cause == RS_INVAL_NONE)
436 : 429 : nulls[i++] = true;
437 : : else
438 : 31 : values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
439 : :
80 440 : 460 : values[i++] = BoolGetDatum(slot_contents.data.failover);
441 : :
60 442 : 460 : values[i++] = BoolGetDatum(slot_contents.data.synced);
443 : :
1411 tgl@sss.pgh.pa.us 444 [ - + ]:CBC 460 : Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
445 : :
769 michael@paquier.xyz 446 : 460 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
447 : : values, nulls);
448 : : }
449 : :
2455 alvherre@alvh.no-ip. 450 : 283 : LWLockRelease(ReplicationSlotControlLock);
451 : :
3726 rhaas@postgresql.org 452 : 283 : return (Datum) 0;
453 : : }
454 : :
455 : : /*
456 : : * Helper function for advancing our physical replication slot forward.
457 : : *
458 : : * The LSN position to move to is compared simply to the slot's restart_lsn,
459 : : * knowing that any position older than that would be removed by successive
460 : : * checkpoints.
461 : : */
462 : : static XLogRecPtr
2134 michael@paquier.xyz 463 : 1 : pg_physical_replication_slot_advance(XLogRecPtr moveto)
464 : : {
465 : 1 : XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
466 : 1 : XLogRecPtr retlsn = startlsn;
467 : :
1468 alvherre@alvh.no-ip. 468 [ - + ]: 1 : Assert(moveto != InvalidXLogRecPtr);
469 : :
2134 michael@paquier.xyz 470 [ + - ]: 1 : if (startlsn < moveto)
471 : : {
472 [ - + ]: 1 : SpinLockAcquire(&MyReplicationSlot->mutex);
2279 simon@2ndQuadrant.co 473 : 1 : MyReplicationSlot->data.restart_lsn = moveto;
2134 michael@paquier.xyz 474 : 1 : SpinLockRelease(&MyReplicationSlot->mutex);
2279 simon@2ndQuadrant.co 475 : 1 : retlsn = moveto;
476 : :
477 : : /*
478 : : * Dirty the slot so as it is written out at the next checkpoint. Note
479 : : * that the LSN position advanced may still be lost in the event of a
480 : : * crash, but this makes the data consistent after a clean shutdown.
481 : : */
1536 michael@paquier.xyz 482 : 1 : ReplicationSlotMarkDirty();
483 : :
484 : : /*
485 : : * Wake up logical walsenders holding logical failover slots after
486 : : * updating the restart_lsn of the physical slot.
487 : : */
37 akapila@postgresql.o 488 :GNC 1 : PhysicalWakeupLogicalWalSnd();
489 : : }
490 : :
2279 simon@2ndQuadrant.co 491 :CBC 1 : return retlsn;
492 : : }
493 : :
494 : : /*
495 : : * Advance our logical replication slot forward. See
496 : : * LogicalSlotAdvanceAndCheckSnapState for details.
497 : : */
498 : : static XLogRecPtr
2134 michael@paquier.xyz 499 : 5 : pg_logical_replication_slot_advance(XLogRecPtr moveto)
500 : : {
11 akapila@postgresql.o 501 :GNC 5 : return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL);
502 : : }
503 : :
504 : : /*
505 : : * SQL function for moving the position in a replication slot.
506 : : */
507 : : Datum
2279 simon@2ndQuadrant.co 508 :CBC 8 : pg_replication_slot_advance(PG_FUNCTION_ARGS)
509 : : {
510 : 8 : Name slotname = PG_GETARG_NAME(0);
511 : 8 : XLogRecPtr moveto = PG_GETARG_LSN(1);
512 : : XLogRecPtr endlsn;
513 : : XLogRecPtr minlsn;
514 : : TupleDesc tupdesc;
515 : : Datum values[2];
516 : : bool nulls[2];
517 : : HeapTuple tuple;
518 : : Datum result;
519 : :
520 [ - + ]: 8 : Assert(!MyReplicationSlot);
521 : :
943 michael@paquier.xyz 522 : 8 : CheckSlotPermissions();
523 : :
2279 simon@2ndQuadrant.co 524 [ + + ]: 8 : if (XLogRecPtrIsInvalid(moveto))
525 [ + - ]: 1 : ereport(ERROR,
526 : : (errmsg("invalid target WAL LSN")));
527 : :
528 : : /* Build a tuple descriptor for our result type */
529 [ - + ]: 7 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
2279 simon@2ndQuadrant.co 530 [ # # ]:UBC 0 : elog(ERROR, "return type must be a row type");
531 : :
532 : : /*
533 : : * We can't move slot past what's been flushed/replayed so clamp the
534 : : * target position accordingly.
535 : : */
2279 simon@2ndQuadrant.co 536 [ + - ]:CBC 7 : if (!RecoveryInProgress())
891 rhaas@postgresql.org 537 [ + + ]: 7 : moveto = Min(moveto, GetFlushRecPtr(NULL));
538 : : else
891 rhaas@postgresql.org 539 [ # # ]:UBC 0 : moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
540 : :
541 : : /* Acquire the slot so we "own" it */
1038 alvherre@alvh.no-ip. 542 :CBC 7 : ReplicationSlotAcquire(NameStr(*slotname), true);
543 : :
544 : : /* A slot whose restart_lsn has never been reserved cannot be advanced */
2104 michael@paquier.xyz 545 [ + + ]: 7 : if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
546 [ + - ]: 1 : ereport(ERROR,
547 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548 : : errmsg("replication slot \"%s\" cannot be advanced",
549 : : NameStr(*slotname)),
550 : : errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
551 : :
552 : : /*
553 : : * Check if the slot is not moving backwards. Physical slots rely simply
554 : : * on restart_lsn as a minimum point, while logical slots have confirmed
555 : : * consumption up to confirmed_flush, meaning that in both cases data
556 : : * older than that is not available anymore.
557 : : */
2134 558 [ + + ]: 6 : if (OidIsValid(MyReplicationSlot->data.database))
559 : 5 : minlsn = MyReplicationSlot->data.confirmed_flush;
560 : : else
561 : 1 : minlsn = MyReplicationSlot->data.restart_lsn;
562 : :
563 [ - + ]: 6 : if (moveto < minlsn)
2279 simon@2ndQuadrant.co 564 [ # # ]:UBC 0 : ereport(ERROR,
565 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
566 : : errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
567 : : LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
568 : :
569 : : /* Do the actual slot update, depending on the slot type */
2279 simon@2ndQuadrant.co 570 [ + + ]:CBC 6 : if (OidIsValid(MyReplicationSlot->data.database))
2134 michael@paquier.xyz 571 : 5 : endlsn = pg_logical_replication_slot_advance(moveto);
572 : : else
573 : 1 : endlsn = pg_physical_replication_slot_advance(moveto);
574 : :
2279 simon@2ndQuadrant.co 575 : 6 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
576 : 6 : nulls[0] = false;
577 : :
578 : : /*
579 : : * Recompute the minimum LSN and xmin across all slots to adjust with the
580 : : * advancing potentially done.
581 : : */
1396 michael@paquier.xyz 582 : 6 : ReplicationSlotsComputeRequiredXmin(false);
583 : 6 : ReplicationSlotsComputeRequiredLSN();
584 : :
2279 simon@2ndQuadrant.co 585 : 6 : ReplicationSlotRelease();
586 : :
587 : : /* Return the reached position. */
588 : 6 : values[1] = LSNGetDatum(endlsn);
589 : 6 : nulls[1] = false;
590 : :
591 : 6 : tuple = heap_form_tuple(tupdesc, values, nulls);
592 : 6 : result = HeapTupleGetDatum(tuple);
593 : :
594 : 6 : PG_RETURN_DATUM(result);
595 : : }
596 : :
597 : : /*
598 : : * Helper function of copying a replication slot.
599 : : */
600 : : static Datum
1836 alvherre@alvh.no-ip. 601 : 14 : copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
602 : : {
603 : 14 : Name src_name = PG_GETARG_NAME(0);
604 : 14 : Name dst_name = PG_GETARG_NAME(1);
605 : 14 : ReplicationSlot *src = NULL;
606 : : ReplicationSlot first_slot_contents;
607 : : ReplicationSlot second_slot_contents;
608 : : XLogRecPtr src_restart_lsn;
609 : : bool src_islogical;
610 : : bool temporary;
611 : : char *plugin;
612 : : Datum values[2];
613 : : bool nulls[2];
614 : : Datum result;
615 : : TupleDesc tupdesc;
616 : : HeapTuple tuple;
617 : :
618 [ - + ]: 14 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1836 alvherre@alvh.no-ip. 619 [ # # ]:UBC 0 : elog(ERROR, "return type must be a row type");
620 : :
943 michael@paquier.xyz 621 :CBC 14 : CheckSlotPermissions();
622 : :
1836 alvherre@alvh.no-ip. 623 [ + + ]: 14 : if (logical_slot)
624 : 8 : CheckLogicalDecodingRequirements();
625 : : else
626 : 6 : CheckSlotRequirements();
627 : :
628 : 14 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
629 : :
630 : : /*
631 : : * We need to prevent the source slot's reserved WAL from being removed,
632 : : * but we don't want to lock that slot for very long, and it can advance
633 : : * in the meantime. So obtain the source slot's data, and create a new
634 : : * slot using its restart_lsn. Afterwards we lock the source slot again
635 : : * and verify that the data we copied (name, type) has not changed
636 : : * incompatibly. No inconvenient WAL removal can occur once the new slot
637 : : * is created -- but since WAL removal could have occurred before we
638 : : * managed to create the new slot, we advance the new slot's restart_lsn
639 : : * to the source slot's updated restart_lsn the second time we lock it.
640 : : */
641 [ + - ]: 15 : for (int i = 0; i < max_replication_slots; i++)
642 : : {
643 : 15 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
644 : :
645 [ + - + + ]: 15 : if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
646 : : {
647 : : /* Copy the slot contents while holding spinlock */
648 [ - + ]: 14 : SpinLockAcquire(&s->mutex);
1411 tgl@sss.pgh.pa.us 649 : 14 : first_slot_contents = *s;
1836 alvherre@alvh.no-ip. 650 : 14 : SpinLockRelease(&s->mutex);
651 : 14 : src = s;
652 : 14 : break;
653 : : }
654 : : }
655 : :
656 : 14 : LWLockRelease(ReplicationSlotControlLock);
657 : :
658 [ - + ]: 14 : if (src == NULL)
1836 alvherre@alvh.no-ip. 659 [ # # ]:UBC 0 : ereport(ERROR,
660 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
661 : : errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
662 : :
1411 tgl@sss.pgh.pa.us 663 :CBC 14 : src_islogical = SlotIsLogical(&first_slot_contents);
664 : 14 : src_restart_lsn = first_slot_contents.data.restart_lsn;
665 : 14 : temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
666 [ + + ]: 14 : plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
667 : :
668 : : /* Check type of replication slot */
1836 alvherre@alvh.no-ip. 669 [ + + ]: 14 : if (src_islogical != logical_slot)
670 [ + - + + ]: 2 : ereport(ERROR,
671 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
672 : : src_islogical ?
673 : : errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
674 : : NameStr(*src_name)) :
675 : : errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
676 : : NameStr(*src_name))));
677 : :
678 : : /* Copying non-reserved slot doesn't make sense */
679 [ + + ]: 12 : if (XLogRecPtrIsInvalid(src_restart_lsn))
680 [ + - ]: 1 : ereport(ERROR,
681 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
682 : : errmsg("cannot copy a replication slot that doesn't reserve WAL")));
683 : :
684 : : /* Overwrite params from optional arguments */
685 [ + + ]: 11 : if (PG_NARGS() >= 3)
686 : 6 : temporary = PG_GETARG_BOOL(2);
687 [ + + ]: 11 : if (PG_NARGS() >= 4)
688 : : {
689 [ - + ]: 4 : Assert(logical_slot);
690 : 4 : plugin = NameStr(*(PG_GETARG_NAME(3)));
691 : : }
692 : :
693 : : /* Create new slot and acquire it */
694 [ + + ]: 11 : if (logical_slot)
695 : : {
696 : : /*
697 : : * We must not try to read WAL, since we haven't reserved it yet --
698 : : * hence pass find_startpoint false. confirmed_flush will be set
699 : : * below, by copying from the source slot.
700 : : *
701 : : * To avoid potential issues with the slot synchronization where the
702 : : * restart_lsn of a replication slot can go backward, we set the
703 : : * failover option to false here. This situation occurs when a slot
704 : : * on the primary server is dropped and immediately replaced with a
705 : : * new slot of the same name, created by copying from another existing
706 : : * slot. However, the slot synchronization will only observe the
707 : : * restart_lsn of the same slot going backward.
708 : : */
709 : 7 : create_logical_replication_slot(NameStr(*dst_name),
710 : : plugin,
711 : : temporary,
712 : : false,
713 : : false,
714 : : src_restart_lsn,
715 : : false);
716 : : }
717 : : else
718 : 4 : create_physical_replication_slot(NameStr(*dst_name),
719 : : true,
720 : : temporary,
721 : : src_restart_lsn);
722 : :
723 : : /*
724 : : * Update the destination slot to current values of the source slot;
725 : : * recheck that the source slot is still the one we saw previously.
726 : : */
727 : : {
728 : : TransactionId copy_effective_xmin;
729 : : TransactionId copy_effective_catalog_xmin;
730 : : TransactionId copy_xmin;
731 : : TransactionId copy_catalog_xmin;
732 : : XLogRecPtr copy_restart_lsn;
733 : : XLogRecPtr copy_confirmed_flush;
734 : : bool copy_islogical;
735 : : char *copy_name;
736 : :
737 : : /* Copy data of source slot again */
738 [ - + ]: 10 : SpinLockAcquire(&src->mutex);
1411 tgl@sss.pgh.pa.us 739 : 10 : second_slot_contents = *src;
740 : 10 : SpinLockRelease(&src->mutex);
741 : :
742 : 10 : copy_effective_xmin = second_slot_contents.effective_xmin;
743 : 10 : copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
744 : :
745 : 10 : copy_xmin = second_slot_contents.data.xmin;
746 : 10 : copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
747 : 10 : copy_restart_lsn = second_slot_contents.data.restart_lsn;
748 : 10 : copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
749 : :
750 : : /* for existence check */
751 : 10 : copy_name = NameStr(second_slot_contents.data.name);
752 : 10 : copy_islogical = SlotIsLogical(&second_slot_contents);
753 : :
754 : : /*
755 : : * Check if the source slot still exists and is valid. We regard it as
756 : : * invalid if the type of replication slot or name has been changed,
757 : : * or the restart_lsn either is invalid or has gone backward. (The
758 : : * restart_lsn could go backwards if the source slot is dropped and
759 : : * copied from an older slot during installation.)
760 : : *
761 : : * Since erroring out will release and drop the destination slot we
762 : : * don't need to release it here.
763 : : */
1836 alvherre@alvh.no-ip. 764 [ + - + - ]: 10 : if (copy_restart_lsn < src_restart_lsn ||
765 : 10 : src_islogical != copy_islogical ||
766 [ - + ]: 10 : strcmp(copy_name, NameStr(*src_name)) != 0)
1836 alvherre@alvh.no-ip. 767 [ # # ]:UBC 0 : ereport(ERROR,
768 : : (errmsg("could not copy replication slot \"%s\"",
769 : : NameStr(*src_name)),
770 : : errdetail("The source replication slot was modified incompatibly during the copy operation.")));
771 : :
772 : : /* The source slot must have a consistent snapshot */
1489 alvherre@alvh.no-ip. 773 [ + + - + ]:CBC 10 : if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
1489 alvherre@alvh.no-ip. 774 [ # # ]:UBC 0 : ereport(ERROR,
775 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
776 : : errmsg("cannot copy unfinished logical replication slot \"%s\"",
777 : : NameStr(*src_name)),
778 : : errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
779 : :
780 : : /* Install copied values again */
1836 alvherre@alvh.no-ip. 781 [ - + ]:CBC 10 : SpinLockAcquire(&MyReplicationSlot->mutex);
782 : 10 : MyReplicationSlot->effective_xmin = copy_effective_xmin;
783 : 10 : MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
784 : :
785 : 10 : MyReplicationSlot->data.xmin = copy_xmin;
786 : 10 : MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
787 : 10 : MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
1489 788 : 10 : MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
1836 789 : 10 : SpinLockRelease(&MyReplicationSlot->mutex);
790 : :
791 : 10 : ReplicationSlotMarkDirty();
792 : 10 : ReplicationSlotsComputeRequiredXmin(false);
793 : 10 : ReplicationSlotsComputeRequiredLSN();
794 : 10 : ReplicationSlotSave();
795 : :
796 : : #ifdef USE_ASSERT_CHECKING
797 : : /* Check that the restart_lsn is available */
798 : : {
799 : : XLogSegNo segno;
800 : :
801 : 10 : XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
802 [ - + ]: 10 : Assert(XLogGetLastRemovedSegno() < segno);
803 : : }
804 : : #endif
805 : : }
806 : :
807 : : /* target slot fully created, mark as persistent if needed */
808 [ + + + + ]: 10 : if (logical_slot && !temporary)
809 : 3 : ReplicationSlotPersist();
810 : :
811 : : /* All done. Set up the return values */
812 : 10 : values[0] = NameGetDatum(dst_name);
813 : 10 : nulls[0] = false;
814 [ + + ]: 10 : if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
815 : : {
816 : 6 : values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
817 : 6 : nulls[1] = false;
818 : : }
819 : : else
820 : 4 : nulls[1] = true;
821 : :
822 : 10 : tuple = heap_form_tuple(tupdesc, values, nulls);
823 : 10 : result = HeapTupleGetDatum(tuple);
824 : :
825 : 10 : ReplicationSlotRelease();
826 : :
827 : 10 : PG_RETURN_DATUM(result);
828 : : }
829 : :
830 : : /* The wrappers below are all to appease opr_sanity */
831 : : Datum
832 : 4 : pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
833 : : {
834 : 4 : return copy_replication_slot(fcinfo, true);
835 : : }
836 : :
837 : : Datum
1836 alvherre@alvh.no-ip. 838 :UBC 0 : pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
839 : : {
840 : 0 : return copy_replication_slot(fcinfo, true);
841 : : }
842 : :
843 : : Datum
1836 alvherre@alvh.no-ip. 844 :CBC 4 : pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
845 : : {
846 : 4 : return copy_replication_slot(fcinfo, true);
847 : : }
848 : :
849 : : Datum
850 : 2 : pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
851 : : {
852 : 2 : return copy_replication_slot(fcinfo, false);
853 : : }
854 : :
855 : : Datum
856 : 4 : pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
857 : : {
858 : 4 : return copy_replication_slot(fcinfo, false);
859 : : }
860 : :
861 : : /*
862 : : * Synchronize failover enabled replication slots to a standby server
863 : : * from the primary server.
864 : : */
865 : : Datum
60 akapila@postgresql.o 866 :GNC 10 : pg_sync_replication_slots(PG_FUNCTION_ARGS)
867 : : {
868 : : WalReceiverConn *wrconn;
869 : : char *err;
870 : : StringInfoData app_name;
871 : :
872 : 10 : CheckSlotPermissions();
873 : :
874 [ + + ]: 9 : if (!RecoveryInProgress())
875 [ + - ]: 1 : ereport(ERROR,
876 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
877 : : errmsg("replication slots can only be synchronized to a standby server"));
878 : :
52 879 : 8 : ValidateSlotSyncParams(ERROR);
880 : :
881 : : /* Load the libpq-specific functions */
60 882 : 8 : load_file("libpqwalreceiver", false);
883 : :
52 884 : 8 : (void) CheckAndGetDbnameFromConninfo();
885 : :
60 886 : 7 : initStringInfo(&app_name);
887 [ + - ]: 7 : if (cluster_name[0])
888 : 7 : appendStringInfo(&app_name, "%s_slotsync", cluster_name);
889 : : else
60 akapila@postgresql.o 890 :UNC 0 : appendStringInfoString(&app_name, "slotsync");
891 : :
892 : : /* Connect to the primary server. */
60 akapila@postgresql.o 893 :GNC 7 : wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
894 : : app_name.data, &err);
895 : 7 : pfree(app_name.data);
896 : :
897 [ - + ]: 7 : if (!wrconn)
60 akapila@postgresql.o 898 [ # # ]:UNC 0 : ereport(ERROR,
899 : : errcode(ERRCODE_CONNECTION_FAILURE),
900 : : errmsg("could not connect to the primary server: %s", err));
901 : :
60 akapila@postgresql.o 902 :GNC 7 : SyncReplicationSlots(wrconn);
903 : :
904 : 6 : walrcv_disconnect(wrconn);
905 : :
906 : 6 : PG_RETURN_VOID();
907 : : }
|