Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * slot.c
4 : : * Replication slot management.
5 : : *
6 : : *
7 : : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/replication/slot.c
12 : : *
13 : : * NOTES
14 : : *
15 : : * Replication slots are used to keep state about replication streams
16 : : * originating from this cluster. Their primary purpose is to prevent the
17 : : * premature removal of WAL or of old tuple versions in a manner that would
18 : : * interfere with replication; they are also useful for monitoring purposes.
19 : : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : : * on standbys (to support cascading setups). The requirement that slots be
21 : : * usable on standbys precludes storing them in the system catalogs.
22 : : *
23 : : * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 : : * directory. Inside that directory the state file will contain the slot's
25 : : * own data. Additional data can be stored alongside that file if required.
26 : : * While the server is running, the state data is also cached in memory for
27 : : * efficiency.
28 : : *
29 : : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : : * of a slot. The remaining data in each slot is protected by its mutex.
33 : : *
34 : : *-------------------------------------------------------------------------
35 : : */
36 : :
37 : : #include "postgres.h"
38 : :
39 : : #include <unistd.h>
40 : : #include <sys/stat.h>
41 : :
42 : : #include "access/transam.h"
43 : : #include "access/xlog_internal.h"
44 : : #include "access/xlogrecovery.h"
45 : : #include "common/file_utils.h"
46 : : #include "common/string.h"
47 : : #include "miscadmin.h"
48 : : #include "pgstat.h"
49 : : #include "postmaster/interrupt.h"
50 : : #include "replication/slotsync.h"
51 : : #include "replication/slot.h"
52 : : #include "replication/walsender_private.h"
53 : : #include "storage/fd.h"
54 : : #include "storage/ipc.h"
55 : : #include "storage/proc.h"
56 : : #include "storage/procarray.h"
57 : : #include "utils/builtins.h"
58 : : #include "utils/guc_hooks.h"
59 : : #include "utils/varlena.h"
60 : :
61 : : /*
62 : : * Replication slot on-disk data structure.
63 : : */
64 : : typedef struct ReplicationSlotOnDisk
65 : : {
66 : : /* first part of this struct needs to be version independent */
67 : :
68 : : /* data not covered by checksum */
69 : : uint32 magic;
70 : : pg_crc32c checksum;
71 : :
72 : : /* data covered by checksum */
73 : : uint32 version;
74 : : uint32 length;
75 : :
76 : : /*
77 : : * The actual data in the slot that follows can differ based on the above
78 : : * 'version'.
79 : : */
80 : :
81 : : ReplicationSlotPersistentData slotdata;
82 : : } ReplicationSlotOnDisk;
83 : :
84 : : /*
85 : : * Struct for the configuration of standby_slot_names.
86 : : *
87 : : * Note: this must be a flat representation that can be held in a single chunk
88 : : * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
89 : : * standby_slot_names GUC.
90 : : */
91 : : typedef struct
92 : : {
93 : : /* Number of slot names in the slot_names[] */
94 : : int nslotnames;
95 : :
96 : : /*
97 : : * slot_names contains 'nslotnames' consecutive null-terminated C strings.
98 : : */
99 : : char slot_names[FLEXIBLE_ARRAY_MEMBER];
100 : : } StandbySlotNamesConfigData;
101 : :
102 : : /*
103 : : * Lookup table for slot invalidation causes.
104 : : */
105 : : const char *const SlotInvalidationCauses[] = {
106 : : [RS_INVAL_NONE] = "none",
107 : : [RS_INVAL_WAL_REMOVED] = "wal_removed",
108 : : [RS_INVAL_HORIZON] = "rows_removed",
109 : : [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
110 : : };
111 : :
112 : : /* Maximum number of invalidation causes */
113 : : #define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
114 : :
115 : : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
116 : : "array length mismatch");
117 : :
118 : : /* size of version independent data */
119 : : #define ReplicationSlotOnDiskConstantSize \
120 : : offsetof(ReplicationSlotOnDisk, slotdata)
121 : : /* size of the part of the slot not covered by the checksum */
122 : : #define ReplicationSlotOnDiskNotChecksummedSize \
123 : : offsetof(ReplicationSlotOnDisk, version)
124 : : /* size of the part covered by the checksum */
125 : : #define ReplicationSlotOnDiskChecksummedSize \
126 : : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
127 : : /* size of the slot data that is version dependent */
128 : : #define ReplicationSlotOnDiskV2Size \
129 : : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
130 : :
131 : : #define SLOT_MAGIC 0x1051CA1 /* format identifier */
132 : : #define SLOT_VERSION 5 /* version for new files */
133 : :
134 : : /* Control array for replication slot management */
135 : : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
136 : :
137 : : /* My backend's replication slot in the shared memory array */
138 : : ReplicationSlot *MyReplicationSlot = NULL;
139 : :
140 : : /* GUC variables */
141 : : int max_replication_slots = 10; /* the maximum number of replication
142 : : * slots */
143 : :
144 : : /*
145 : : * This GUC lists streaming replication standby server slot names that
146 : : * logical WAL sender processes will wait for.
147 : : */
148 : : char *standby_slot_names;
149 : :
150 : : /* This is the parsed and cached configuration for standby_slot_names */
151 : : static StandbySlotNamesConfigData *standby_slot_names_config;
152 : :
153 : : /*
154 : : * Oldest LSN that has been confirmed to be flushed to the standbys
155 : : * corresponding to the physical slots specified in the standby_slot_names GUC.
156 : : */
157 : : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
158 : :
159 : : static void ReplicationSlotShmemExit(int code, Datum arg);
160 : : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
161 : :
162 : : /* internal persistency functions */
163 : : static void RestoreSlotFromDisk(const char *name);
164 : : static void CreateSlotOnDisk(ReplicationSlot *slot);
165 : : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
166 : :
167 : : /*
168 : : * Report shared-memory space needed by ReplicationSlotsShmemInit.
169 : : */
170 : : Size
3726 rhaas@postgresql.org 171 :CBC 3473 : ReplicationSlotsShmemSize(void)
172 : : {
173 : 3473 : Size size = 0;
174 : :
175 [ + + ]: 3473 : if (max_replication_slots == 0)
3726 rhaas@postgresql.org 176 :GBC 2 : return size;
177 : :
3726 rhaas@postgresql.org 178 :CBC 3471 : size = offsetof(ReplicationSlotCtlData, replication_slots);
179 : 3471 : size = add_size(size,
180 : : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
181 : :
182 : 3471 : return size;
183 : : }
184 : :
185 : : /*
186 : : * Allocate and initialize shared memory for replication slots.
187 : : */
188 : : void
189 : 898 : ReplicationSlotsShmemInit(void)
190 : : {
191 : : bool found;
192 : :
193 [ + + ]: 898 : if (max_replication_slots == 0)
3726 rhaas@postgresql.org 194 :GBC 1 : return;
195 : :
3726 rhaas@postgresql.org 196 :CBC 897 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
197 : 897 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
198 : : &found);
199 : :
200 [ + - ]: 897 : if (!found)
201 : : {
202 : : int i;
203 : :
204 : : /* First time through, so initialize */
205 [ + - + - : 1737 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
+ - + + +
+ ]
206 : :
207 [ + + ]: 9679 : for (i = 0; i < max_replication_slots; i++)
208 : : {
209 : 8782 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
210 : :
211 : : /* everything else is zeroed by the memset above */
212 : 8782 : SpinLockInit(&slot->mutex);
1430 tgl@sss.pgh.pa.us 213 : 8782 : LWLockInitialize(&slot->io_in_progress_lock,
214 : : LWTRANCHE_REPLICATION_SLOT_IO);
2455 alvherre@alvh.no-ip. 215 : 8782 : ConditionVariableInit(&slot->active_cv);
216 : : }
217 : : }
218 : : }
219 : :
220 : : /*
221 : : * Register the callback for replication slot cleanup and releasing.
222 : : */
223 : : void
790 andres@anarazel.de 224 : 19573 : ReplicationSlotInitialize(void)
225 : : {
226 : 19573 : before_shmem_exit(ReplicationSlotShmemExit, 0);
227 : 19573 : }
228 : :
229 : : /*
230 : : * Release and cleanup replication slots.
231 : : */
232 : : static void
233 : 18043 : ReplicationSlotShmemExit(int code, Datum arg)
234 : : {
235 : : /* Make sure active replication slots are released */
236 [ + + ]: 18043 : if (MyReplicationSlot != NULL)
237 : 194 : ReplicationSlotRelease();
238 : :
239 : : /* Also cleanup all the temporary slots. */
240 : 18043 : ReplicationSlotCleanup();
241 : 18043 : }
242 : :
243 : : /*
244 : : * Check whether the passed slot name is valid and report errors at elevel.
245 : : *
246 : : * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
247 : : * the name to be used as a directory name on every supported OS.
248 : : *
249 : : * Returns whether the directory name is valid or not if elevel < ERROR.
250 : : */
251 : : bool
3726 rhaas@postgresql.org 252 : 794 : ReplicationSlotValidateName(const char *name, int elevel)
253 : : {
254 : : const char *cp;
255 : :
256 [ + + ]: 794 : if (strlen(name) == 0)
257 : : {
258 [ + - ]: 3 : ereport(elevel,
259 : : (errcode(ERRCODE_INVALID_NAME),
260 : : errmsg("replication slot name \"%s\" is too short",
261 : : name)));
3726 rhaas@postgresql.org 262 :UBC 0 : return false;
263 : : }
264 : :
3726 rhaas@postgresql.org 265 [ - + ]:CBC 791 : if (strlen(name) >= NAMEDATALEN)
266 : : {
3726 rhaas@postgresql.org 267 [ # # ]:UBC 0 : ereport(elevel,
268 : : (errcode(ERRCODE_NAME_TOO_LONG),
269 : : errmsg("replication slot name \"%s\" is too long",
270 : : name)));
271 : 0 : return false;
272 : : }
273 : :
3726 rhaas@postgresql.org 274 [ + + ]:CBC 15791 : for (cp = name; *cp; cp++)
275 : : {
276 [ + + - + ]: 15001 : if (!((*cp >= 'a' && *cp <= 'z')
277 [ + - + + ]: 7372 : || (*cp >= '0' && *cp <= '9')
278 [ + + ]: 1447 : || (*cp == '_')))
279 : : {
280 [ + - ]: 1 : ereport(elevel,
281 : : (errcode(ERRCODE_INVALID_NAME),
282 : : errmsg("replication slot name \"%s\" contains invalid character",
283 : : name),
284 : : errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
3726 rhaas@postgresql.org 285 :UBC 0 : return false;
286 : : }
287 : : }
3726 rhaas@postgresql.org 288 :CBC 790 : return true;
289 : : }
290 : :
291 : : /*
292 : : * Create a new replication slot and mark it as used by this backend.
293 : : *
294 : : * name: Name of the slot
295 : : * db_specific: logical decoding is db specific; if the slot is going to
296 : : * be used for that pass true, otherwise false.
297 : : * two_phase: Allows decoding of prepared transactions. We allow this option
298 : : * to be enabled only at the slot creation time. If we allow this option
299 : : * to be changed during decoding then it is quite possible that we skip
300 : : * prepare first time because this option was not enabled. Now next time
301 : : * during getting changes, if the two_phase option is enabled it can skip
302 : : * prepare because by that time start decoding point has been moved. So the
303 : : * user will only get commit prepared.
304 : : * failover: If enabled, allows the slot to be synced to standbys so
305 : : * that logical replication can be resumed after failover.
306 : : * synced: True if the slot is synchronized from the primary server.
307 : : */
308 : : void
3695 309 : 563 : ReplicationSlotCreate(const char *name, bool db_specific,
310 : : ReplicationSlotPersistency persistency,
311 : : bool two_phase, bool failover, bool synced)
312 : : {
3726 313 : 563 : ReplicationSlot *slot = NULL;
314 : : int i;
315 : :
316 [ - + ]: 563 : Assert(MyReplicationSlot == NULL);
317 : :
318 : 563 : ReplicationSlotValidateName(name, ERROR);
319 : :
60 akapila@postgresql.o 320 [ + + ]:GNC 562 : if (failover)
321 : : {
322 : : /*
323 : : * Do not allow users to create the failover enabled slots on the
324 : : * standby as we do not support sync to the cascading standby.
325 : : *
326 : : * However, failover enabled slots can be created during slot
327 : : * synchronization because we need to retain the same values as the
328 : : * remote slot.
329 : : */
330 [ + + - + ]: 18 : if (RecoveryInProgress() && !IsSyncingReplicationSlots())
60 akapila@postgresql.o 331 [ # # ]:UNC 0 : ereport(ERROR,
332 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
333 : : errmsg("cannot enable failover for a replication slot created on the standby"));
334 : :
335 : : /*
336 : : * Do not allow users to create failover enabled temporary slots,
337 : : * because temporary slots will not be synced to the standby.
338 : : *
339 : : * However, failover enabled temporary slots can be created during
340 : : * slot synchronization. See the comments atop slotsync.c for details.
341 : : */
60 akapila@postgresql.o 342 [ + + + + ]:GNC 18 : if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
343 [ + - ]: 1 : ereport(ERROR,
344 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
345 : : errmsg("cannot enable failover for a temporary replication slot"));
346 : : }
347 : :
348 : : /*
349 : : * If some other backend ran this code concurrently with us, we'd likely
350 : : * both allocate the same slot, and that would be bad. We'd also be at
351 : : * risk of missing a name collision. Also, we don't want to try to create
352 : : * a new slot while somebody's busy cleaning up an old one, because we
353 : : * might both be monkeying with the same directory.
354 : : */
3726 rhaas@postgresql.org 355 :CBC 561 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
356 : :
357 : : /*
358 : : * Check for name collision, and identify an allocatable slot. We need to
359 : : * hold ReplicationSlotControlLock in shared mode for this, so that nobody
360 : : * else can change the in_use flags while we're looking at them.
361 : : */
362 : 561 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
363 [ + + ]: 5331 : for (i = 0; i < max_replication_slots; i++)
364 : : {
365 : 4773 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
366 : :
367 [ + + + + ]: 4773 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
368 [ + - ]: 3 : ereport(ERROR,
369 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
370 : : errmsg("replication slot \"%s\" already exists", name)));
371 [ + + + + ]: 4770 : if (!s->in_use && slot == NULL)
372 : 557 : slot = s;
373 : : }
374 : 558 : LWLockRelease(ReplicationSlotControlLock);
375 : :
376 : : /* If all slots are in use, we're out of luck. */
377 [ + + ]: 558 : if (slot == NULL)
378 [ + - ]: 1 : ereport(ERROR,
379 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
380 : : errmsg("all replication slots are in use"),
381 : : errhint("Free one or increase max_replication_slots.")));
382 : :
383 : : /*
384 : : * Since this slot is not in use, nobody should be looking at any part of
385 : : * it other than the in_use field unless they're trying to allocate it.
386 : : * And since we hold ReplicationSlotAllocationLock, nobody except us can
387 : : * be doing that. So it's safe to initialize the slot.
388 : : */
389 [ - + ]: 557 : Assert(!slot->in_use);
3281 andres@anarazel.de 390 [ - + ]: 557 : Assert(slot->active_pid == 0);
391 : :
392 : : /* first initialize persistent data */
2797 393 : 557 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
1343 peter@eisentraut.org 394 : 557 : namestrcpy(&slot->data.name, name);
3726 rhaas@postgresql.org 395 [ + + ]: 557 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
2797 andres@anarazel.de 396 : 557 : slot->data.persistency = persistency;
1138 akapila@postgresql.o 397 : 557 : slot->data.two_phase = two_phase;
1005 398 : 557 : slot->data.two_phase_at = InvalidXLogRecPtr;
80 akapila@postgresql.o 399 :GNC 557 : slot->data.failover = failover;
60 400 : 557 : slot->data.synced = synced;
401 : :
402 : : /* and then data only present in shared memory */
2797 andres@anarazel.de 403 :CBC 557 : slot->just_dirtied = false;
404 : 557 : slot->dirty = false;
405 : 557 : slot->effective_xmin = InvalidTransactionId;
406 : 557 : slot->effective_catalog_xmin = InvalidTransactionId;
407 : 557 : slot->candidate_catalog_xmin = InvalidTransactionId;
408 : 557 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
409 : 557 : slot->candidate_restart_valid = InvalidXLogRecPtr;
410 : 557 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
213 akapila@postgresql.o 411 :GNC 557 : slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
18 412 : 557 : slot->inactive_since = 0;
413 : :
414 : : /*
415 : : * Create the slot on disk. We haven't actually marked the slot allocated
416 : : * yet, so no special cleanup is required if this errors out.
417 : : */
3726 rhaas@postgresql.org 418 :CBC 557 : CreateSlotOnDisk(slot);
419 : :
420 : : /*
421 : : * We need to briefly prevent any other backend from iterating over the
422 : : * slots while we flip the in_use flag. We also need to set the active
423 : : * flag while holding the ControlLock as otherwise a concurrent
424 : : * ReplicationSlotAcquire() could acquire the slot as well.
425 : : */
426 : 557 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
427 : :
428 : 557 : slot->in_use = true;
429 : :
430 : : /* We can now mark the slot active, and that makes it our slot. */
3113 431 [ - + ]: 557 : SpinLockAcquire(&slot->mutex);
432 [ - + ]: 557 : Assert(slot->active_pid == 0);
433 : 557 : slot->active_pid = MyProcPid;
434 : 557 : SpinLockRelease(&slot->mutex);
435 : 557 : MyReplicationSlot = slot;
436 : :
3726 437 : 557 : LWLockRelease(ReplicationSlotControlLock);
438 : :
439 : : /*
440 : : * Create statistics entry for the new logical slot. We don't collect any
441 : : * stats for physical slots, so no need to create an entry for the same.
442 : : * See ReplicationSlotDropPtr for why we need to do this before releasing
443 : : * ReplicationSlotAllocationLock.
444 : : */
1284 akapila@postgresql.o 445 [ + + ]: 557 : if (SlotIsLogical(slot))
739 andres@anarazel.de 446 : 402 : pgstat_create_replslot(slot);
447 : :
448 : : /*
449 : : * Now that the slot has been marked as in_use and active, it's safe to
450 : : * let somebody else try to allocate a slot.
451 : : */
3726 rhaas@postgresql.org 452 : 557 : LWLockRelease(ReplicationSlotAllocationLock);
453 : :
454 : : /* Let everybody know we've modified this slot */
2455 alvherre@alvh.no-ip. 455 : 557 : ConditionVariableBroadcast(&slot->active_cv);
3726 rhaas@postgresql.org 456 : 557 : }
457 : :
458 : : /*
459 : : * Search for the named replication slot.
460 : : *
461 : : * Return the replication slot if found, otherwise NULL.
462 : : */
463 : : ReplicationSlot *
1083 akapila@postgresql.o 464 : 1270 : SearchNamedReplicationSlot(const char *name, bool need_lock)
465 : : {
466 : : int i;
467 : 1270 : ReplicationSlot *slot = NULL;
468 : :
469 [ + + ]: 1270 : if (need_lock)
470 : 72 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
471 : :
3726 rhaas@postgresql.org 472 [ + + ]: 2242 : for (i = 0; i < max_replication_slots; i++)
473 : : {
474 : 2223 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
475 : :
476 [ + + + + ]: 2223 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
477 : : {
478 : 1251 : slot = s;
479 : 1251 : break;
480 : : }
481 : : }
482 : :
1083 akapila@postgresql.o 483 [ + + ]: 1270 : if (need_lock)
484 : 72 : LWLockRelease(ReplicationSlotControlLock);
485 : :
1395 fujii@postgresql.org 486 : 1270 : return slot;
487 : : }
488 : :
489 : : /*
490 : : * Return the index of the replication slot in
491 : : * ReplicationSlotCtl->replication_slots.
492 : : *
493 : : * This is mainly useful to have an efficient key for storing replication slot
494 : : * stats.
495 : : */
496 : : int
739 andres@anarazel.de 497 : 6886 : ReplicationSlotIndex(ReplicationSlot *slot)
498 : : {
499 [ + - - + ]: 6886 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
500 : : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
501 : :
502 : 6886 : return slot - ReplicationSlotCtl->replication_slots;
503 : : }
504 : :
505 : : /*
506 : : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
507 : : * the slot's name and true is returned.
508 : : *
509 : : * This likely is only useful for pgstat_replslot.c during shutdown, in other
510 : : * cases there are obvious TOCTOU issues.
511 : : */
512 : : bool
554 513 : 67 : ReplicationSlotName(int index, Name name)
514 : : {
515 : : ReplicationSlot *slot;
516 : : bool found;
517 : :
518 : 67 : slot = &ReplicationSlotCtl->replication_slots[index];
519 : :
520 : : /*
521 : : * Ensure that the slot cannot be dropped while we copy the name. Don't
522 : : * need the spinlock as the name of an existing slot cannot change.
523 : : */
524 : 67 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
525 : 67 : found = slot->in_use;
526 [ + - ]: 67 : if (slot->in_use)
527 : 67 : namestrcpy(name, NameStr(slot->data.name));
528 : 67 : LWLockRelease(ReplicationSlotControlLock);
529 : :
530 : 67 : return found;
531 : : }
532 : :
533 : : /*
534 : : * Find a previously created slot and mark it as used by this process.
535 : : *
536 : : * An error is raised if nowait is true and the slot is currently in use. If
537 : : * nowait is false, we sleep until the slot is released by the owning process.
538 : : */
539 : : void
1038 alvherre@alvh.no-ip. 540 : 1110 : ReplicationSlotAcquire(const char *name, bool nowait)
541 : : {
542 : : ReplicationSlot *s;
543 : : int active_pid;
544 : :
534 peter@eisentraut.org 545 [ + - ]: 1110 : Assert(name != NULL);
546 : :
1395 fujii@postgresql.org 547 : 1110 : retry:
548 [ - + ]: 1110 : Assert(MyReplicationSlot == NULL);
549 : :
550 : 1110 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
551 : :
552 : : /* Check if the slot exists with the given name. */
1038 alvherre@alvh.no-ip. 553 : 1110 : s = SearchNamedReplicationSlot(name, false);
1395 fujii@postgresql.org 554 [ + + - + ]: 1110 : if (s == NULL || !s->in_use)
555 : : {
556 : 8 : LWLockRelease(ReplicationSlotControlLock);
557 : :
3726 rhaas@postgresql.org 558 [ + - ]: 8 : ereport(ERROR,
559 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
560 : : errmsg("replication slot \"%s\" does not exist",
561 : : name)));
562 : : }
563 : :
564 : : /*
565 : : * This is the slot we want; check if it's active under some other
566 : : * process. In single user mode, we don't need this check.
567 : : */
1395 fujii@postgresql.org 568 [ + - ]: 1102 : if (IsUnderPostmaster)
569 : : {
570 : : /*
571 : : * Get ready to sleep on the slot in case it is active. (We may end
572 : : * up not sleeping, but we don't want to do this while holding the
573 : : * spinlock.)
574 : : */
1038 alvherre@alvh.no-ip. 575 [ + + ]: 1102 : if (!nowait)
1395 fujii@postgresql.org 576 : 230 : ConditionVariablePrepareToSleep(&s->active_cv);
577 : :
578 [ - + ]: 1102 : SpinLockAcquire(&s->mutex);
579 [ + + ]: 1102 : if (s->active_pid == 0)
580 : 979 : s->active_pid = MyProcPid;
581 : 1102 : active_pid = s->active_pid;
582 : 1102 : SpinLockRelease(&s->mutex);
583 : : }
584 : : else
1395 fujii@postgresql.org 585 :UBC 0 : active_pid = MyProcPid;
1395 fujii@postgresql.org 586 :CBC 1102 : LWLockRelease(ReplicationSlotControlLock);
587 : :
588 : : /*
589 : : * If we found the slot but it's already active in another process, we
590 : : * wait until the owning process signals us that it's been released, or
591 : : * error out.
592 : : */
2684 peter_e@gmx.net 593 [ - + ]: 1102 : if (active_pid != MyProcPid)
594 : : {
1038 alvherre@alvh.no-ip. 595 [ # # ]:UBC 0 : if (!nowait)
596 : : {
597 : : /* Wait here until we get signaled, and then restart */
1035 598 : 0 : ConditionVariableSleep(&s->active_cv,
599 : : WAIT_EVENT_REPLICATION_SLOT_DROP);
600 : 0 : ConditionVariableCancelSleep();
601 : 0 : goto retry;
602 : : }
603 : :
604 [ # # ]: 0 : ereport(ERROR,
605 : : (errcode(ERRCODE_OBJECT_IN_USE),
606 : : errmsg("replication slot \"%s\" is active for PID %d",
607 : : NameStr(s->data.name), active_pid)));
608 : : }
1038 alvherre@alvh.no-ip. 609 [ + + ]:CBC 1102 : else if (!nowait)
1068 tgl@sss.pgh.pa.us 610 : 230 : ConditionVariableCancelSleep(); /* no sleep needed after all */
611 : :
612 : : /* Let everybody know we've modified this slot */
1395 fujii@postgresql.org 613 : 1102 : ConditionVariableBroadcast(&s->active_cv);
614 : :
615 : : /* We made this slot active, so it's ours now. */
616 : 1102 : MyReplicationSlot = s;
617 : :
618 : : /*
619 : : * The call to pgstat_acquire_replslot() protects against stats for a
620 : : * different slot, from before a restart or such, being present during
621 : : * pgstat_report_replslot().
622 : : */
739 andres@anarazel.de 623 [ + + ]: 1102 : if (SlotIsLogical(s))
624 : 919 : pgstat_acquire_replslot(s);
625 : :
626 : : /*
627 : : * Reset the time since the slot has become inactive as the slot is active
628 : : * now.
629 : : */
20 akapila@postgresql.o 630 [ - + ]:GNC 1102 : SpinLockAcquire(&s->mutex);
18 631 : 1102 : s->inactive_since = 0;
20 632 : 1102 : SpinLockRelease(&s->mutex);
633 : :
145 634 [ + + ]: 1102 : if (am_walsender)
635 : : {
636 [ + - + - : 750 : ereport(log_replication_commands ? LOG : DEBUG1,
+ + ]
637 : : SlotIsLogical(s)
638 : : ? errmsg("acquired logical replication slot \"%s\"",
639 : : NameStr(s->data.name))
640 : : : errmsg("acquired physical replication slot \"%s\"",
641 : : NameStr(s->data.name)));
642 : : }
3726 rhaas@postgresql.org 643 :CBC 1102 : }
644 : :
645 : : /*
646 : : * Release the replication slot that this backend considers to own.
647 : : *
648 : : * This or another backend can re-acquire the slot later.
649 : : * Resources this slot requires will be preserved.
650 : : */
651 : : void
652 : 1308 : ReplicationSlotRelease(void)
653 : : {
654 : 1308 : ReplicationSlot *slot = MyReplicationSlot;
145 akapila@postgresql.o 655 :GNC 1308 : char *slotname = NULL; /* keep compiler quiet */
656 : 1308 : bool is_logical = false; /* keep compiler quiet */
20 657 : 1308 : TimestampTz now = 0;
658 : :
3281 andres@anarazel.de 659 [ + - - + ]:CBC 1308 : Assert(slot != NULL && slot->active_pid != 0);
660 : :
145 akapila@postgresql.o 661 [ + + ]:GNC 1308 : if (am_walsender)
662 : : {
663 : 906 : slotname = pstrdup(NameStr(slot->data.name));
664 : 906 : is_logical = SlotIsLogical(slot);
665 : : }
666 : :
3695 rhaas@postgresql.org 667 [ + + ]:CBC 1308 : if (slot->data.persistency == RS_EPHEMERAL)
668 : : {
669 : : /*
670 : : * Delete the slot. There is no !PANIC case where this is allowed to
671 : : * fail, all that may happen is an incomplete cleanup of the on-disk
672 : : * data.
673 : : */
674 : 5 : ReplicationSlotDropAcquired();
675 : : }
676 : :
677 : : /*
678 : : * If slot needed to temporarily restrain both data and catalog xmin to
679 : : * create the catalog snapshot, remove that temporary constraint.
680 : : * Snapshots can only be exported while the initial snapshot is still
681 : : * acquired.
682 : : */
2548 andres@anarazel.de 683 [ + + ]: 1308 : if (!TransactionIdIsValid(slot->data.xmin) &&
684 [ + + ]: 1295 : TransactionIdIsValid(slot->effective_xmin))
685 : : {
686 [ - + ]: 169 : SpinLockAcquire(&slot->mutex);
687 : 169 : slot->effective_xmin = InvalidTransactionId;
688 : 169 : SpinLockRelease(&slot->mutex);
689 : 169 : ReplicationSlotsComputeRequiredXmin(false);
690 : : }
691 : :
692 : : /*
693 : : * Set the time since the slot has become inactive. We get the current
694 : : * time beforehand to avoid system call while holding the spinlock.
695 : : */
9 akapila@postgresql.o 696 :GNC 1308 : now = GetCurrentTimestamp();
697 : :
2455 alvherre@alvh.no-ip. 698 [ + + ]:CBC 1308 : if (slot->data.persistency == RS_PERSISTENT)
699 : : {
700 : : /*
701 : : * Mark persistent slot inactive. We're not freeing it, just
702 : : * disconnecting, but wake up others that may be waiting for it.
703 : : */
704 [ - + ]: 1056 : SpinLockAcquire(&slot->mutex);
705 : 1056 : slot->active_pid = 0;
18 akapila@postgresql.o 706 :GNC 1056 : slot->inactive_since = now;
2455 alvherre@alvh.no-ip. 707 :CBC 1056 : SpinLockRelease(&slot->mutex);
708 : 1056 : ConditionVariableBroadcast(&slot->active_cv);
709 : : }
710 : : else
711 : : {
20 akapila@postgresql.o 712 [ - + ]:GNC 252 : SpinLockAcquire(&slot->mutex);
18 713 : 252 : slot->inactive_since = now;
20 714 : 252 : SpinLockRelease(&slot->mutex);
715 : : }
716 : :
3695 rhaas@postgresql.org 717 :CBC 1308 : MyReplicationSlot = NULL;
718 : :
719 : : /* might not have been set when we've been a plain slot */
885 alvherre@alvh.no-ip. 720 : 1308 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1245 721 : 1308 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
722 : 1308 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
3695 rhaas@postgresql.org 723 : 1308 : LWLockRelease(ProcArrayLock);
724 : :
145 akapila@postgresql.o 725 [ + + ]:GNC 1308 : if (am_walsender)
726 : : {
727 [ + - + - : 906 : ereport(log_replication_commands ? LOG : DEBUG1,
+ + ]
728 : : is_logical
729 : : ? errmsg("released logical replication slot \"%s\"",
730 : : slotname)
731 : : : errmsg("released physical replication slot \"%s\"",
732 : : slotname));
733 : :
734 : 906 : pfree(slotname);
735 : : }
3726 rhaas@postgresql.org 736 :CBC 1308 : }
737 : :
738 : : /*
739 : : * Cleanup all temporary slots created in current session.
740 : : */
741 : : void
2566 andres@anarazel.de 742 : 38061 : ReplicationSlotCleanup(void)
743 : : {
744 : : int i;
745 : :
2684 peter_e@gmx.net 746 [ + - ]: 38061 : Assert(MyReplicationSlot == NULL);
747 : :
2455 alvherre@alvh.no-ip. 748 : 38061 : restart:
749 : 38185 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2684 peter_e@gmx.net 750 [ + + ]: 413704 : for (i = 0; i < max_replication_slots; i++)
751 : : {
752 : 375643 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
753 : :
2455 alvherre@alvh.no-ip. 754 [ + + ]: 375643 : if (!s->in_use)
755 : 363882 : continue;
756 : :
757 [ + + ]: 11761 : SpinLockAcquire(&s->mutex);
2684 peter_e@gmx.net 758 [ + + ]: 11761 : if (s->active_pid == MyProcPid)
759 : : {
2455 alvherre@alvh.no-ip. 760 [ - + ]: 124 : Assert(s->data.persistency == RS_TEMPORARY);
761 : 124 : SpinLockRelease(&s->mutex);
762 : 124 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
763 : :
2684 peter_e@gmx.net 764 : 124 : ReplicationSlotDropPtr(s);
765 : :
2455 alvherre@alvh.no-ip. 766 : 124 : ConditionVariableBroadcast(&s->active_cv);
767 : 124 : goto restart;
768 : : }
769 : : else
770 : 11637 : SpinLockRelease(&s->mutex);
771 : : }
772 : :
773 : 38061 : LWLockRelease(ReplicationSlotControlLock);
2684 peter_e@gmx.net 774 : 38061 : }
775 : :
776 : : /*
777 : : * Permanently drop replication slot identified by the passed in name.
778 : : */
779 : : void
2455 alvherre@alvh.no-ip. 780 : 343 : ReplicationSlotDrop(const char *name, bool nowait)
781 : : {
3695 rhaas@postgresql.org 782 [ - + ]: 343 : Assert(MyReplicationSlot == NULL);
783 : :
1038 alvherre@alvh.no-ip. 784 : 343 : ReplicationSlotAcquire(name, nowait);
785 : :
786 : : /*
787 : : * Do not allow users to drop the slots which are currently being synced
788 : : * from the primary to the standby.
789 : : */
60 akapila@postgresql.o 790 [ + + + + ]:GNC 338 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
791 [ + - ]: 1 : ereport(ERROR,
792 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
793 : : errmsg("cannot drop replication slot \"%s\"", name),
794 : : errdetail("This slot is being synced from the primary server."));
795 : :
3695 rhaas@postgresql.org 796 :CBC 337 : ReplicationSlotDropAcquired();
797 : 337 : }
798 : :
799 : : /*
800 : : * Change the definition of the slot identified by the specified name.
801 : : */
802 : : void
76 akapila@postgresql.o 803 :GNC 10 : ReplicationSlotAlter(const char *name, bool failover)
804 : : {
805 [ - + ]: 10 : Assert(MyReplicationSlot == NULL);
806 : :
807 : 10 : ReplicationSlotAcquire(name, false);
808 : :
809 [ - + ]: 10 : if (SlotIsPhysical(MyReplicationSlot))
76 akapila@postgresql.o 810 [ # # ]:UNC 0 : ereport(ERROR,
811 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
812 : : errmsg("cannot use %s with a physical replication slot",
813 : : "ALTER_REPLICATION_SLOT"));
814 : :
60 akapila@postgresql.o 815 [ + + ]:GNC 10 : if (RecoveryInProgress())
816 : : {
817 : : /*
818 : : * Do not allow users to alter the slots which are currently being
819 : : * synced from the primary to the standby.
820 : : */
821 [ + - ]: 1 : if (MyReplicationSlot->data.synced)
822 [ + - ]: 1 : ereport(ERROR,
823 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
824 : : errmsg("cannot alter replication slot \"%s\"", name),
825 : : errdetail("This slot is being synced from the primary server."));
826 : :
827 : : /*
828 : : * Do not allow users to enable failover on the standby as we do not
829 : : * support sync to the cascading standby.
830 : : */
60 akapila@postgresql.o 831 [ # # ]:UNC 0 : if (failover)
832 [ # # ]: 0 : ereport(ERROR,
833 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
834 : : errmsg("cannot enable failover for a replication slot"
835 : : " on the standby"));
836 : : }
837 : :
838 : : /*
839 : : * Do not allow users to enable failover for temporary slots as we do not
840 : : * support syncing temporary slots to the standby.
841 : : */
60 akapila@postgresql.o 842 [ + + - + ]:GNC 9 : if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
60 akapila@postgresql.o 843 [ # # ]:UNC 0 : ereport(ERROR,
844 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
845 : : errmsg("cannot enable failover for a temporary replication slot"));
846 : :
67 akapila@postgresql.o 847 [ + + ]:GNC 9 : if (MyReplicationSlot->data.failover != failover)
848 : : {
849 [ - + ]: 3 : SpinLockAcquire(&MyReplicationSlot->mutex);
850 : 3 : MyReplicationSlot->data.failover = failover;
851 : 3 : SpinLockRelease(&MyReplicationSlot->mutex);
852 : :
853 : 3 : ReplicationSlotMarkDirty();
854 : 3 : ReplicationSlotSave();
855 : : }
856 : :
76 857 : 9 : ReplicationSlotRelease();
858 : 9 : }
859 : :
860 : : /*
861 : : * Permanently drop the currently acquired replication slot.
862 : : */
863 : : void
3695 rhaas@postgresql.org 864 :CBC 349 : ReplicationSlotDropAcquired(void)
865 : : {
866 : 349 : ReplicationSlot *slot = MyReplicationSlot;
867 : :
868 [ - + ]: 349 : Assert(MyReplicationSlot != NULL);
869 : :
870 : : /* slot isn't acquired anymore */
871 : 349 : MyReplicationSlot = NULL;
872 : :
2684 peter_e@gmx.net 873 : 349 : ReplicationSlotDropPtr(slot);
874 : 349 : }
875 : :
876 : : /*
877 : : * Permanently drop the replication slot which will be released by the point
878 : : * this function returns.
879 : : */
880 : : static void
881 : 473 : ReplicationSlotDropPtr(ReplicationSlot *slot)
882 : : {
883 : : char path[MAXPGPATH];
884 : : char tmppath[MAXPGPATH];
885 : :
886 : : /*
887 : : * If some other backend ran this code concurrently with us, we might try
888 : : * to delete a slot with a certain name while someone else was trying to
889 : : * create a slot with the same name.
890 : : */
3726 rhaas@postgresql.org 891 : 473 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
892 : :
893 : : /* Generate pathnames. */
894 : 473 : sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
895 : 473 : sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
896 : :
897 : : /*
898 : : * Rename the slot directory on disk, so that we'll no longer recognize
899 : : * this as a valid slot. Note that if this fails, we've got to mark the
900 : : * slot inactive before bailing out. If we're dropping an ephemeral or a
901 : : * temporary slot, we better never fail hard as the caller won't expect
902 : : * the slot to survive and this might get called during error handling.
903 : : */
3695 904 [ + - ]: 473 : if (rename(path, tmppath) == 0)
905 : : {
906 : : /*
907 : : * We need to fsync() the directory we just renamed and its parent to
908 : : * make sure that our changes are on disk in a crash-safe fashion. If
909 : : * fsync() fails, we can't be sure whether the changes are on disk or
910 : : * not. For now, we handle that by panicking;
911 : : * StartupReplicationSlots() will try to straighten it out after
912 : : * restart.
913 : : */
914 : 473 : START_CRIT_SECTION();
915 : 473 : fsync_fname(tmppath, true);
916 : 473 : fsync_fname("pg_replslot", true);
917 [ - + ]: 473 : END_CRIT_SECTION();
918 : : }
919 : : else
920 : : {
2684 peter_e@gmx.net 921 :UBC 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
922 : :
3726 rhaas@postgresql.org 923 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
3113 924 : 0 : slot->active_pid = 0;
3726 925 : 0 : SpinLockRelease(&slot->mutex);
926 : :
927 : : /* wake up anyone waiting on this slot */
2455 alvherre@alvh.no-ip. 928 : 0 : ConditionVariableBroadcast(&slot->active_cv);
929 : :
3695 rhaas@postgresql.org 930 [ # # # # ]: 0 : ereport(fail_softly ? WARNING : ERROR,
931 : : (errcode_for_file_access(),
932 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
933 : : path, tmppath)));
934 : : }
935 : :
936 : : /*
937 : : * The slot is definitely gone. Lock out concurrent scans of the array
938 : : * long enough to kill it. It's OK to clear the active PID here without
939 : : * grabbing the mutex because nobody else can be scanning the array here,
940 : : * and nobody can be attached to this slot and thus access it without
941 : : * scanning the array.
942 : : *
943 : : * Also wake up processes waiting for it.
944 : : */
3726 rhaas@postgresql.org 945 :CBC 473 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
3281 andres@anarazel.de 946 : 473 : slot->active_pid = 0;
3726 rhaas@postgresql.org 947 : 473 : slot->in_use = false;
948 : 473 : LWLockRelease(ReplicationSlotControlLock);
2455 alvherre@alvh.no-ip. 949 : 473 : ConditionVariableBroadcast(&slot->active_cv);
950 : :
951 : : /*
952 : : * Slot is dead and doesn't prevent resource removal anymore, recompute
953 : : * limits.
954 : : */
3695 rhaas@postgresql.org 955 : 473 : ReplicationSlotsComputeRequiredXmin(false);
3726 956 : 473 : ReplicationSlotsComputeRequiredLSN();
957 : :
958 : : /*
959 : : * If removing the directory fails, the worst thing that will happen is
960 : : * that the user won't be able to create a new slot with the same name
961 : : * until the next server restart. We warn about it, but that's all.
962 : : */
963 [ - + ]: 473 : if (!rmtree(tmppath, true))
3726 rhaas@postgresql.org 964 [ # # ]:UBC 0 : ereport(WARNING,
965 : : (errmsg("could not remove directory \"%s\"", tmppath)));
966 : :
967 : : /*
968 : : * Drop the statistics entry for the replication slot. Do this while
969 : : * holding ReplicationSlotAllocationLock so that we don't drop a
970 : : * statistics entry for another slot with the same name just created in
971 : : * another session.
972 : : */
1284 akapila@postgresql.o 973 [ + + ]:CBC 473 : if (SlotIsLogical(slot))
739 andres@anarazel.de 974 : 341 : pgstat_drop_replslot(slot);
975 : :
976 : : /*
977 : : * We release this at the very end, so that nobody starts trying to create
978 : : * a slot while we're still cleaning up the detritus of the old one.
979 : : */
3726 rhaas@postgresql.org 980 : 473 : LWLockRelease(ReplicationSlotAllocationLock);
981 : 473 : }
982 : :
983 : : /*
984 : : * Serialize the currently acquired slot's state from memory to disk, thereby
985 : : * guaranteeing the current state will survive a crash.
986 : : */
987 : : void
988 : 1219 : ReplicationSlotSave(void)
989 : : {
990 : : char path[MAXPGPATH];
991 : :
992 [ - + ]: 1219 : Assert(MyReplicationSlot != NULL);
993 : :
994 : 1219 : sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
995 : 1219 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
996 : 1219 : }
997 : :
998 : : /*
999 : : * Signal that it would be useful if the currently acquired slot would be
1000 : : * flushed out to disk.
1001 : : *
1002 : : * Note that the actual flush to disk can be delayed for a long time, if
1003 : : * required for correctness explicitly do a ReplicationSlotSave().
1004 : : */
1005 : : void
1006 : 22558 : ReplicationSlotMarkDirty(void)
1007 : : {
3113 1008 : 22558 : ReplicationSlot *slot = MyReplicationSlot;
1009 : :
3726 1010 [ - + ]: 22558 : Assert(MyReplicationSlot != NULL);
1011 : :
3113 1012 [ - + ]: 22558 : SpinLockAcquire(&slot->mutex);
1013 : 22558 : MyReplicationSlot->just_dirtied = true;
1014 : 22558 : MyReplicationSlot->dirty = true;
1015 : 22558 : SpinLockRelease(&slot->mutex);
3726 1016 : 22558 : }
1017 : :
1018 : : /*
1019 : : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1020 : : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1021 : : */
1022 : : void
3695 1023 : 389 : ReplicationSlotPersist(void)
1024 : : {
1025 : 389 : ReplicationSlot *slot = MyReplicationSlot;
1026 : :
1027 [ - + ]: 389 : Assert(slot != NULL);
1028 [ - + ]: 389 : Assert(slot->data.persistency != RS_PERSISTENT);
1029 : :
3113 1030 [ - + ]: 389 : SpinLockAcquire(&slot->mutex);
1031 : 389 : slot->data.persistency = RS_PERSISTENT;
1032 : 389 : SpinLockRelease(&slot->mutex);
1033 : :
3695 1034 : 389 : ReplicationSlotMarkDirty();
1035 : 389 : ReplicationSlotSave();
1036 : 389 : }
1037 : :
1038 : : /*
1039 : : * Compute the oldest xmin across all slots and store it in the ProcArray.
1040 : : *
1041 : : * If already_locked is true, ProcArrayLock has already been acquired
1042 : : * exclusively.
1043 : : */
1044 : : void
1045 : 2028 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1046 : : {
1047 : : int i;
3726 1048 : 2028 : TransactionId agg_xmin = InvalidTransactionId;
3695 1049 : 2028 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1050 : :
3726 1051 [ - + ]: 2028 : Assert(ReplicationSlotCtl != NULL);
1052 : :
2548 andres@anarazel.de 1053 : 2028 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1054 : :
3726 rhaas@postgresql.org 1055 [ + + ]: 20379 : for (i = 0; i < max_replication_slots; i++)
1056 : : {
1057 : 18351 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1058 : : TransactionId effective_xmin;
1059 : : TransactionId effective_catalog_xmin;
1060 : : bool invalidated;
1061 : :
1062 [ + + ]: 18351 : if (!s->in_use)
1063 : 16393 : continue;
1064 : :
3113 1065 [ - + ]: 1958 : SpinLockAcquire(&s->mutex);
1066 : 1958 : effective_xmin = s->effective_xmin;
1067 : 1958 : effective_catalog_xmin = s->effective_catalog_xmin;
373 andres@anarazel.de 1068 : 1958 : invalidated = s->data.invalidated != RS_INVAL_NONE;
3113 rhaas@postgresql.org 1069 : 1958 : SpinLockRelease(&s->mutex);
1070 : :
1071 : : /* invalidated slots need not apply */
509 alvherre@alvh.no-ip. 1072 [ + + ]: 1958 : if (invalidated)
1073 : 21 : continue;
1074 : :
1075 : : /* check the data xmin */
3726 rhaas@postgresql.org 1076 [ + + + + ]: 1937 : if (TransactionIdIsValid(effective_xmin) &&
1077 [ - + ]: 11 : (!TransactionIdIsValid(agg_xmin) ||
1078 : 11 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1079 : 251 : agg_xmin = effective_xmin;
1080 : :
1081 : : /* check the catalog xmin */
3695 1082 [ + + + + ]: 1937 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1083 [ + + ]: 864 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1084 : 864 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1085 : 997 : agg_catalog_xmin = effective_catalog_xmin;
1086 : : }
1087 : :
2548 andres@anarazel.de 1088 : 2028 : LWLockRelease(ReplicationSlotControlLock);
1089 : :
3695 rhaas@postgresql.org 1090 : 2028 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
3726 1091 : 2028 : }
1092 : :
1093 : : /*
1094 : : * Compute the oldest restart LSN across all slots and inform xlog module.
1095 : : *
1096 : : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1097 : : * purpose, we don't try to account for that, because this module doesn't
1098 : : * know what to compare against.
1099 : : */
1100 : : void
1101 : 22907 : ReplicationSlotsComputeRequiredLSN(void)
1102 : : {
1103 : : int i;
3631 bruce@momjian.us 1104 : 22907 : XLogRecPtr min_required = InvalidXLogRecPtr;
1105 : :
3726 rhaas@postgresql.org 1106 [ - + ]: 22907 : Assert(ReplicationSlotCtl != NULL);
1107 : :
1108 : 22907 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1109 [ + + ]: 248264 : for (i = 0; i < max_replication_slots; i++)
1110 : : {
1111 : 225357 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1112 : : XLogRecPtr restart_lsn;
1113 : : bool invalidated;
1114 : :
1115 [ + + ]: 225357 : if (!s->in_use)
1116 : 202537 : continue;
1117 : :
3113 1118 [ + + ]: 22820 : SpinLockAcquire(&s->mutex);
1119 : 22820 : restart_lsn = s->data.restart_lsn;
373 andres@anarazel.de 1120 : 22820 : invalidated = s->data.invalidated != RS_INVAL_NONE;
3113 rhaas@postgresql.org 1121 : 22820 : SpinLockRelease(&s->mutex);
1122 : :
1123 : : /* invalidated slots need not apply */
373 andres@anarazel.de 1124 [ + + ]: 22820 : if (invalidated)
1125 : 22 : continue;
1126 : :
3726 rhaas@postgresql.org 1127 [ + + + + ]: 22798 : if (restart_lsn != InvalidXLogRecPtr &&
1128 [ + + ]: 873 : (min_required == InvalidXLogRecPtr ||
1129 : : restart_lsn < min_required))
1130 : 21969 : min_required = restart_lsn;
1131 : : }
1132 : 22907 : LWLockRelease(ReplicationSlotControlLock);
1133 : :
1134 : 22907 : XLogSetReplicationSlotMinimumLSN(min_required);
1135 : 22907 : }
1136 : :
1137 : : /*
1138 : : * Compute the oldest WAL LSN required by *logical* decoding slots..
1139 : : *
1140 : : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1141 : : * slots exist.
1142 : : *
1143 : : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1144 : : * ignores physical replication slots.
1145 : : *
1146 : : * The results aren't required frequently, so we don't maintain a precomputed
1147 : : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1148 : : */
1149 : : XLogRecPtr
3695 1150 : 2306 : ReplicationSlotsComputeLogicalRestartLSN(void)
1151 : : {
1152 : 2306 : XLogRecPtr result = InvalidXLogRecPtr;
1153 : : int i;
1154 : :
1155 [ + + ]: 2306 : if (max_replication_slots <= 0)
3695 rhaas@postgresql.org 1156 :GBC 2 : return InvalidXLogRecPtr;
1157 : :
3695 rhaas@postgresql.org 1158 :CBC 2304 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1159 : :
1160 [ + + ]: 24754 : for (i = 0; i < max_replication_slots; i++)
1161 : : {
1162 : : ReplicationSlot *s;
1163 : : XLogRecPtr restart_lsn;
1164 : : bool invalidated;
1165 : :
1166 : 22450 : s = &ReplicationSlotCtl->replication_slots[i];
1167 : :
1168 : : /* cannot change while ReplicationSlotCtlLock is held */
1169 [ + + ]: 22450 : if (!s->in_use)
1170 : 21990 : continue;
1171 : :
1172 : : /* we're only interested in logical slots */
3169 andres@anarazel.de 1173 [ + + ]: 460 : if (!SlotIsLogical(s))
3695 rhaas@postgresql.org 1174 : 264 : continue;
1175 : :
1176 : : /* read once, it's ok if it increases while we're checking */
1177 [ - + ]: 196 : SpinLockAcquire(&s->mutex);
1178 : 196 : restart_lsn = s->data.restart_lsn;
373 andres@anarazel.de 1179 : 196 : invalidated = s->data.invalidated != RS_INVAL_NONE;
3695 rhaas@postgresql.org 1180 : 196 : SpinLockRelease(&s->mutex);
1181 : :
1182 : : /* invalidated slots need not apply */
373 andres@anarazel.de 1183 [ + + ]: 196 : if (invalidated)
1184 : 4 : continue;
1185 : :
1468 alvherre@alvh.no-ip. 1186 [ - + ]: 192 : if (restart_lsn == InvalidXLogRecPtr)
1468 alvherre@alvh.no-ip. 1187 :UBC 0 : continue;
1188 : :
3695 rhaas@postgresql.org 1189 [ + + + + ]:CBC 192 : if (result == InvalidXLogRecPtr ||
1190 : : restart_lsn < result)
1191 : 156 : result = restart_lsn;
1192 : : }
1193 : :
1194 : 2304 : LWLockRelease(ReplicationSlotControlLock);
1195 : :
1196 : 2304 : return result;
1197 : : }
1198 : :
1199 : : /*
1200 : : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1201 : : * passed database oid.
1202 : : *
1203 : : * Returns true if there are any slots referencing the database. *nslots will
1204 : : * be set to the absolute number of slots in the database, *nactive to ones
1205 : : * currently active.
1206 : : */
1207 : : bool
1208 : 31 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1209 : : {
1210 : : int i;
1211 : :
1212 : 31 : *nslots = *nactive = 0;
1213 : :
1214 [ - + ]: 31 : if (max_replication_slots <= 0)
3695 rhaas@postgresql.org 1215 :UBC 0 : return false;
1216 : :
3695 rhaas@postgresql.org 1217 :CBC 31 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1218 [ + + ]: 312 : for (i = 0; i < max_replication_slots; i++)
1219 : : {
1220 : : ReplicationSlot *s;
1221 : :
1222 : 281 : s = &ReplicationSlotCtl->replication_slots[i];
1223 : :
1224 : : /* cannot change while ReplicationSlotCtlLock is held */
1225 [ + + ]: 281 : if (!s->in_use)
1226 : 264 : continue;
1227 : :
1228 : : /* only logical slots are database specific, skip */
3169 andres@anarazel.de 1229 [ + + ]: 17 : if (!SlotIsLogical(s))
3691 bruce@momjian.us 1230 : 9 : continue;
1231 : :
1232 : : /* not our database, skip */
3695 rhaas@postgresql.org 1233 [ + + ]: 8 : if (s->data.database != dboid)
1234 : 5 : continue;
1235 : :
1236 : : /* NB: intentionally counting invalidated slots */
1237 : :
1238 : : /* count slots with spinlock held */
1239 [ - + ]: 3 : SpinLockAcquire(&s->mutex);
1240 : 3 : (*nslots)++;
3281 andres@anarazel.de 1241 [ + + ]: 3 : if (s->active_pid != 0)
3695 rhaas@postgresql.org 1242 : 1 : (*nactive)++;
1243 : 3 : SpinLockRelease(&s->mutex);
1244 : : }
1245 : 31 : LWLockRelease(ReplicationSlotControlLock);
1246 : :
1247 [ + + ]: 31 : if (*nslots > 0)
1248 : 3 : return true;
1249 : 28 : return false;
1250 : : }
1251 : :
1252 : : /*
1253 : : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1254 : : * passed database oid. The caller should hold an exclusive lock on the
1255 : : * pg_database oid for the database to prevent creation of new slots on the db
1256 : : * or replay from existing slots.
1257 : : *
1258 : : * Another session that concurrently acquires an existing slot on the target DB
1259 : : * (most likely to drop it) may cause this function to ERROR. If that happens
1260 : : * it may have dropped some but not all slots.
1261 : : *
1262 : : * This routine isn't as efficient as it could be - but we don't drop
1263 : : * databases often, especially databases with lots of slots.
1264 : : */
1265 : : void
2574 simon@2ndQuadrant.co 1266 : 59 : ReplicationSlotsDropDBSlots(Oid dboid)
1267 : : {
1268 : : int i;
1269 : :
1270 [ + - ]: 59 : if (max_replication_slots <= 0)
2574 simon@2ndQuadrant.co 1271 :UBC 0 : return;
1272 : :
2574 simon@2ndQuadrant.co 1273 :CBC 59 : restart:
1274 : 64 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1275 [ + + ]: 612 : for (i = 0; i < max_replication_slots; i++)
1276 : : {
1277 : : ReplicationSlot *s;
1278 : : char *slotname;
1279 : : int active_pid;
1280 : :
1281 : 553 : s = &ReplicationSlotCtl->replication_slots[i];
1282 : :
1283 : : /* cannot change while ReplicationSlotCtlLock is held */
1284 [ + + ]: 553 : if (!s->in_use)
1285 : 527 : continue;
1286 : :
1287 : : /* only logical slots are database specific, skip */
1288 [ + + ]: 26 : if (!SlotIsLogical(s))
1289 : 10 : continue;
1290 : :
1291 : : /* not our database, skip */
1292 [ + + ]: 16 : if (s->data.database != dboid)
1293 : 11 : continue;
1294 : :
1295 : : /* NB: intentionally including invalidated slots */
1296 : :
1297 : : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1298 [ - + ]: 5 : SpinLockAcquire(&s->mutex);
1299 : : /* can't change while ReplicationSlotControlLock is held */
2566 andres@anarazel.de 1300 : 5 : slotname = NameStr(s->data.name);
2574 simon@2ndQuadrant.co 1301 : 5 : active_pid = s->active_pid;
1302 [ + - ]: 5 : if (active_pid == 0)
1303 : : {
1304 : 5 : MyReplicationSlot = s;
1305 : 5 : s->active_pid = MyProcPid;
1306 : : }
1307 : 5 : SpinLockRelease(&s->mutex);
1308 : :
1309 : : /*
1310 : : * Even though we hold an exclusive lock on the database object a
1311 : : * logical slot for that DB can still be active, e.g. if it's
1312 : : * concurrently being dropped by a backend connected to another DB.
1313 : : *
1314 : : * That's fairly unlikely in practice, so we'll just bail out.
1315 : : *
1316 : : * The slot sync worker holds a shared lock on the database before
1317 : : * operating on synced logical slots to avoid conflict with the drop
1318 : : * happening here. The persistent synced slots are thus safe but there
1319 : : * is a possibility that the slot sync worker has created a temporary
1320 : : * slot (which stays active even on release) and we are trying to drop
1321 : : * that here. In practice, the chances of hitting this scenario are
1322 : : * less as during slot synchronization, the temporary slot is
1323 : : * immediately converted to persistent and thus is safe due to the
1324 : : * shared lock taken on the database. So, we'll just bail out in such
1325 : : * a case.
1326 : : *
1327 : : * XXX: We can consider shutting down the slot sync worker before
1328 : : * trying to drop synced temporary slots here.
1329 : : */
1330 [ - + ]: 5 : if (active_pid)
2566 andres@anarazel.de 1331 [ # # ]:UBC 0 : ereport(ERROR,
1332 : : (errcode(ERRCODE_OBJECT_IN_USE),
1333 : : errmsg("replication slot \"%s\" is active for PID %d",
1334 : : slotname, active_pid)));
1335 : :
1336 : : /*
1337 : : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1338 : : * holding ReplicationSlotControlLock over filesystem operations,
1339 : : * release ReplicationSlotControlLock and use
1340 : : * ReplicationSlotDropAcquired.
1341 : : *
1342 : : * As that means the set of slots could change, restart scan from the
1343 : : * beginning each time we release the lock.
1344 : : */
2574 simon@2ndQuadrant.co 1345 :CBC 5 : LWLockRelease(ReplicationSlotControlLock);
1346 : 5 : ReplicationSlotDropAcquired();
1347 : 5 : goto restart;
1348 : : }
1349 : 59 : LWLockRelease(ReplicationSlotControlLock);
1350 : : }
1351 : :
1352 : :
1353 : : /*
1354 : : * Check whether the server's configuration supports using replication
1355 : : * slots.
1356 : : */
1357 : : void
3726 rhaas@postgresql.org 1358 : 1511 : CheckSlotRequirements(void)
1359 : : {
1360 : : /*
1361 : : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1362 : : * needs the same check.
1363 : : */
1364 : :
1365 [ - + ]: 1511 : if (max_replication_slots == 0)
3726 rhaas@postgresql.org 1366 [ # # ]:UBC 0 : ereport(ERROR,
1367 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1368 : : errmsg("replication slots can only be used if max_replication_slots > 0")));
1369 : :
2967 peter_e@gmx.net 1370 [ - + ]:CBC 1511 : if (wal_level < WAL_LEVEL_REPLICA)
3726 rhaas@postgresql.org 1371 [ # # ]:UBC 0 : ereport(ERROR,
1372 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1373 : : errmsg("replication slots can only be used if wal_level >= replica")));
3726 rhaas@postgresql.org 1374 :CBC 1511 : }
1375 : :
1376 : : /*
1377 : : * Check whether the user has privilege to use replication slots.
1378 : : */
1379 : : void
943 michael@paquier.xyz 1380 : 495 : CheckSlotPermissions(void)
1381 : : {
395 peter@eisentraut.org 1382 [ + + ]: 495 : if (!has_rolreplication(GetUserId()))
943 michael@paquier.xyz 1383 [ + - ]: 5 : ereport(ERROR,
1384 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1385 : : errmsg("permission denied to use replication slots"),
1386 : : errdetail("Only roles with the %s attribute may use replication slots.",
1387 : : "REPLICATION")));
1388 : 490 : }
1389 : :
1390 : : /*
1391 : : * Reserve WAL for the currently active slot.
1392 : : *
1393 : : * Compute and set restart_lsn in a manner that's appropriate for the type of
1394 : : * the slot and concurrency safe.
1395 : : */
1396 : : void
3169 andres@anarazel.de 1397 : 520 : ReplicationSlotReserveWal(void)
1398 : : {
1399 : 520 : ReplicationSlot *slot = MyReplicationSlot;
1400 : :
1401 [ - + ]: 520 : Assert(slot != NULL);
1402 [ + - ]: 520 : Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
1403 : :
1404 : : /*
1405 : : * The replication slot mechanism is used to prevent removal of required
1406 : : * WAL. As there is no interlock between this routine and checkpoints, WAL
1407 : : * segments could concurrently be removed when a now stale return value of
1408 : : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1409 : : * this happens we'll just retry.
1410 : : */
1411 : : while (true)
3169 andres@anarazel.de 1412 :UBC 0 : {
1413 : : XLogSegNo segno;
1414 : : XLogRecPtr restart_lsn;
1415 : :
1416 : : /*
1417 : : * For logical slots log a standby snapshot and start logical decoding
1418 : : * at exactly that position. That allows the slot to start up more
1419 : : * quickly. But on a standby we cannot do WAL writes, so just use the
1420 : : * replay pointer; effectively, an attempt to create a logical slot on
1421 : : * standby will cause it to wait for an xl_running_xact record to be
1422 : : * logged independently on the primary, so that a snapshot can be
1423 : : * built using the record.
1424 : : *
1425 : : * None of this is needed (or indeed helpful) for physical slots as
1426 : : * they'll start replay at the last logged checkpoint anyway. Instead
1427 : : * return the location of the last redo LSN. While that slightly
1428 : : * increases the chance that we have to retry, it's where a base
1429 : : * backup has to start replay at.
1430 : : */
372 andres@anarazel.de 1431 [ + + ]:CBC 520 : if (SlotIsPhysical(slot))
1432 : 130 : restart_lsn = GetRedoRecPtr();
1433 [ + + ]: 390 : else if (RecoveryInProgress())
1434 : 22 : restart_lsn = GetXLogReplayRecPtr(NULL);
1435 : : else
2133 michael@paquier.xyz 1436 : 368 : restart_lsn = GetXLogInsertRecPtr();
1437 : :
372 andres@anarazel.de 1438 [ - + ]: 520 : SpinLockAcquire(&slot->mutex);
1439 : 520 : slot->data.restart_lsn = restart_lsn;
1440 : 520 : SpinLockRelease(&slot->mutex);
1441 : :
1442 : : /* prevent WAL removal as fast as possible */
3169 1443 : 520 : ReplicationSlotsComputeRequiredLSN();
1444 : :
1445 : : /*
1446 : : * If all required WAL is still there, great, otherwise retry. The
1447 : : * slot should prevent further removal of WAL, unless there's a
1448 : : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1449 : : * the new restart_lsn above, so normally we should never need to loop
1450 : : * more than twice.
1451 : : */
2399 1452 : 520 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
3169 1453 [ + - ]: 520 : if (XLogGetLastRemovedSegno() < segno)
1454 : 520 : break;
1455 : : }
1456 : :
372 1457 [ + + + + ]: 520 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1458 : : {
1459 : : XLogRecPtr flushptr;
1460 : :
1461 : : /* make sure we have enough information to start */
1462 : 368 : flushptr = LogStandbySnapshot();
1463 : :
1464 : : /* and make sure it's fsynced to disk */
1465 : 368 : XLogFlush(flushptr);
1466 : : }
3169 1467 : 520 : }
1468 : :
1469 : : /*
1470 : : * Report that replication slot needs to be invalidated
1471 : : */
1472 : : static void
373 1473 : 19 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1474 : : bool terminating,
1475 : : int pid,
1476 : : NameData slotname,
1477 : : XLogRecPtr restart_lsn,
1478 : : XLogRecPtr oldestLSN,
1479 : : TransactionId snapshotConflictHorizon)
1480 : : {
1481 : : StringInfoData err_detail;
1482 : 19 : bool hint = false;
1483 : :
1484 : 19 : initStringInfo(&err_detail);
1485 : :
1486 [ + + + - : 19 : switch (cause)
- ]
1487 : : {
1488 : 4 : case RS_INVAL_WAL_REMOVED:
1489 : : {
234 peter@eisentraut.org 1490 : 4 : unsigned long long ex = oldestLSN - restart_lsn;
1491 : :
1492 : 4 : hint = true;
1493 : 4 : appendStringInfo(&err_detail,
1494 : 4 : ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
1495 : : "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1496 : : ex),
1497 : 4 : LSN_FORMAT_ARGS(restart_lsn),
1498 : : ex);
1499 : 4 : break;
1500 : : }
373 andres@anarazel.de 1501 : 12 : case RS_INVAL_HORIZON:
1502 : 12 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1503 : : snapshotConflictHorizon);
1504 : 12 : break;
1505 : :
1506 : 3 : case RS_INVAL_WAL_LEVEL:
194 drowley@postgresql.o 1507 :GNC 3 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
373 andres@anarazel.de 1508 :CBC 3 : break;
373 andres@anarazel.de 1509 :UBC 0 : case RS_INVAL_NONE:
1510 : 0 : pg_unreachable();
1511 : : }
1512 : :
373 andres@anarazel.de 1513 [ + - + + :CBC 19 : ereport(LOG,
+ + ]
1514 : : terminating ?
1515 : : errmsg("terminating process %d to release replication slot \"%s\"",
1516 : : pid, NameStr(slotname)) :
1517 : : errmsg("invalidating obsolete replication slot \"%s\"",
1518 : : NameStr(slotname)),
1519 : : errdetail_internal("%s", err_detail.data),
1520 : : hint ? errhint("You might need to increase %s.", "max_slot_wal_keep_size") : 0);
1521 : :
1522 : 19 : pfree(err_detail.data);
1523 : 19 : }
1524 : :
1525 : : /*
1526 : : * Helper for InvalidateObsoleteReplicationSlots
1527 : : *
1528 : : * Acquires the given slot and mark it invalid, if necessary and possible.
1529 : : *
1530 : : * Returns whether ReplicationSlotControlLock was released in the interim (and
1531 : : * in that case we're not holding the lock at return, otherwise we are).
1532 : : *
1533 : : * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1534 : : *
1535 : : * This is inherently racy, because we release the LWLock
1536 : : * for syscalls, so caller must restart if we return true.
1537 : : */
static bool
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
							   ReplicationSlot *s,
							   XLogRecPtr oldestLSN,
							   Oid dboid, TransactionId snapshotConflictHorizon,
							   bool *invalidated)
{
	int			last_signaled_pid = 0;
	bool		released_lock = false;
	/* true once we have signalled (terminated) the slot's owning process */
	bool		terminated = false;
	/*
	 * Values captured from the slot the first time we examine it; used so
	 * that the invalidation cause we report is based on the slot's state
	 * before any owning process was terminated.
	 */
	TransactionId initial_effective_xmin = InvalidTransactionId;
	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;

	/*
	 * Loop until we either determine the slot needs no invalidation, or we
	 * manage to acquire and invalidate it ourselves.  Each iteration may
	 * signal the current owner and wait for it to release the slot.
	 */
	for (;;)
	{
		XLogRecPtr	restart_lsn;
		NameData	slotname;
		int			active_pid = 0;
		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;

		/* caller (or the bottom of this loop) must hold the control lock */
		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

		if (!s->in_use)
		{
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		/*
		 * Check if the slot needs to be invalidated. If it needs to be
		 * invalidated, and is not currently acquired, acquire it and mark it
		 * as having been invalidated.  We do this with the spinlock held to
		 * avoid race conditions -- for example the restart_lsn could move
		 * forward, or the slot could be dropped.
		 */
		SpinLockAcquire(&s->mutex);

		restart_lsn = s->data.restart_lsn;

		/* we do nothing if the slot is already invalid */
		if (s->data.invalidated == RS_INVAL_NONE)
		{
			/*
			 * The slot's mutex will be released soon, and it is possible that
			 * those values change since the process holding the slot has been
			 * terminated (if any), so record them here to ensure that we
			 * would report the correct invalidation cause.
			 */
			if (!terminated)
			{
				initial_restart_lsn = s->data.restart_lsn;
				initial_effective_xmin = s->effective_xmin;
				initial_catalog_effective_xmin = s->effective_catalog_xmin;
			}

			/* decide whether the requested cause applies to this slot */
			switch (cause)
			{
				case RS_INVAL_WAL_REMOVED:
					/* slot requires WAL older than the oldest kept segment */
					if (initial_restart_lsn != InvalidXLogRecPtr &&
						initial_restart_lsn < oldestLSN)
						invalidation_cause = cause;
					break;
				case RS_INVAL_HORIZON:
					if (!SlotIsLogical(s))
						break;
					/* invalid DB oid signals a shared relation */
					if (dboid != InvalidOid && dboid != s->data.database)
						break;
					if (TransactionIdIsValid(initial_effective_xmin) &&
						TransactionIdPrecedesOrEquals(initial_effective_xmin,
													  snapshotConflictHorizon))
						invalidation_cause = cause;
					else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
							 TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
														   snapshotConflictHorizon))
						invalidation_cause = cause;
					break;
				case RS_INVAL_WAL_LEVEL:
					/* logical slots cannot work with wal_level < logical */
					if (SlotIsLogical(s))
						invalidation_cause = cause;
					break;
				case RS_INVAL_NONE:
					pg_unreachable();
			}
		}

		/*
		 * The invalidation cause recorded previously should not change while
		 * the process owning the slot (if any) has been terminated.
		 */
		Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
				 invalidation_cause_prev != invalidation_cause));

		/* if there's no invalidation, we're done */
		if (invalidation_cause == RS_INVAL_NONE)
		{
			SpinLockRelease(&s->mutex);
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		slotname = s->data.name;
		active_pid = s->active_pid;

		/*
		 * If the slot can be acquired, do so and mark it invalidated
		 * immediately.  Otherwise we'll signal the owning process, below, and
		 * retry.
		 */
		if (active_pid == 0)
		{
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
			s->data.invalidated = invalidation_cause;

			/*
			 * XXX: We should consider not overwriting restart_lsn and instead
			 * just rely on .invalidated.
			 */
			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
				s->data.restart_lsn = InvalidXLogRecPtr;

			/* Let caller know */
			*invalidated = true;
		}

		SpinLockRelease(&s->mutex);

		/*
		 * The logical replication slots shouldn't be invalidated as GUC
		 * max_slot_wal_keep_size is set to -1 during the binary upgrade.  See
		 * check_old_cluster_for_valid_slots() where we ensure that no slot
		 * has been invalidated before the upgrade.
		 */
		Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));

		if (active_pid != 0)
		{
			/*
			 * Prepare the sleep on the slot's condition variable before
			 * releasing the lock, to close a possible race condition if the
			 * slot is released before the sleep below.
			 */
			ConditionVariablePrepareToSleep(&s->active_cv);

			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Signal to terminate the process that owns the slot, if we
			 * haven't already signalled it.  (Avoidance of repeated
			 * signalling is the only reason for there to be a loop in this
			 * routine; otherwise we could rely on caller's restart loop.)
			 *
			 * There is the race condition that other process may own the slot
			 * after its current owner process is terminated and before this
			 * process owns it.  To handle that, we signal only if the PID of
			 * the owning process has changed from the previous time.  (This
			 * logic assumes that the same PID is not reused very quickly.)
			 */
			if (last_signaled_pid != active_pid)
			{
				ReportSlotInvalidation(invalidation_cause, true, active_pid,
									   slotname, restart_lsn,
									   oldestLSN, snapshotConflictHorizon);

				/* during recovery, use the dedicated conflict signal */
				if (MyBackendType == B_STARTUP)
					(void) SendProcSignal(active_pid,
										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
										  INVALID_PROC_NUMBER);
				else
					(void) kill(active_pid, SIGTERM);

				last_signaled_pid = active_pid;
				terminated = true;
				invalidation_cause_prev = invalidation_cause;
			}

			/* Wait until the slot is released. */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);

			/*
			 * Re-acquire lock and start over; we expect to invalidate the
			 * slot next time (unless another process acquires the slot in the
			 * meantime).
			 */
			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
			continue;
		}
		else
		{
			/*
			 * We hold the slot now and have already invalidated it; flush it
			 * to ensure that state persists.
			 *
			 * Don't want to hold ReplicationSlotControlLock across file
			 * system operations, so release it now but be sure to tell caller
			 * to restart from scratch.
			 */
			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/* Make sure the invalidated state persists across server restart */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			ReplicationSlotRelease();

			ReportSlotInvalidation(invalidation_cause, false, active_pid,
								   slotname, restart_lsn,
								   oldestLSN, snapshotConflictHorizon);

			/* done with this slot for now */
			break;
		}
	}

	/* the lock is held on exit exactly when we did not have to release it */
	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

	return released_lock;
}
1763 : :
1764 : : /*
1765 : : * Invalidate slots that require resources about to be removed.
1766 : : *
1767 : : * Returns true when any slot has been invalidated.
1768 : : *
1769 : : * Whether a slot needs to be invalidated depends on the cause. A slot is
1770 : : * removed if it:
1771 : : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1772 : : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1773 : : * db; dboid may be InvalidOid for shared relations
1774 : : * - RS_INVAL_WAL_LEVEL: is logical
1775 : : *
1776 : : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1777 : : */
1778 : : bool
373 andres@anarazel.de 1779 : 1169 : InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
1780 : : XLogSegNo oldestSegno, Oid dboid,
1781 : : TransactionId snapshotConflictHorizon)
1782 : : {
1783 : : XLogRecPtr oldestLSN;
1003 alvherre@alvh.no-ip. 1784 : 1169 : bool invalidated = false;
1785 : :
373 andres@anarazel.de 1786 [ + + - + ]: 1169 : Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
1787 [ + + - + ]: 1169 : Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
1788 [ - + ]: 1169 : Assert(cause != RS_INVAL_NONE);
1789 : :
1790 [ + + ]: 1169 : if (max_replication_slots == 0)
373 andres@anarazel.de 1791 :GBC 1 : return invalidated;
1792 : :
1038 alvherre@alvh.no-ip. 1793 :CBC 1168 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1794 : :
1795 : 1181 : restart:
1796 : 1181 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1797 [ + + ]: 12479 : for (int i = 0; i < max_replication_slots; i++)
1798 : : {
1799 : 11311 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1800 : :
1801 [ + + ]: 11311 : if (!s->in_use)
1802 : 11029 : continue;
1803 : :
373 andres@anarazel.de 1804 [ + + ]: 282 : if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
1805 : : snapshotConflictHorizon,
1806 : : &invalidated))
1807 : : {
1808 : : /* if the lock was released, start from scratch */
1038 alvherre@alvh.no-ip. 1809 : 13 : goto restart;
1810 : : }
1811 : : }
1468 1812 : 1168 : LWLockRelease(ReplicationSlotControlLock);
1813 : :
1814 : : /*
1815 : : * If any slots have been invalidated, recalculate the resource limits.
1816 : : */
1003 1817 [ + + ]: 1168 : if (invalidated)
1818 : : {
1819 : 8 : ReplicationSlotsComputeRequiredXmin(false);
1820 : 8 : ReplicationSlotsComputeRequiredLSN();
1821 : : }
1822 : :
1823 : 1168 : return invalidated;
1824 : : }
1825 : :
1826 : : /*
1827 : : * Flush all replication slots to disk.
1828 : : *
1829 : : * It is convenient to flush dirty replication slots at the time of checkpoint.
1830 : : * Additionally, in case of a shutdown checkpoint, we also identify the slots
1831 : : * for which the confirmed_flush LSN has been updated since the last time it
1832 : : * was saved and flush them.
1833 : : */
1834 : : void
213 akapila@postgresql.o 1835 :GNC 1153 : CheckPointReplicationSlots(bool is_shutdown)
1836 : : {
1837 : : int i;
1838 : :
3442 peter_e@gmx.net 1839 [ + + ]:CBC 1153 : elog(DEBUG1, "performing replication slot checkpoint");
1840 : :
1841 : : /*
1842 : : * Prevent any slot from being created/dropped while we're active. As we
1843 : : * explicitly do *not* want to block iterating over replication_slots or
1844 : : * acquiring a slot we cannot take the control lock - but that's OK,
1845 : : * because holding ReplicationSlotAllocationLock is strictly stronger, and
1846 : : * enough to guarantee that nobody can change the in_use bits on us.
1847 : : */
3726 rhaas@postgresql.org 1848 : 1153 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1849 : :
1850 [ + + ]: 12378 : for (i = 0; i < max_replication_slots; i++)
1851 : : {
1852 : 11225 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1853 : : char path[MAXPGPATH];
1854 : :
1855 [ + + ]: 11225 : if (!s->in_use)
1856 : 10995 : continue;
1857 : :
1858 : : /* save the slot to disk, locking is handled in SaveSlotToPath() */
1859 : 230 : sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1860 : :
1861 : : /*
1862 : : * Slot's data is not flushed each time the confirmed_flush LSN is
1863 : : * updated as that could lead to frequent writes. However, we decide
1864 : : * to force a flush of all logical slot's data at the time of shutdown
1865 : : * if the confirmed_flush LSN is changed since we last flushed it to
1866 : : * disk. This helps in avoiding an unnecessary retreat of the
1867 : : * confirmed_flush LSN after restart.
1868 : : */
213 akapila@postgresql.o 1869 [ + + + + ]:GNC 230 : if (is_shutdown && SlotIsLogical(s))
1870 : : {
1871 [ - + ]: 59 : SpinLockAcquire(&s->mutex);
1872 : :
1873 [ - + ]: 59 : Assert(s->data.confirmed_flush >= s->last_saved_confirmed_flush);
1874 : :
1875 [ + - ]: 59 : if (s->data.invalidated == RS_INVAL_NONE &&
1876 [ + + ]: 59 : s->data.confirmed_flush != s->last_saved_confirmed_flush)
1877 : : {
1878 : 34 : s->just_dirtied = true;
1879 : 34 : s->dirty = true;
1880 : : }
1881 : 59 : SpinLockRelease(&s->mutex);
1882 : : }
1883 : :
3726 rhaas@postgresql.org 1884 :CBC 230 : SaveSlotToPath(s, path, LOG);
1885 : : }
1886 : 1153 : LWLockRelease(ReplicationSlotAllocationLock);
1887 : 1153 : }
1888 : :
/*
 * Load all replication slots from disk into memory at server startup.  This
 * needs to be run before we start crash recovery.
 */
void
StartupReplicationSlots(void)
{
	DIR		   *replication_dir;
	struct dirent *replication_de;

	elog(DEBUG1, "starting up replication slots");

	/* restore all slots by iterating over all on-disk entries */
	replication_dir = AllocateDir("pg_replslot");
	while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
	{
		char		path[MAXPGPATH + 12];
		PGFileType	de_type;

		/* skip the self and parent directory entries */
		if (strcmp(replication_de->d_name, ".") == 0 ||
			strcmp(replication_de->d_name, "..") == 0)
			continue;

		snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
		de_type = get_dirent_type(path, replication_de, false, DEBUG1);

		/* we're only creating directories here, skip if it's not ours */
		if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
			continue;

		/*
		 * We crashed while a slot was being set up or deleted, clean up.
		 * A .tmp directory is only ever transient (see CreateSlotOnDisk()),
		 * so it cannot represent a valid slot.
		 */
		if (pg_str_endswith(replication_de->d_name, ".tmp"))
		{
			if (!rmtree(path, true))
			{
				ereport(WARNING,
						(errmsg("could not remove directory \"%s\"",
								path)));
				continue;
			}
			/* make sure the removal itself survives a crash */
			fsync_fname("pg_replslot", true);
			continue;
		}

		/* looks like a slot in a normal state, restore */
		RestoreSlotFromDisk(replication_de->d_name);
	}
	FreeDir(replication_dir);

	/* currently no slots exist, we're done. */
	if (max_replication_slots <= 0)
		return;

	/* Now that we have recovered all the data, compute replication xmin */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();
}
1946 : :
1947 : : /* ----
1948 : : * Manipulation of on-disk state of replication slots
1949 : : *
1950 : : * NB: none of the routines below should take any notice whether a slot is the
1951 : : * current one or not, that's all handled a layer above.
1952 : : * ----
1953 : : */
1954 : : static void
1955 : 557 : CreateSlotOnDisk(ReplicationSlot *slot)
1956 : : {
1957 : : char tmppath[MAXPGPATH];
1958 : : char path[MAXPGPATH];
1959 : : struct stat st;
1960 : :
1961 : : /*
1962 : : * No need to take out the io_in_progress_lock, nobody else can see this
1963 : : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1964 : : * takes out the lock, if we'd take the lock here, we'd deadlock.
1965 : : */
1966 : :
1967 : 557 : sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1968 : 557 : sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1969 : :
1970 : : /*
1971 : : * It's just barely possible that some previous effort to create or drop a
1972 : : * slot with this name left a temp directory lying around. If that seems
1973 : : * to be the case, try to remove it. If the rmtree() fails, we'll error
1974 : : * out at the MakePGDirectory() below, so we don't bother checking
1975 : : * success.
1976 : : */
1977 [ - + - - ]: 557 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
3726 rhaas@postgresql.org 1978 :UBC 0 : rmtree(tmppath, true);
1979 : :
1980 : : /* Create and fsync the temporary slot directory. */
2199 sfrost@snowman.net 1981 [ - + ]:CBC 557 : if (MakePGDirectory(tmppath) < 0)
3726 rhaas@postgresql.org 1982 [ # # ]:UBC 0 : ereport(ERROR,
1983 : : (errcode_for_file_access(),
1984 : : errmsg("could not create directory \"%s\": %m",
1985 : : tmppath)));
3726 rhaas@postgresql.org 1986 :CBC 557 : fsync_fname(tmppath, true);
1987 : :
1988 : : /* Write the actual state file. */
3631 bruce@momjian.us 1989 : 557 : slot->dirty = true; /* signal that we really need to write */
3726 rhaas@postgresql.org 1990 : 557 : SaveSlotToPath(slot, tmppath, ERROR);
1991 : :
1992 : : /* Rename the directory into place. */
1993 [ - + ]: 557 : if (rename(tmppath, path) != 0)
3726 rhaas@postgresql.org 1994 [ # # ]:UBC 0 : ereport(ERROR,
1995 : : (errcode_for_file_access(),
1996 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
1997 : : tmppath, path)));
1998 : :
1999 : : /*
2000 : : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2001 : : * would persist after an OS crash or not - so, force a restart. The
2002 : : * restart would try to fsync this again till it works.
2003 : : */
3726 rhaas@postgresql.org 2004 :CBC 557 : START_CRIT_SECTION();
2005 : :
2006 : 557 : fsync_fname(path, true);
2007 : 557 : fsync_fname("pg_replslot", true);
2008 : :
2009 [ - + ]: 557 : END_CRIT_SECTION();
2010 : 557 : }
2011 : :
2012 : : /*
2013 : : * Shared functionality between saving and creating a replication slot.
2014 : : */
2015 : : static void
2016 : 2006 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2017 : : {
2018 : : char tmppath[MAXPGPATH];
2019 : : char path[MAXPGPATH];
2020 : : int fd;
2021 : : ReplicationSlotOnDisk cp;
2022 : : bool was_dirty;
2023 : :
2024 : : /* first check whether there's something to write out */
3113 2025 [ - + ]: 2006 : SpinLockAcquire(&slot->mutex);
2026 : 2006 : was_dirty = slot->dirty;
2027 : 2006 : slot->just_dirtied = false;
2028 : 2006 : SpinLockRelease(&slot->mutex);
2029 : :
2030 : : /* and don't do anything if there's nothing to write */
3726 2031 [ + + ]: 2006 : if (!was_dirty)
2032 : 124 : return;
2033 : :
2998 2034 : 1882 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2035 : :
2036 : : /* silence valgrind :( */
3726 2037 : 1882 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2038 : :
2039 : 1882 : sprintf(tmppath, "%s/state.tmp", dir);
2040 : 1882 : sprintf(path, "%s/state", dir);
2041 : :
2395 peter_e@gmx.net 2042 : 1882 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
3726 rhaas@postgresql.org 2043 [ - + ]: 1882 : if (fd < 0)
2044 : : {
2045 : : /*
2046 : : * If not an ERROR, then release the lock before returning. In case
2047 : : * of an ERROR, the error recovery path automatically releases the
2048 : : * lock, but no harm in explicitly releasing even in that case. Note
2049 : : * that LWLockRelease() could affect errno.
2050 : : */
1470 peter@eisentraut.org 2051 :UBC 0 : int save_errno = errno;
2052 : :
1480 2053 : 0 : LWLockRelease(&slot->io_in_progress_lock);
1470 2054 : 0 : errno = save_errno;
3726 rhaas@postgresql.org 2055 [ # # ]: 0 : ereport(elevel,
2056 : : (errcode_for_file_access(),
2057 : : errmsg("could not create file \"%s\": %m",
2058 : : tmppath)));
2059 : 0 : return;
2060 : : }
2061 : :
3726 rhaas@postgresql.org 2062 :CBC 1882 : cp.magic = SLOT_MAGIC;
3449 heikki.linnakangas@i 2063 : 1882 : INIT_CRC32C(cp.checksum);
3441 andres@anarazel.de 2064 : 1882 : cp.version = SLOT_VERSION;
2065 : 1882 : cp.length = ReplicationSlotOnDiskV2Size;
2066 : :
3726 rhaas@postgresql.org 2067 [ - + ]: 1882 : SpinLockAcquire(&slot->mutex);
2068 : :
2069 : 1882 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2070 : :
2071 : 1882 : SpinLockRelease(&slot->mutex);
2072 : :
3449 heikki.linnakangas@i 2073 : 1882 : COMP_CRC32C(cp.checksum,
2074 : : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2075 : : ReplicationSlotOnDiskChecksummedSize);
3441 andres@anarazel.de 2076 : 1882 : FIN_CRC32C(cp.checksum);
2077 : :
2079 michael@paquier.xyz 2078 : 1882 : errno = 0;
2584 rhaas@postgresql.org 2079 : 1882 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
3726 2080 [ - + ]: 1882 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2081 : : {
3631 bruce@momjian.us 2082 :UBC 0 : int save_errno = errno;
2083 : :
2584 rhaas@postgresql.org 2084 : 0 : pgstat_report_wait_end();
3726 2085 : 0 : CloseTransientFile(fd);
1480 peter@eisentraut.org 2086 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2087 : :
2088 : : /* if write didn't set errno, assume problem is no disk space */
2120 michael@paquier.xyz 2089 [ # # ]: 0 : errno = save_errno ? save_errno : ENOSPC;
3726 rhaas@postgresql.org 2090 [ # # ]: 0 : ereport(elevel,
2091 : : (errcode_for_file_access(),
2092 : : errmsg("could not write to file \"%s\": %m",
2093 : : tmppath)));
2094 : 0 : return;
2095 : : }
2584 rhaas@postgresql.org 2096 :CBC 1882 : pgstat_report_wait_end();
2097 : :
2098 : : /* fsync the temporary file */
2099 : 1882 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
3726 2100 [ - + ]: 1882 : if (pg_fsync(fd) != 0)
2101 : : {
3631 bruce@momjian.us 2102 :UBC 0 : int save_errno = errno;
2103 : :
2584 rhaas@postgresql.org 2104 : 0 : pgstat_report_wait_end();
3726 2105 : 0 : CloseTransientFile(fd);
1480 peter@eisentraut.org 2106 : 0 : LWLockRelease(&slot->io_in_progress_lock);
3726 rhaas@postgresql.org 2107 : 0 : errno = save_errno;
2108 [ # # ]: 0 : ereport(elevel,
2109 : : (errcode_for_file_access(),
2110 : : errmsg("could not fsync file \"%s\": %m",
2111 : : tmppath)));
2112 : 0 : return;
2113 : : }
2584 rhaas@postgresql.org 2114 :CBC 1882 : pgstat_report_wait_end();
2115 : :
1744 peter@eisentraut.org 2116 [ - + ]: 1882 : if (CloseTransientFile(fd) != 0)
2117 : : {
1470 peter@eisentraut.org 2118 :UBC 0 : int save_errno = errno;
2119 : :
1480 2120 : 0 : LWLockRelease(&slot->io_in_progress_lock);
1470 2121 : 0 : errno = save_errno;
1863 michael@paquier.xyz 2122 [ # # ]: 0 : ereport(elevel,
2123 : : (errcode_for_file_access(),
2124 : : errmsg("could not close file \"%s\": %m",
2125 : : tmppath)));
1824 2126 : 0 : return;
2127 : : }
2128 : :
2129 : : /* rename to permanent file, fsync file and directory */
3726 rhaas@postgresql.org 2130 [ - + ]:CBC 1882 : if (rename(tmppath, path) != 0)
2131 : : {
1470 peter@eisentraut.org 2132 :UBC 0 : int save_errno = errno;
2133 : :
1480 2134 : 0 : LWLockRelease(&slot->io_in_progress_lock);
1470 2135 : 0 : errno = save_errno;
3726 rhaas@postgresql.org 2136 [ # # ]: 0 : ereport(elevel,
2137 : : (errcode_for_file_access(),
2138 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
2139 : : tmppath, path)));
2140 : 0 : return;
2141 : : }
2142 : :
2143 : : /*
2144 : : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2145 : : */
3726 rhaas@postgresql.org 2146 :CBC 1882 : START_CRIT_SECTION();
2147 : :
2148 : 1882 : fsync_fname(path, false);
2958 andres@anarazel.de 2149 : 1882 : fsync_fname(dir, true);
3726 rhaas@postgresql.org 2150 : 1882 : fsync_fname("pg_replslot", true);
2151 : :
2152 [ - + ]: 1882 : END_CRIT_SECTION();
2153 : :
2154 : : /*
2155 : : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2156 : : * already and remember the confirmed_flush LSN value.
2157 : : */
3113 2158 [ - + ]: 1882 : SpinLockAcquire(&slot->mutex);
2159 [ + + ]: 1882 : if (!slot->just_dirtied)
2160 : 1879 : slot->dirty = false;
213 akapila@postgresql.o 2161 :GNC 1882 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
3113 rhaas@postgresql.org 2162 :CBC 1882 : SpinLockRelease(&slot->mutex);
2163 : :
2998 2164 : 1882 : LWLockRelease(&slot->io_in_progress_lock);
2165 : : }
2166 : :
2167 : : /*
2168 : : * Load a single slot from disk into memory.
2169 : : */
2170 : : static void
3726 2171 : 59 : RestoreSlotFromDisk(const char *name)
2172 : : {
2173 : : ReplicationSlotOnDisk cp;
2174 : : int i;
2175 : : char slotdir[MAXPGPATH + 12];
2176 : : char path[MAXPGPATH + 22];
2177 : : int fd;
2178 : 59 : bool restored = false;
2179 : : int readBytes;
2180 : : pg_crc32c checksum;
2181 : :
2182 : : /* no need to lock here, no concurrent access allowed yet */
2183 : :
2184 : : /* delete temp file if it exists */
2051 michael@paquier.xyz 2185 : 59 : sprintf(slotdir, "pg_replslot/%s", name);
2186 : 59 : sprintf(path, "%s/state.tmp", slotdir);
3726 rhaas@postgresql.org 2187 [ + - - + ]: 59 : if (unlink(path) < 0 && errno != ENOENT)
3726 rhaas@postgresql.org 2188 [ # # ]:UBC 0 : ereport(PANIC,
2189 : : (errcode_for_file_access(),
2190 : : errmsg("could not remove file \"%s\": %m", path)));
2191 : :
2051 michael@paquier.xyz 2192 :CBC 59 : sprintf(path, "%s/state", slotdir);
2193 : :
3726 rhaas@postgresql.org 2194 [ + + ]: 59 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2195 : :
2196 : : /* on some operating systems fsyncing a file requires O_RDWR */
1654 andres@anarazel.de 2197 : 59 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2198 : :
2199 : : /*
2200 : : * We do not need to handle this as we are rename()ing the directory into
2201 : : * place only after we fsync()ed the state file.
2202 : : */
3726 rhaas@postgresql.org 2203 [ - + ]: 59 : if (fd < 0)
3726 rhaas@postgresql.org 2204 [ # # ]:UBC 0 : ereport(PANIC,
2205 : : (errcode_for_file_access(),
2206 : : errmsg("could not open file \"%s\": %m", path)));
2207 : :
2208 : : /*
2209 : : * Sync state file before we're reading from it. We might have crashed
2210 : : * while it wasn't synced yet and we shouldn't continue on that basis.
2211 : : */
2584 rhaas@postgresql.org 2212 :CBC 59 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
3726 2213 [ - + ]: 59 : if (pg_fsync(fd) != 0)
3726 rhaas@postgresql.org 2214 [ # # ]:UBC 0 : ereport(PANIC,
2215 : : (errcode_for_file_access(),
2216 : : errmsg("could not fsync file \"%s\": %m",
2217 : : path)));
2584 rhaas@postgresql.org 2218 :CBC 59 : pgstat_report_wait_end();
2219 : :
2220 : : /* Also sync the parent directory */
3726 2221 : 59 : START_CRIT_SECTION();
2051 michael@paquier.xyz 2222 : 59 : fsync_fname(slotdir, true);
3726 rhaas@postgresql.org 2223 [ - + ]: 59 : END_CRIT_SECTION();
2224 : :
2225 : : /* read part of statefile that's guaranteed to be version independent */
2584 2226 : 59 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
3726 2227 : 59 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2584 2228 : 59 : pgstat_report_wait_end();
3726 2229 [ - + ]: 59 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2230 : : {
2097 michael@paquier.xyz 2231 [ # # ]:UBC 0 : if (readBytes < 0)
2232 [ # # ]: 0 : ereport(PANIC,
2233 : : (errcode_for_file_access(),
2234 : : errmsg("could not read file \"%s\": %m", path)));
2235 : : else
2236 [ # # ]: 0 : ereport(PANIC,
2237 : : (errcode(ERRCODE_DATA_CORRUPTED),
2238 : : errmsg("could not read file \"%s\": read %d of %zu",
2239 : : path, readBytes,
2240 : : (Size) ReplicationSlotOnDiskConstantSize)));
2241 : : }
2242 : :
2243 : : /* verify magic */
3726 rhaas@postgresql.org 2244 [ - + ]:CBC 59 : if (cp.magic != SLOT_MAGIC)
3726 rhaas@postgresql.org 2245 [ # # ]:UBC 0 : ereport(PANIC,
2246 : : (errcode(ERRCODE_DATA_CORRUPTED),
2247 : : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2248 : : path, cp.magic, SLOT_MAGIC)));
2249 : :
2250 : : /* verify version */
3726 rhaas@postgresql.org 2251 [ - + ]:CBC 59 : if (cp.version != SLOT_VERSION)
3726 rhaas@postgresql.org 2252 [ # # ]:UBC 0 : ereport(PANIC,
2253 : : (errcode(ERRCODE_DATA_CORRUPTED),
2254 : : errmsg("replication slot file \"%s\" has unsupported version %u",
2255 : : path, cp.version)));
2256 : :
2257 : : /* boundary check on length */
3441 andres@anarazel.de 2258 [ - + ]:CBC 59 : if (cp.length != ReplicationSlotOnDiskV2Size)
3726 rhaas@postgresql.org 2259 [ # # ]:UBC 0 : ereport(PANIC,
2260 : : (errcode(ERRCODE_DATA_CORRUPTED),
2261 : : errmsg("replication slot file \"%s\" has corrupted length %u",
2262 : : path, cp.length)));
2263 : :
2264 : : /* Now that we know the size, read the entire file */
2584 rhaas@postgresql.org 2265 :CBC 59 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
3726 2266 : 118 : readBytes = read(fd,
2267 : : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2268 : 59 : cp.length);
2584 2269 : 59 : pgstat_report_wait_end();
3726 2270 [ - + ]: 59 : if (readBytes != cp.length)
2271 : : {
2097 michael@paquier.xyz 2272 [ # # ]:UBC 0 : if (readBytes < 0)
2273 [ # # ]: 0 : ereport(PANIC,
2274 : : (errcode_for_file_access(),
2275 : : errmsg("could not read file \"%s\": %m", path)));
2276 : : else
2277 [ # # ]: 0 : ereport(PANIC,
2278 : : (errcode(ERRCODE_DATA_CORRUPTED),
2279 : : errmsg("could not read file \"%s\": read %d of %zu",
2280 : : path, readBytes, (Size) cp.length)));
2281 : : }
2282 : :
1744 peter@eisentraut.org 2283 [ - + ]:CBC 59 : if (CloseTransientFile(fd) != 0)
1863 michael@paquier.xyz 2284 [ # # ]:UBC 0 : ereport(PANIC,
2285 : : (errcode_for_file_access(),
2286 : : errmsg("could not close file \"%s\": %m", path)));
2287 : :
2288 : : /* now verify the CRC */
3449 heikki.linnakangas@i 2289 :CBC 59 : INIT_CRC32C(checksum);
2290 : 59 : COMP_CRC32C(checksum,
2291 : : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2292 : : ReplicationSlotOnDiskChecksummedSize);
3441 andres@anarazel.de 2293 : 59 : FIN_CRC32C(checksum);
2294 : :
3449 heikki.linnakangas@i 2295 [ - + ]: 59 : if (!EQ_CRC32C(checksum, cp.checksum))
3726 rhaas@postgresql.org 2296 [ # # ]:UBC 0 : ereport(PANIC,
2297 : : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2298 : : path, checksum, cp.checksum)));
2299 : :
2300 : : /*
2301 : : * If we crashed with an ephemeral slot active, don't restore but delete
2302 : : * it.
2303 : : */
3552 andres@anarazel.de 2304 [ - + ]:CBC 59 : if (cp.slotdata.persistency != RS_PERSISTENT)
2305 : : {
2051 michael@paquier.xyz 2306 [ # # ]:UBC 0 : if (!rmtree(slotdir, true))
2307 : : {
3552 andres@anarazel.de 2308 [ # # ]: 0 : ereport(WARNING,
2309 : : (errmsg("could not remove directory \"%s\"",
2310 : : slotdir)));
2311 : : }
2312 : 0 : fsync_fname("pg_replslot", true);
2313 : 0 : return;
2314 : : }
2315 : :
2316 : : /*
2317 : : * Verify that requirements for the specific slot type are met. That's
2318 : : * important because if these aren't met we're not guaranteed to retain
2319 : : * all the necessary resources for the slot.
2320 : : *
2321 : : * NB: We have to do so *after* the above checks for ephemeral slots,
2322 : : * because otherwise a slot that shouldn't exist anymore could prevent
2323 : : * restarts.
2324 : : *
2325 : : * NB: Changing the requirements here also requires adapting
2326 : : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2327 : : */
1992 andres@anarazel.de 2328 [ + + - + ]:CBC 59 : if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
1992 andres@anarazel.de 2329 [ # # ]:UBC 0 : ereport(FATAL,
2330 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2331 : : errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
2332 : : NameStr(cp.slotdata.name)),
2333 : : errhint("Change wal_level to be logical or higher.")));
1992 andres@anarazel.de 2334 [ - + ]:CBC 59 : else if (wal_level < WAL_LEVEL_REPLICA)
1992 andres@anarazel.de 2335 [ # # ]:UBC 0 : ereport(FATAL,
2336 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2337 : : errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
2338 : : NameStr(cp.slotdata.name)),
2339 : : errhint("Change wal_level to be replica or higher.")));
2340 : :
2341 : : /* nothing can be active yet, don't lock anything */
3726 rhaas@postgresql.org 2342 [ + - ]:CBC 85 : for (i = 0; i < max_replication_slots; i++)
2343 : : {
2344 : : ReplicationSlot *slot;
2345 : :
2346 : 85 : slot = &ReplicationSlotCtl->replication_slots[i];
2347 : :
2348 [ + + ]: 85 : if (slot->in_use)
2349 : 26 : continue;
2350 : :
2351 : : /* restore the entire set of persistent data */
2352 : 59 : memcpy(&slot->data, &cp.slotdata,
2353 : : sizeof(ReplicationSlotPersistentData));
2354 : :
2355 : : /* initialize in memory state */
2356 : 59 : slot->effective_xmin = cp.slotdata.xmin;
3695 2357 : 59 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
213 akapila@postgresql.o 2358 :GNC 59 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2359 : :
3695 rhaas@postgresql.org 2360 :CBC 59 : slot->candidate_catalog_xmin = InvalidTransactionId;
2361 : 59 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2362 : 59 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2363 : 59 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2364 : :
3726 2365 : 59 : slot->in_use = true;
3281 andres@anarazel.de 2366 : 59 : slot->active_pid = 0;
2367 : :
2368 : : /*
2369 : : * Set the time since the slot has become inactive after loading the
2370 : : * slot from the disk into memory. Whoever acquires the slot i.e.
2371 : : * makes the slot active will reset it.
2372 : : */
9 akapila@postgresql.o 2373 :GNC 59 : slot->inactive_since = GetCurrentTimestamp();
2374 : :
3726 rhaas@postgresql.org 2375 :CBC 59 : restored = true;
2376 : 59 : break;
2377 : : }
2378 : :
2379 [ - + ]: 59 : if (!restored)
1990 michael@paquier.xyz 2380 [ # # ]:UBC 0 : ereport(FATAL,
2381 : : (errmsg("too many replication slots active before shutdown"),
2382 : : errhint("Increase max_replication_slots and try again.")));
2383 : : }
2384 : :
2385 : : /*
2386 : : * Maps an invalidation reason for a replication slot to
2387 : : * ReplicationSlotInvalidationCause.
2388 : : */
2389 : : ReplicationSlotInvalidationCause
23 akapila@postgresql.o 2390 :UNC 0 : GetSlotInvalidationCause(const char *invalidation_reason)
2391 : : {
2392 : : ReplicationSlotInvalidationCause cause;
52 michael@paquier.xyz 2393 : 0 : ReplicationSlotInvalidationCause result = RS_INVAL_NONE;
2394 : 0 : bool found PG_USED_FOR_ASSERTS_ONLY = false;
2395 : :
23 akapila@postgresql.o 2396 [ # # ]: 0 : Assert(invalidation_reason);
2397 : :
52 michael@paquier.xyz 2398 [ # # ]: 0 : for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
2399 : : {
23 akapila@postgresql.o 2400 [ # # ]: 0 : if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0)
2401 : : {
52 michael@paquier.xyz 2402 : 0 : found = true;
2403 : 0 : result = cause;
2404 : 0 : break;
2405 : : }
2406 : : }
2407 : :
2408 [ # # ]: 0 : Assert(found);
2409 : 0 : return result;
2410 : : }
2411 : :
2412 : : /*
2413 : : * A helper function to validate slots specified in GUC standby_slot_names.
2414 : : *
2415 : : * The rawname will be parsed, and the result will be saved into *elemlist.
2416 : : */
2417 : : static bool
37 akapila@postgresql.o 2418 :GNC 23 : validate_standby_slots(char *rawname, List **elemlist)
2419 : : {
2420 : : bool ok;
2421 : :
2422 : : /* Verify syntax and parse string into a list of identifiers */
2423 : 23 : ok = SplitIdentifierString(rawname, ',', elemlist);
2424 : :
2425 [ - + ]: 23 : if (!ok)
2426 : : {
37 akapila@postgresql.o 2427 :UNC 0 : GUC_check_errdetail("List syntax is invalid.");
2428 : : }
37 akapila@postgresql.o 2429 [ + - ]:GNC 23 : else if (!ReplicationSlotCtl)
2430 : : {
2431 : : /*
2432 : : * We cannot validate the replication slot if the replication slots'
2433 : : * data has not been initialized. This is ok as we will anyway
2434 : : * validate the specified slot when waiting for them to catch up. See
2435 : : * StandbySlotsHaveCaughtup() for details.
2436 : : */
2437 : : }
2438 : : else
2439 : : {
2440 : : /* Check that the specified slots exist and are logical slots */
2441 : 23 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2442 : :
2443 [ + - + + : 69 : foreach_ptr(char, name, *elemlist)
+ + ]
2444 : : {
2445 : : ReplicationSlot *slot;
2446 : :
2447 : 23 : slot = SearchNamedReplicationSlot(name, false);
2448 : :
2449 [ - + ]: 23 : if (!slot)
2450 : : {
37 akapila@postgresql.o 2451 :UNC 0 : GUC_check_errdetail("replication slot \"%s\" does not exist",
2452 : : name);
2453 : 0 : ok = false;
2454 : 0 : break;
2455 : : }
2456 : :
37 akapila@postgresql.o 2457 [ - + ]:GNC 23 : if (!SlotIsPhysical(slot))
2458 : : {
37 akapila@postgresql.o 2459 :UNC 0 : GUC_check_errdetail("\"%s\" is not a physical replication slot",
2460 : : name);
2461 : 0 : ok = false;
2462 : 0 : break;
2463 : : }
2464 : : }
2465 : :
37 akapila@postgresql.o 2466 :GNC 23 : LWLockRelease(ReplicationSlotControlLock);
2467 : : }
2468 : :
2469 : 23 : return ok;
2470 : : }
2471 : :
2472 : : /*
2473 : : * GUC check_hook for standby_slot_names
2474 : : */
2475 : : bool
2476 : 967 : check_standby_slot_names(char **newval, void **extra, GucSource source)
2477 : : {
2478 : : char *rawname;
2479 : : char *ptr;
2480 : : List *elemlist;
2481 : : int size;
2482 : : bool ok;
2483 : : StandbySlotNamesConfigData *config;
2484 : :
2485 [ + + ]: 967 : if ((*newval)[0] == '\0')
2486 : 944 : return true;
2487 : :
2488 : : /* Need a modifiable copy of the GUC string */
2489 : 23 : rawname = pstrdup(*newval);
2490 : :
2491 : : /* Now verify if the specified slots exist and have correct type */
2492 : 23 : ok = validate_standby_slots(rawname, &elemlist);
2493 : :
2494 [ + - - + ]: 23 : if (!ok || elemlist == NIL)
2495 : : {
37 akapila@postgresql.o 2496 :UNC 0 : pfree(rawname);
2497 : 0 : list_free(elemlist);
2498 : 0 : return ok;
2499 : : }
2500 : :
2501 : : /* Compute the size required for the StandbySlotNamesConfigData struct */
37 akapila@postgresql.o 2502 :GNC 23 : size = offsetof(StandbySlotNamesConfigData, slot_names);
2503 [ + - + + : 69 : foreach_ptr(char, slot_name, elemlist)
+ + ]
2504 : 23 : size += strlen(slot_name) + 1;
2505 : :
2506 : : /* GUC extra value must be guc_malloc'd, not palloc'd */
2507 : 23 : config = (StandbySlotNamesConfigData *) guc_malloc(LOG, size);
2508 : :
2509 : : /* Transform the data into StandbySlotNamesConfigData */
2510 : 23 : config->nslotnames = list_length(elemlist);
2511 : :
2512 : 23 : ptr = config->slot_names;
2513 [ + - + + : 69 : foreach_ptr(char, slot_name, elemlist)
+ + ]
2514 : : {
2515 : 23 : strcpy(ptr, slot_name);
2516 : 23 : ptr += strlen(slot_name) + 1;
2517 : : }
2518 : :
2519 : 23 : *extra = (void *) config;
2520 : :
2521 : 23 : pfree(rawname);
2522 : 23 : list_free(elemlist);
2523 : 23 : return true;
2524 : : }
2525 : :
2526 : : /*
2527 : : * GUC assign_hook for standby_slot_names
2528 : : */
2529 : : void
2530 : 967 : assign_standby_slot_names(const char *newval, void *extra)
2531 : : {
2532 : : /*
2533 : : * The standby slots may have changed, so we must recompute the oldest
2534 : : * LSN.
2535 : : */
2536 : 967 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
2537 : :
2538 : 967 : standby_slot_names_config = (StandbySlotNamesConfigData *) extra;
2539 : 967 : }
2540 : :
2541 : : /*
2542 : : * Check if the passed slot_name is specified in the standby_slot_names GUC.
2543 : : */
2544 : : bool
2545 : 20922 : SlotExistsInStandbySlotNames(const char *slot_name)
2546 : : {
2547 : : const char *standby_slot_name;
2548 : :
2549 : : /* Return false if there is no value in standby_slot_names */
2550 [ + + ]: 20922 : if (standby_slot_names_config == NULL)
2551 : 20911 : return false;
2552 : :
2553 : : /*
2554 : : * XXX: We are not expecting this list to be long so a linear search
2555 : : * shouldn't hurt but if that turns out not to be true then we can cache
2556 : : * this information for each WalSender as well.
2557 : : */
2558 : 11 : standby_slot_name = standby_slot_names_config->slot_names;
2559 [ + + ]: 17 : for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
2560 : : {
2561 [ + + ]: 11 : if (strcmp(standby_slot_name, slot_name) == 0)
2562 : 5 : return true;
2563 : :
2564 : 6 : standby_slot_name += strlen(standby_slot_name) + 1;
2565 : : }
2566 : :
2567 : 6 : return false;
2568 : : }
2569 : :
/*
 * Return true if the slots specified in standby_slot_names have caught up to
 * the given WAL location, false otherwise.
 *
 * The elevel parameter specifies the error level used for logging messages
 * related to slots that do not exist, are invalidated, or are inactive.
 */
bool
StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
{
	const char *name;
	int			caught_up_slot_num = 0;
	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Don't need to wait for the standbys to catch up if there is no value in
	 * standby_slot_names.
	 */
	if (standby_slot_names_config == NULL)
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if we are on a standby
	 * server, since we do not support syncing slots to cascading standbys.
	 */
	if (RecoveryInProgress())
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if they are already
	 * beyond the specified WAL location.
	 */
	if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
		ss_oldest_flush_lsn >= wait_for_lsn)
		return true;

	/*
	 * To prevent concurrent slot dropping and creation while filtering the
	 * slots, take the ReplicationSlotControlLock outside of the loop.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/*
	 * Iterate over the packed, NUL-separated slot-name array built by the
	 * GUC check_hook.  Any "break" below leaves caught_up_slot_num short of
	 * nslotnames, so the function reports false.
	 */
	name = standby_slot_names_config->slot_names;
	for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
	{
		XLogRecPtr	restart_lsn;
		bool		invalidated;
		bool		inactive;
		ReplicationSlot *slot;

		slot = SearchNamedReplicationSlot(name, false);

		if (!slot)
		{
			/*
			 * If a slot name provided in standby_slot_names does not exist,
			 * report a message and exit the loop. A user can specify a slot
			 * name that does not exist just before the server startup. The
			 * GUC check_hook(validate_standby_slots) cannot validate such a
			 * slot during startup as the ReplicationSlotCtl shared memory is
			 * not initialized at that time. It is also possible for a user to
			 * drop the slot in standby_slot_names afterwards.
			 */
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("replication slot \"%s\" specified in parameter %s does not exist",
						   name, "standby_slot_names"),
					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
							  name),
					errhint("Consider creating the slot \"%s\" or amend parameter %s.",
							name, "standby_slot_names"));
			break;
		}

		if (SlotIsLogical(slot))
		{
			/*
			 * If a logical slot name is provided in standby_slot_names,
			 * report a message and exit the loop. Similar to the non-existent
			 * case, a user can specify a logical slot name in
			 * standby_slot_names before the server startup, or drop an
			 * existing physical slot and recreate a logical slot with the
			 * same name.
			 */
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("cannot have logical replication slot \"%s\" in parameter %s",
						   name, "standby_slot_names"),
					errdetail("Logical replication is waiting for correction on \"%s\".",
							  name),
					errhint("Consider removing logical slot \"%s\" from parameter %s.",
							name, "standby_slot_names"));
			break;
		}

		/* Snapshot the mutable slot fields under the slot's spinlock. */
		SpinLockAcquire(&slot->mutex);
		restart_lsn = slot->data.restart_lsn;
		invalidated = slot->data.invalidated != RS_INVAL_NONE;
		inactive = slot->active_pid == 0;
		SpinLockRelease(&slot->mutex);

		if (invalidated)
		{
			/* Specified physical slot has been invalidated */
			ereport(elevel,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
						   name, "standby_slot_names"),
					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
							  name),
					errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
							name, "standby_slot_names"));
			break;
		}

		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
		{
			/* Log a message if no active_pid for this physical slot */
			if (inactive)
				ereport(elevel,
						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
							   name, "standby_slot_names"),
						errdetail("Logical replication is waiting on the standby associated with \"%s\".",
								  name),
						errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
								name, "standby_slot_names"));

			/* Continue if the current slot hasn't caught up. */
			break;
		}

		Assert(restart_lsn >= wait_for_lsn);

		/* Track the minimum restart_lsn among the caught-up slots. */
		if (XLogRecPtrIsInvalid(min_restart_lsn) ||
			min_restart_lsn > restart_lsn)
			min_restart_lsn = restart_lsn;

		caught_up_slot_num++;

		/* Advance to the next NUL-terminated name in the packed array. */
		name += strlen(name) + 1;
	}

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Return false if not all the standbys have caught up to the specified
	 * WAL location.
	 */
	if (caught_up_slot_num != standby_slot_names_config->nslotnames)
		return false;

	/* The ss_oldest_flush_lsn must not retreat. */
	Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
		   min_restart_lsn >= ss_oldest_flush_lsn);

	ss_oldest_flush_lsn = min_restart_lsn;

	return true;
}
2730 : :
/*
 * Wait for physical standbys to confirm receiving the given lsn.
 *
 * Used by logical decoding SQL functions. It waits for physical standbys
 * corresponding to the physical slots specified in the standby_slot_names GUC.
 */
void
WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
{
	/*
	 * Don't need to wait for the standby to catch up if the current acquired
	 * slot is not a logical failover slot, or there is no value in
	 * standby_slot_names.
	 */
	if (!MyReplicationSlot->data.failover || !standby_slot_names_config)
		return;

	/*
	 * Register for the condition variable before checking the condition, so
	 * a wakeup between the check and the sleep cannot be lost.
	 */
	ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);

	for (;;)
	{
		CHECK_FOR_INTERRUPTS();

		/*
		 * Absorb any pending configuration reload so that changes to
		 * standby_slot_names made while we wait take effect.
		 */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		/* Exit if done waiting for every slot. */
		if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
			break;

		/*
		 * Wait for the slots in the standby_slot_names to catch up, but use a
		 * timeout (1s) so we can also check if the standby_slot_names has
		 * been changed.
		 */
		ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
									WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
	}

	/* Deregister from the condition variable before returning. */
	ConditionVariableCancelSleep();
}
|