Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2023, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 : * directory. Inside that directory the state file will contain the slot's
25 : * own data. Additional data can be stored alongside that file if required.
26 : * While the server is running, the state data is also cached in memory for
27 : * efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "replication/slot.h"
50 : #include "storage/fd.h"
51 : #include "storage/ipc.h"
52 : #include "storage/proc.h"
53 : #include "storage/procarray.h"
54 : #include "utils/builtins.h"
55 :
56 : /*
57 : * Replication slot on-disk data structure.
58 : */
59 : typedef struct ReplicationSlotOnDisk
60 : {
61 : /* first part of this struct needs to be version independent */
62 :
63 : /* data not covered by checksum */
64 : uint32 magic;
65 : pg_crc32c checksum;
66 :
67 : /* data covered by checksum */
68 : uint32 version;
69 : uint32 length;
70 :
71 : /*
72 : * The actual data in the slot that follows can differ based on the above
73 : * 'version'.
74 : */
75 :
76 : ReplicationSlotPersistentData slotdata;
77 : } ReplicationSlotOnDisk;
78 :
79 : /* size of version independent data */
80 : #define ReplicationSlotOnDiskConstantSize \
81 : offsetof(ReplicationSlotOnDisk, slotdata)
82 : /* size of the part of the slot not covered by the checksum */
83 : #define ReplicationSlotOnDiskNotChecksummedSize \
84 : offsetof(ReplicationSlotOnDisk, version)
85 : /* size of the part covered by the checksum */
86 : #define ReplicationSlotOnDiskChecksummedSize \
87 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
88 : /* size of the slot data that is version dependent */
89 : #define ReplicationSlotOnDiskV2Size \
90 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
91 :
92 : #define SLOT_MAGIC 0x1051CA1 /* format identifier */
93 : #define SLOT_VERSION 3 /* version for new files */
94 :
95 : /* Control array for replication slot management */
96 : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
97 :
98 : /* My backend's replication slot in the shared memory array */
99 : ReplicationSlot *MyReplicationSlot = NULL;
100 :
101 : /* GUC variable */
102 : int max_replication_slots = 10; /* the maximum number of replication
103 : * slots */
104 :
105 : static void ReplicationSlotShmemExit(int code, Datum arg);
106 : static void ReplicationSlotDropAcquired(void);
107 : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
108 :
109 : /* internal persistency functions */
110 : static void RestoreSlotFromDisk(const char *name);
111 : static void CreateSlotOnDisk(ReplicationSlot *slot);
112 : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
113 :
114 : /*
115 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
116 : */
117 : Size
3355 rhaas 118 GIC 6390 : ReplicationSlotsShmemSize(void)
119 : {
3355 rhaas 120 CBC 6390 : Size size = 0;
121 :
122 6390 : if (max_replication_slots == 0)
3355 rhaas 123 UIC 0 : return size;
3355 rhaas 124 ECB :
3355 rhaas 125 GBC 6390 : size = offsetof(ReplicationSlotCtlData, replication_slots);
3355 rhaas 126 GIC 6390 : size = add_size(size,
3355 rhaas 127 ECB : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
128 :
3355 rhaas 129 GIC 6390 : return size;
130 : }
3355 rhaas 131 ECB :
132 : /*
133 : * Allocate and initialize shared memory for replication slots.
134 : */
135 : void
3355 rhaas 136 GIC 1826 : ReplicationSlotsShmemInit(void)
137 : {
3355 rhaas 138 ECB : bool found;
139 :
3355 rhaas 140 GIC 1826 : if (max_replication_slots == 0)
3355 rhaas 141 UIC 0 : return;
3355 rhaas 142 ECB :
3355 rhaas 143 GBC 1826 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
3355 rhaas 144 GIC 1826 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
3355 rhaas 145 ECB : &found);
146 :
3355 rhaas 147 GIC 1826 : if (!found)
148 : {
3355 rhaas 149 ECB : int i;
150 :
151 : /* First time through, so initialize */
3355 rhaas 152 GIC 2189 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
153 :
3355 rhaas 154 CBC 19967 : for (i = 0; i < max_replication_slots; i++)
155 : {
156 18141 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
157 :
3355 rhaas 158 ECB : /* everything else is zeroed by the memset above */
3355 rhaas 159 GIC 18141 : SpinLockInit(&slot->mutex);
1059 tgl 160 18141 : LWLockInitialize(&slot->io_in_progress_lock,
1059 tgl 161 ECB : LWTRANCHE_REPLICATION_SLOT_IO);
2084 alvherre 162 CBC 18141 : ConditionVariableInit(&slot->active_cv);
163 : }
3355 rhaas 164 ECB : }
165 : }
166 :
167 : /*
168 : * Register the callback for replication slot cleanup and releasing.
169 : */
170 : void
419 andres 171 GIC 13289 : ReplicationSlotInitialize(void)
172 : {
419 andres 173 CBC 13289 : before_shmem_exit(ReplicationSlotShmemExit, 0);
419 andres 174 GIC 13289 : }
419 andres 175 ECB :
176 : /*
177 : * Release and cleanup replication slots.
178 : */
179 : static void
419 andres 180 GIC 13289 : ReplicationSlotShmemExit(int code, Datum arg)
181 : {
419 andres 182 ECB : /* Make sure active replication slots are released */
419 andres 183 GIC 13289 : if (MyReplicationSlot != NULL)
184 153 : ReplicationSlotRelease();
419 andres 185 ECB :
186 : /* Also cleanup all the temporary slots. */
419 andres 187 GIC 13289 : ReplicationSlotCleanup();
188 13289 : }
419 andres 189 ECB :
3355 rhaas 190 : /*
191 : * Check whether the passed slot name is valid and report errors at elevel.
192 : *
193 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
194 : * the name to be used as a directory name on every supported OS.
195 : *
196 : * Returns whether the directory name is valid or not if elevel < ERROR.
197 : */
198 : bool
3355 rhaas 199 GIC 603 : ReplicationSlotValidateName(const char *name, int elevel)
200 : {
3355 rhaas 201 ECB : const char *cp;
202 :
3355 rhaas 203 GIC 603 : if (strlen(name) == 0)
204 : {
3355 rhaas 205 CBC 3 : ereport(elevel,
206 : (errcode(ERRCODE_INVALID_NAME),
3355 rhaas 207 ECB : errmsg("replication slot name \"%s\" is too short",
208 : name)));
3355 rhaas 209 UIC 0 : return false;
210 : }
3355 rhaas 211 EUB :
3355 rhaas 212 GIC 600 : if (strlen(name) >= NAMEDATALEN)
213 : {
3355 rhaas 214 LBC 0 : ereport(elevel,
215 : (errcode(ERRCODE_NAME_TOO_LONG),
3355 rhaas 216 EUB : errmsg("replication slot name \"%s\" is too long",
217 : name)));
3355 rhaas 218 UIC 0 : return false;
219 : }
3355 rhaas 220 EUB :
3355 rhaas 221 GIC 13080 : for (cp = name; *cp; cp++)
222 : {
3355 rhaas 223 CBC 12481 : if (!((*cp >= 'a' && *cp <= 'z')
3355 rhaas 224 GIC 6433 : || (*cp >= '0' && *cp <= '9')
3355 rhaas 225 CBC 1189 : || (*cp == '_')))
3355 rhaas 226 ECB : {
3355 rhaas 227 CBC 1 : ereport(elevel,
228 : (errcode(ERRCODE_INVALID_NAME),
2118 tgl 229 ECB : errmsg("replication slot name \"%s\" contains invalid character",
230 : name),
231 : errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
3355 rhaas 232 UIC 0 : return false;
233 : }
3355 rhaas 234 EUB : }
3355 rhaas 235 GIC 599 : return true;
236 : }
3355 rhaas 237 ECB :
238 : /*
239 : * Create a new replication slot and mark it as used by this backend.
240 : *
241 : * name: Name of the slot
242 : * db_specific: logical decoding is db specific; if the slot is going to
243 : * be used for that pass true, otherwise false.
244 : * two_phase: Allows decoding of prepared transactions. We allow this option
245 : * to be enabled only at the slot creation time. If we allow this option
246 : * to be changed during decoding then it is quite possible that we skip
247 : * prepare first time because this option was not enabled. Now next time
248 : * during getting changes, if the two_phase option is enabled it can skip
249 : * prepare because by that time start decoding point has been moved. So the
250 : * user will only get commit prepared.
251 : */
252 : void
3324 rhaas 253 GIC 474 : ReplicationSlotCreate(const char *name, bool db_specific,
254 : ReplicationSlotPersistency persistency, bool two_phase)
3355 rhaas 255 ECB : {
3355 rhaas 256 GIC 474 : ReplicationSlot *slot = NULL;
257 : int i;
3355 rhaas 258 ECB :
3355 rhaas 259 GIC 474 : Assert(MyReplicationSlot == NULL);
260 :
3355 rhaas 261 CBC 474 : ReplicationSlotValidateName(name, ERROR);
262 :
3355 rhaas 263 ECB : /*
264 : * If some other backend ran this code concurrently with us, we'd likely
265 : * both allocate the same slot, and that would be bad. We'd also be at
266 : * risk of missing a name collision. Also, we don't want to try to create
267 : * a new slot while somebody's busy cleaning up an old one, because we
268 : * might both be monkeying with the same directory.
269 : */
3355 rhaas 270 GIC 473 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
271 :
3355 rhaas 272 ECB : /*
273 : * Check for name collision, and identify an allocatable slot. We need to
274 : * hold ReplicationSlotControlLock in shared mode for this, so that nobody
275 : * else can change the in_use flags while we're looking at them.
276 : */
3355 rhaas 277 GIC 473 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
278 4383 : for (i = 0; i < max_replication_slots; i++)
3355 rhaas 279 ECB : {
3355 rhaas 280 CBC 3913 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
281 :
282 3913 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
3355 rhaas 283 GIC 3 : ereport(ERROR,
3355 rhaas 284 ECB : (errcode(ERRCODE_DUPLICATE_OBJECT),
285 : errmsg("replication slot \"%s\" already exists", name)));
3355 rhaas 286 GIC 3910 : if (!s->in_use && slot == NULL)
287 469 : slot = s;
3355 rhaas 288 ECB : }
3355 rhaas 289 CBC 470 : LWLockRelease(ReplicationSlotControlLock);
290 :
3355 rhaas 291 ECB : /* If all slots are in use, we're out of luck. */
3355 rhaas 292 GIC 470 : if (slot == NULL)
293 1 : ereport(ERROR,
3355 rhaas 294 ECB : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
295 : errmsg("all replication slots are in use"),
296 : errhint("Free one or increase max_replication_slots.")));
297 :
298 : /*
299 : * Since this slot is not in use, nobody should be looking at any part of
300 : * it other than the in_use field unless they're trying to allocate it.
301 : * And since we hold ReplicationSlotAllocationLock, nobody except us can
302 : * be doing that. So it's safe to initialize the slot.
303 : */
3355 rhaas 304 GIC 469 : Assert(!slot->in_use);
2910 andres 305 469 : Assert(slot->active_pid == 0);
2426 andres 306 ECB :
307 : /* first initialize persistent data */
2426 andres 308 GIC 469 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
972 peter 309 469 : namestrcpy(&slot->data.name, name);
3355 rhaas 310 CBC 469 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
2426 andres 311 469 : slot->data.persistency = persistency;
767 akapila 312 469 : slot->data.two_phase = two_phase;
634 313 469 : slot->data.two_phase_at = InvalidXLogRecPtr;
2426 andres 314 ECB :
315 : /* and then data only present in shared memory */
2426 andres 316 GIC 469 : slot->just_dirtied = false;
317 469 : slot->dirty = false;
2426 andres 318 CBC 469 : slot->effective_xmin = InvalidTransactionId;
319 469 : slot->effective_catalog_xmin = InvalidTransactionId;
320 469 : slot->candidate_catalog_xmin = InvalidTransactionId;
321 469 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
322 469 : slot->candidate_restart_valid = InvalidXLogRecPtr;
323 469 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
3355 rhaas 324 ECB :
325 : /*
326 : * Create the slot on disk. We haven't actually marked the slot allocated
327 : * yet, so no special cleanup is required if this errors out.
328 : */
3355 rhaas 329 GIC 469 : CreateSlotOnDisk(slot);
330 :
3355 rhaas 331 ECB : /*
332 : * We need to briefly prevent any other backend from iterating over the
333 : * slots while we flip the in_use flag. We also need to set the active
334 : * flag while holding the ControlLock as otherwise a concurrent
335 : * ReplicationSlotAcquire() could acquire the slot as well.
336 : */
3355 rhaas 337 GIC 469 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
338 :
3355 rhaas 339 CBC 469 : slot->in_use = true;
340 :
3355 rhaas 341 ECB : /* We can now mark the slot active, and that makes it our slot. */
2742 rhaas 342 GIC 469 : SpinLockAcquire(&slot->mutex);
343 469 : Assert(slot->active_pid == 0);
2742 rhaas 344 CBC 469 : slot->active_pid = MyProcPid;
345 469 : SpinLockRelease(&slot->mutex);
346 469 : MyReplicationSlot = slot;
3355 rhaas 347 ECB :
3355 rhaas 348 CBC 469 : LWLockRelease(ReplicationSlotControlLock);
349 :
913 akapila 350 ECB : /*
351 : * Create statistics entry for the new logical slot. We don't collect any
352 : * stats for physical slots, so no need to create an entry for the same.
353 : * See ReplicationSlotDropPtr for why we need to do this before releasing
354 : * ReplicationSlotAllocationLock.
355 : */
913 akapila 356 GIC 469 : if (SlotIsLogical(slot))
368 andres 357 344 : pgstat_create_replslot(slot);
913 akapila 358 ECB :
3355 rhaas 359 : /*
360 : * Now that the slot has been marked as in_use and active, it's safe to
361 : * let somebody else try to allocate a slot.
362 : */
3355 rhaas 363 GIC 469 : LWLockRelease(ReplicationSlotAllocationLock);
364 :
2084 alvherre 365 ECB : /* Let everybody know we've modified this slot */
2084 alvherre 366 GIC 469 : ConditionVariableBroadcast(&slot->active_cv);
3355 rhaas 367 469 : }
3355 rhaas 368 ECB :
369 : /*
370 : * Search for the named replication slot.
371 : *
372 : * Return the replication slot if found, otherwise NULL.
373 : */
374 : ReplicationSlot *
712 akapila 375 GIC 985 : SearchNamedReplicationSlot(const char *name, bool need_lock)
376 : {
3355 rhaas 377 ECB : int i;
712 akapila 378 GIC 985 : ReplicationSlot *slot = NULL;
379 :
712 akapila 380 CBC 985 : if (need_lock)
712 akapila 381 GIC 66 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3355 rhaas 382 ECB :
3355 rhaas 383 CBC 1643 : for (i = 0; i < max_replication_slots; i++)
384 : {
385 1627 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
386 :
387 1627 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
388 : {
389 969 : slot = s;
3355 rhaas 390 GIC 969 : break;
3355 rhaas 391 ECB : }
392 : }
393 :
712 akapila 394 GIC 985 : if (need_lock)
395 66 : LWLockRelease(ReplicationSlotControlLock);
712 akapila 396 ECB :
1024 fujii 397 CBC 985 : return slot;
398 : }
1024 fujii 399 ECB :
400 : /*
401 : * Return the index of the replication slot in
402 : * ReplicationSlotCtl->replication_slots.
403 : *
404 : * This is mainly useful to have an efficient key for storing replication slot
405 : * stats.
406 : */
407 : int
368 andres 408 GIC 6891 : ReplicationSlotIndex(ReplicationSlot *slot)
409 : {
368 andres 410 CBC 6891 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
411 : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
368 andres 412 ECB :
368 andres 413 GIC 6891 : return slot - ReplicationSlotCtl->replication_slots;
414 : }
368 andres 415 ECB :
416 : /*
417 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
418 : * the slot's name and true is returned.
419 : *
420 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
421 : * cases there are obvious TOCTOU issues.
422 : */
423 : bool
183 andres 424 GIC 43 : ReplicationSlotName(int index, Name name)
425 : {
183 andres 426 ECB : ReplicationSlot *slot;
427 : bool found;
428 :
183 andres 429 GIC 43 : slot = &ReplicationSlotCtl->replication_slots[index];
430 :
183 andres 431 ECB : /*
432 : * Ensure that the slot cannot be dropped while we copy the name. Don't
433 : * need the spinlock as the name of an existing slot cannot change.
434 : */
183 andres 435 GIC 43 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
436 43 : found = slot->in_use;
183 andres 437 CBC 43 : if (slot->in_use)
438 43 : namestrcpy(name, NameStr(slot->data.name));
439 43 : LWLockRelease(ReplicationSlotControlLock);
183 andres 440 ECB :
183 andres 441 CBC 43 : return found;
442 : }
183 andres 443 ECB :
444 : /*
445 : * Find a previously created slot and mark it as used by this process.
446 : *
447 : * An error is raised if nowait is true and the slot is currently in use. If
448 : * nowait is false, we sleep until the slot is released by the owning process.
449 : */
450 : void
667 alvherre 451 GIC 913 : ReplicationSlotAcquire(const char *name, bool nowait)
452 : {
1024 fujii 453 ECB : ReplicationSlot *s;
454 : int active_pid;
455 :
163 peter 456 GNC 913 : Assert(name != NULL);
457 :
1024 fujii 458 CBC 913 : retry:
1024 fujii 459 GIC 913 : Assert(MyReplicationSlot == NULL);
1024 fujii 460 ECB :
1024 fujii 461 CBC 913 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
462 :
1024 fujii 463 ECB : /*
464 : * Search for the slot with the specified name if the slot to acquire is
465 : * not given. If the slot is not found, we either return -1 or error out.
466 : */
667 alvherre 467 GIC 913 : s = SearchNamedReplicationSlot(name, false);
1024 fujii 468 913 : if (s == NULL || !s->in_use)
1024 fujii 469 ECB : {
1024 fujii 470 CBC 9 : LWLockRelease(ReplicationSlotControlLock);
471 :
3355 rhaas 472 9 : ereport(ERROR,
473 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1024 fujii 474 ECB : errmsg("replication slot \"%s\" does not exist",
475 : name)));
476 : }
477 :
478 : /*
479 : * This is the slot we want; check if it's active under some other
480 : * process. In single user mode, we don't need this check.
481 : */
1024 fujii 482 GIC 904 : if (IsUnderPostmaster)
483 : {
1024 fujii 484 ECB : /*
485 : * Get ready to sleep on the slot in case it is active. (We may end
486 : * up not sleeping, but we don't want to do this while holding the
487 : * spinlock.)
488 : */
667 alvherre 489 GIC 904 : if (!nowait)
1024 fujii 490 189 : ConditionVariablePrepareToSleep(&s->active_cv);
1024 fujii 491 ECB :
1024 fujii 492 CBC 904 : SpinLockAcquire(&s->mutex);
1024 fujii 493 GIC 904 : if (s->active_pid == 0)
1024 fujii 494 CBC 804 : s->active_pid = MyProcPid;
495 904 : active_pid = s->active_pid;
496 904 : SpinLockRelease(&s->mutex);
1024 fujii 497 ECB : }
498 : else
1024 fujii 499 UIC 0 : active_pid = MyProcPid;
1024 fujii 500 GIC 904 : LWLockRelease(ReplicationSlotControlLock);
1024 fujii 501 EUB :
1024 fujii 502 ECB : /*
503 : * If we found the slot but it's already active in another process, we
504 : * wait until the owning process signals us that it's been released, or
505 : * error out.
506 : */
2313 peter_e 507 GIC 904 : if (active_pid != MyProcPid)
508 : {
667 alvherre 509 LBC 0 : if (!nowait)
510 : {
664 alvherre 511 EUB : /* Wait here until we get signaled, and then restart */
664 alvherre 512 UIC 0 : ConditionVariableSleep(&s->active_cv,
513 : WAIT_EVENT_REPLICATION_SLOT_DROP);
664 alvherre 514 UBC 0 : ConditionVariableCancelSleep();
664 alvherre 515 UIC 0 : goto retry;
664 alvherre 516 EUB : }
517 :
664 alvherre 518 UIC 0 : ereport(ERROR,
519 : (errcode(ERRCODE_OBJECT_IN_USE),
664 alvherre 520 EUB : errmsg("replication slot \"%s\" is active for PID %d",
521 : NameStr(s->data.name), active_pid)));
522 : }
667 alvherre 523 GIC 904 : else if (!nowait)
697 tgl 524 189 : ConditionVariableCancelSleep(); /* no sleep needed after all */
2084 alvherre 525 ECB :
526 : /* Let everybody know we've modified this slot */
1024 fujii 527 GIC 904 : ConditionVariableBroadcast(&s->active_cv);
528 :
3355 rhaas 529 ECB : /* We made this slot active, so it's ours now. */
1024 fujii 530 GIC 904 : MyReplicationSlot = s;
531 :
368 andres 532 ECB : /*
533 : * The call to pgstat_acquire_replslot() protects against stats for a
534 : * different slot, from before a restart or such, being present during
535 : * pgstat_report_replslot().
536 : */
368 andres 537 GIC 904 : if (SlotIsLogical(s))
538 767 : pgstat_acquire_replslot(s);
3355 rhaas 539 CBC 904 : }
3355 rhaas 540 ECB :
541 : /*
542 : * Release the replication slot that this backend considers to own.
543 : *
544 : * This or another backend can re-acquire the slot later.
545 : * Resources this slot requires will be preserved.
546 : */
547 : void
3355 rhaas 548 GIC 1092 : ReplicationSlotRelease(void)
549 : {
3355 rhaas 550 CBC 1092 : ReplicationSlot *slot = MyReplicationSlot;
551 :
2910 andres 552 1092 : Assert(slot != NULL && slot->active_pid != 0);
553 :
3324 rhaas 554 1092 : if (slot->data.persistency == RS_EPHEMERAL)
555 : {
3324 rhaas 556 ECB : /*
557 : * Delete the slot. There is no !PANIC case where this is allowed to
558 : * fail, all that may happen is an incomplete cleanup of the on-disk
559 : * data.
560 : */
3324 rhaas 561 GIC 5 : ReplicationSlotDropAcquired();
562 : }
2177 andres 563 ECB :
564 : /*
565 : * If slot needed to temporarily restrain both data and catalog xmin to
566 : * create the catalog snapshot, remove that temporary constraint.
567 : * Snapshots can only be exported while the initial snapshot is still
568 : * acquired.
569 : */
2177 andres 570 GIC 1092 : if (!TransactionIdIsValid(slot->data.xmin) &&
571 1087 : TransactionIdIsValid(slot->effective_xmin))
2177 andres 572 ECB : {
2177 andres 573 CBC 155 : SpinLockAcquire(&slot->mutex);
2177 andres 574 GIC 155 : slot->effective_xmin = InvalidTransactionId;
2177 andres 575 CBC 155 : SpinLockRelease(&slot->mutex);
576 155 : ReplicationSlotsComputeRequiredXmin(false);
2177 andres 577 ECB : }
578 :
2084 alvherre 579 GIC 1092 : if (slot->data.persistency == RS_PERSISTENT)
580 : {
2084 alvherre 581 ECB : /*
582 : * Mark persistent slot inactive. We're not freeing it, just
583 : * disconnecting, but wake up others that may be waiting for it.
584 : */
2084 alvherre 585 GIC 886 : SpinLockAcquire(&slot->mutex);
586 886 : slot->active_pid = 0;
2084 alvherre 587 CBC 886 : SpinLockRelease(&slot->mutex);
588 886 : ConditionVariableBroadcast(&slot->active_cv);
2084 alvherre 589 ECB : }
590 :
3324 rhaas 591 GIC 1092 : MyReplicationSlot = NULL;
592 :
3324 rhaas 593 ECB : /* might not have been set when we've been a plain slot */
514 alvherre 594 GIC 1092 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
874 595 1092 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
874 alvherre 596 CBC 1092 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
3324 rhaas 597 1092 : LWLockRelease(ProcArrayLock);
3355 598 1092 : }
3355 rhaas 599 ECB :
2313 peter_e 600 : /*
601 : * Cleanup all temporary slots created in current session.
602 : */
603 : void
2195 andres 604 GIC 31075 : ReplicationSlotCleanup(void)
605 : {
2313 peter_e 606 ECB : int i;
607 :
2313 peter_e 608 GIC 31075 : Assert(MyReplicationSlot == NULL);
609 :
2084 alvherre 610 CBC 31075 : restart:
2084 alvherre 611 GIC 31176 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2313 peter_e 612 CBC 338131 : for (i = 0; i < max_replication_slots; i++)
2313 peter_e 613 ECB : {
2313 peter_e 614 CBC 307056 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
615 :
2084 alvherre 616 307056 : if (!s->in_use)
2084 alvherre 617 GIC 297495 : continue;
2084 alvherre 618 ECB :
2084 alvherre 619 CBC 9561 : SpinLockAcquire(&s->mutex);
2313 peter_e 620 GIC 9561 : if (s->active_pid == MyProcPid)
2313 peter_e 621 ECB : {
2084 alvherre 622 CBC 101 : Assert(s->data.persistency == RS_TEMPORARY);
2084 alvherre 623 GIC 101 : SpinLockRelease(&s->mutex);
2084 alvherre 624 CBC 101 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
2313 peter_e 625 ECB :
2313 peter_e 626 CBC 101 : ReplicationSlotDropPtr(s);
627 :
2084 alvherre 628 101 : ConditionVariableBroadcast(&s->active_cv);
2084 alvherre 629 GIC 101 : goto restart;
2313 peter_e 630 ECB : }
2084 alvherre 631 : else
2084 alvherre 632 GIC 9460 : SpinLockRelease(&s->mutex);
633 : }
2084 alvherre 634 ECB :
2084 alvherre 635 GIC 31075 : LWLockRelease(ReplicationSlotControlLock);
2313 peter_e 636 31075 : }
2313 peter_e 637 ECB :
3355 rhaas 638 : /*
639 : * Permanently drop replication slot identified by the passed in name.
640 : */
641 : void
2084 alvherre 642 GIC 299 : ReplicationSlotDrop(const char *name, bool nowait)
643 : {
3324 rhaas 644 CBC 299 : Assert(MyReplicationSlot == NULL);
645 :
667 alvherre 646 299 : ReplicationSlotAcquire(name, nowait);
647 :
3324 rhaas 648 294 : ReplicationSlotDropAcquired();
3324 rhaas 649 GIC 294 : }
3324 rhaas 650 ECB :
651 : /*
652 : * Permanently drop the currently acquired replication slot.
653 : */
654 : static void
3324 rhaas 655 GIC 304 : ReplicationSlotDropAcquired(void)
656 : {
3324 rhaas 657 CBC 304 : ReplicationSlot *slot = MyReplicationSlot;
658 :
659 304 : Assert(MyReplicationSlot != NULL);
660 :
3324 rhaas 661 ECB : /* slot isn't acquired anymore */
3324 rhaas 662 GIC 304 : MyReplicationSlot = NULL;
663 :
2313 peter_e 664 CBC 304 : ReplicationSlotDropPtr(slot);
2313 peter_e 665 GIC 304 : }
2313 peter_e 666 ECB :
667 : /*
668 : * Permanently drop the replication slot which will be released by the point
669 : * this function returns.
670 : */
671 : static void
2313 peter_e 672 GIC 405 : ReplicationSlotDropPtr(ReplicationSlot *slot)
673 : {
2313 peter_e 674 ECB : char path[MAXPGPATH];
675 : char tmppath[MAXPGPATH];
676 :
677 : /*
678 : * If some other backend ran this code concurrently with us, we might try
679 : * to delete a slot with a certain name while someone else was trying to
680 : * create a slot with the same name.
681 : */
3355 rhaas 682 GIC 405 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
683 :
3355 rhaas 684 ECB : /* Generate pathnames. */
3355 rhaas 685 GIC 405 : sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
686 405 : sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
3355 rhaas 687 ECB :
688 : /*
689 : * Rename the slot directory on disk, so that we'll no longer recognize
690 : * this as a valid slot. Note that if this fails, we've got to mark the
691 : * slot inactive before bailing out. If we're dropping an ephemeral or a
692 : * temporary slot, we better never fail hard as the caller won't expect
693 : * the slot to survive and this might get called during error handling.
694 : */
3324 rhaas 695 GIC 405 : if (rename(path, tmppath) == 0)
696 : {
3324 rhaas 697 ECB : /*
698 : * We need to fsync() the directory we just renamed and its parent to
699 : * make sure that our changes are on disk in a crash-safe fashion. If
700 : * fsync() fails, we can't be sure whether the changes are on disk or
701 : * not. For now, we handle that by panicking;
702 : * StartupReplicationSlots() will try to straighten it out after
703 : * restart.
704 : */
3324 rhaas 705 GIC 405 : START_CRIT_SECTION();
706 405 : fsync_fname(tmppath, true);
3324 rhaas 707 CBC 405 : fsync_fname("pg_replslot", true);
708 405 : END_CRIT_SECTION();
3324 rhaas 709 ECB : }
710 : else
711 : {
2313 peter_e 712 UIC 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
713 :
3355 rhaas 714 UBC 0 : SpinLockAcquire(&slot->mutex);
2742 rhaas 715 UIC 0 : slot->active_pid = 0;
3355 rhaas 716 UBC 0 : SpinLockRelease(&slot->mutex);
3355 rhaas 717 EUB :
2084 alvherre 718 : /* wake up anyone waiting on this slot */
2084 alvherre 719 UIC 0 : ConditionVariableBroadcast(&slot->active_cv);
720 :
3324 rhaas 721 UBC 0 : ereport(fail_softly ? WARNING : ERROR,
722 : (errcode_for_file_access(),
3138 peter_e 723 EUB : errmsg("could not rename file \"%s\" to \"%s\": %m",
724 : path, tmppath)));
725 : }
726 :
727 : /*
728 : * The slot is definitely gone. Lock out concurrent scans of the array
729 : * long enough to kill it. It's OK to clear the active PID here without
730 : * grabbing the mutex because nobody else can be scanning the array here,
731 : * and nobody can be attached to this slot and thus access it without
732 : * scanning the array.
733 : *
734 : * Also wake up processes waiting for it.
735 : */
3355 rhaas 736 GIC 405 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
2910 andres 737 405 : slot->active_pid = 0;
3355 rhaas 738 CBC 405 : slot->in_use = false;
739 405 : LWLockRelease(ReplicationSlotControlLock);
2084 alvherre 740 405 : ConditionVariableBroadcast(&slot->active_cv);
3355 rhaas 741 ECB :
742 : /*
743 : * Slot is dead and doesn't prevent resource removal anymore, recompute
744 : * limits.
745 : */
3324 rhaas 746 GIC 405 : ReplicationSlotsComputeRequiredXmin(false);
3355 747 405 : ReplicationSlotsComputeRequiredLSN();
3355 rhaas 748 ECB :
749 : /*
750 : * If removing the directory fails, the worst thing that will happen is
751 : * that the user won't be able to create a new slot with the same name
752 : * until the next server restart. We warn about it, but that's all.
753 : */
3355 rhaas 754 GIC 405 : if (!rmtree(tmppath, true))
3355 rhaas 755 UIC 0 : ereport(WARNING,
1678 michael 756 ECB : (errmsg("could not remove directory \"%s\"", tmppath)));
3355 rhaas 757 EUB :
758 : /*
759 : * Drop the statistics entry for the replication slot. Do this while
760 : * holding ReplicationSlotAllocationLock so that we don't drop a
761 : * statistics entry for another slot with the same name just created in
762 : * another session.
763 : */
913 akapila 764 GIC 405 : if (SlotIsLogical(slot))
368 andres 765 298 : pgstat_drop_replslot(slot);
913 akapila 766 ECB :
3355 rhaas 767 : /*
768 : * We release this at the very end, so that nobody starts trying to create
769 : * a slot while we're still cleaning up the detritus of the old one.
770 : */
3355 rhaas 771 GIC 405 : LWLockRelease(ReplicationSlotAllocationLock);
772 405 : }
3355 rhaas 773 ECB :
774 : /*
775 : * Serialize the currently acquired slot's state from memory to disk, thereby
776 : * guaranteeing the current state will survive a crash.
777 : */
778 : void
3355 rhaas 779 GIC 971 : ReplicationSlotSave(void)
780 : {
3355 rhaas 781 ECB : char path[MAXPGPATH];
782 :
3355 rhaas 783 GIC 971 : Assert(MyReplicationSlot != NULL);
784 :
3355 rhaas 785 CBC 971 : sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
3355 rhaas 786 GIC 971 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
3355 rhaas 787 CBC 971 : }
3355 rhaas 788 ECB :
789 : /*
790 : * Signal that it would be useful if the currently acquired slot would be
791 : * flushed out to disk.
792 : *
793 : * Note that the actual flush to disk can be delayed for a long time, if
794 : * required for correctness explicitly do a ReplicationSlotSave().
795 : */
796 : void
3355 rhaas 797 GIC 18803 : ReplicationSlotMarkDirty(void)
798 : {
2742 rhaas 799 CBC 18803 : ReplicationSlot *slot = MyReplicationSlot;
800 :
3355 801 18803 : Assert(MyReplicationSlot != NULL);
802 :
2742 803 18803 : SpinLockAcquire(&slot->mutex);
2742 rhaas 804 GIC 18803 : MyReplicationSlot->just_dirtied = true;
2742 rhaas 805 CBC 18803 : MyReplicationSlot->dirty = true;
806 18803 : SpinLockRelease(&slot->mutex);
3355 807 18803 : }
3355 rhaas 808 ECB :
3324 809 : /*
810 : * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
811 : * guaranteeing it will be there after an eventual crash.
812 : */
813 : void
3324 rhaas 814 GIC 331 : ReplicationSlotPersist(void)
815 : {
3324 rhaas 816 CBC 331 : ReplicationSlot *slot = MyReplicationSlot;
817 :
818 331 : Assert(slot != NULL);
3324 rhaas 819 GIC 331 : Assert(slot->data.persistency != RS_PERSISTENT);
3324 rhaas 820 ECB :
2742 rhaas 821 CBC 331 : SpinLockAcquire(&slot->mutex);
2742 rhaas 822 GIC 331 : slot->data.persistency = RS_PERSISTENT;
2742 rhaas 823 CBC 331 : SpinLockRelease(&slot->mutex);
3324 rhaas 824 ECB :
3324 rhaas 825 CBC 331 : ReplicationSlotMarkDirty();
3324 rhaas 826 GIC 331 : ReplicationSlotSave();
3324 rhaas 827 CBC 331 : }
3324 rhaas 828 ECB :
3355 829 : /*
830 : * Compute the oldest xmin across all slots and store it in the ProcArray.
831 : *
832 : * If already_locked is true, ProcArrayLock has already been acquired
833 : * exclusively.
834 : */
835 : void
3324 rhaas 836 GIC 2189 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
837 : {
3355 rhaas 838 ECB : int i;
3355 rhaas 839 GIC 2189 : TransactionId agg_xmin = InvalidTransactionId;
3324 840 2189 : TransactionId agg_catalog_xmin = InvalidTransactionId;
3355 rhaas 841 ECB :
3355 rhaas 842 CBC 2189 : Assert(ReplicationSlotCtl != NULL);
843 :
2177 andres 844 2189 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
845 :
3355 rhaas 846 22253 : for (i = 0; i < max_replication_slots; i++)
847 : {
848 20064 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
849 : TransactionId effective_xmin;
3260 bruce 850 ECB : TransactionId effective_catalog_xmin;
851 : bool invalidated;
852 :
3355 rhaas 853 GIC 20064 : if (!s->in_use)
854 18542 : continue;
3355 rhaas 855 ECB :
2742 rhaas 856 CBC 1522 : SpinLockAcquire(&s->mutex);
2742 rhaas 857 GIC 1522 : effective_xmin = s->effective_xmin;
2742 rhaas 858 CBC 1522 : effective_catalog_xmin = s->effective_catalog_xmin;
2 andres 859 GNC 1522 : invalidated = s->data.invalidated != RS_INVAL_NONE;
2742 rhaas 860 CBC 1522 : SpinLockRelease(&s->mutex);
3355 rhaas 861 ECB :
862 : /* invalidated slots need not apply */
138 alvherre 863 GIC 1522 : if (invalidated)
138 alvherre 864 CBC 21 : continue;
138 alvherre 865 ECB :
866 : /* check the data xmin */
3355 rhaas 867 GIC 1501 : if (TransactionIdIsValid(effective_xmin) &&
3355 rhaas 868 CBC 3 : (!TransactionIdIsValid(agg_xmin) ||
869 3 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
870 210 : agg_xmin = effective_xmin;
3324 rhaas 871 ECB :
872 : /* check the catalog xmin */
3324 rhaas 873 GIC 1501 : if (TransactionIdIsValid(effective_catalog_xmin) &&
3324 rhaas 874 CBC 599 : (!TransactionIdIsValid(agg_catalog_xmin) ||
875 599 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
876 789 : agg_catalog_xmin = effective_catalog_xmin;
3355 rhaas 877 ECB : }
878 :
2177 andres 879 GIC 2189 : LWLockRelease(ReplicationSlotControlLock);
3324 rhaas 880 ECB :
3324 rhaas 881 GIC 2189 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
3355 rhaas 882 CBC 2189 : }
3355 rhaas 883 ECB :
884 : /*
885 : * Compute the oldest restart LSN across all slots and inform xlog module.
886 : *
887 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
888 : * purpose, we don't try to account for that, because this module doesn't
889 : * know what to compare against.
890 : */
891 : void
3355 rhaas 892 GIC 19626 : ReplicationSlotsComputeRequiredLSN(void)
3355 rhaas 893 ECB : {
894 : int i;
3260 bruce 895 GIC 19626 : XLogRecPtr min_required = InvalidXLogRecPtr;
3355 rhaas 896 ECB :
3355 rhaas 897 GIC 19626 : Assert(ReplicationSlotCtl != NULL);
3355 rhaas 898 ECB :
3355 rhaas 899 GIC 19626 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3355 rhaas 900 CBC 212248 : for (i = 0; i < max_replication_slots; i++)
3355 rhaas 901 ECB : {
3355 rhaas 902 GIC 192622 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
3260 bruce 903 ECB : XLogRecPtr restart_lsn;
904 : bool invalidated;
905 :
3355 rhaas 906 GIC 192622 : if (!s->in_use)
907 173627 : continue;
3355 rhaas 908 ECB :
2742 rhaas 909 CBC 18995 : SpinLockAcquire(&s->mutex);
2742 rhaas 910 GIC 18995 : restart_lsn = s->data.restart_lsn;
2 andres 911 GNC 18995 : invalidated = s->data.invalidated != RS_INVAL_NONE;
2742 rhaas 912 CBC 18995 : SpinLockRelease(&s->mutex);
3355 rhaas 913 ECB :
914 : /* invalidated slots need not apply */
2 andres 915 GNC 18995 : if (invalidated)
916 38 : continue;
917 :
3355 rhaas 918 CBC 18957 : if (restart_lsn != InvalidXLogRecPtr &&
919 648 : (min_required == InvalidXLogRecPtr ||
920 : restart_lsn < min_required))
3355 rhaas 921 GIC 18389 : min_required = restart_lsn;
3355 rhaas 922 ECB : }
3355 rhaas 923 CBC 19626 : LWLockRelease(ReplicationSlotControlLock);
924 :
925 19626 : XLogSetReplicationSlotMinimumLSN(min_required);
926 19626 : }
927 :
3324 rhaas 928 ECB : /*
929 : * Compute the oldest WAL LSN required by *logical* decoding slots..
930 : *
931 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
932 : * slots exist.
933 : *
934 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
935 : * ignores physical replication slots.
936 : *
937 : * The results aren't required frequently, so we don't maintain a precomputed
938 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
939 : */
940 : XLogRecPtr
3324 rhaas 941 GIC 4726 : ReplicationSlotsComputeLogicalRestartLSN(void)
942 : {
943 4726 : XLogRecPtr result = InvalidXLogRecPtr;
944 : int i;
945 :
946 4726 : if (max_replication_slots <= 0)
3324 rhaas 947 UIC 0 : return InvalidXLogRecPtr;
3324 rhaas 948 ECB :
3324 rhaas 949 GIC 4726 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3324 rhaas 950 ECB :
3324 rhaas 951 GIC 51678 : for (i = 0; i < max_replication_slots; i++)
952 : {
2742 rhaas 953 ECB : ReplicationSlot *s;
3260 bruce 954 EUB : XLogRecPtr restart_lsn;
955 : bool invalidated;
956 :
3324 rhaas 957 CBC 46952 : s = &ReplicationSlotCtl->replication_slots[i];
958 :
3324 rhaas 959 ECB : /* cannot change while ReplicationSlotCtlLock is held */
3324 rhaas 960 GIC 46952 : if (!s->in_use)
961 46714 : continue;
962 :
963 : /* we're only interested in logical slots */
2798 andres 964 238 : if (!SlotIsLogical(s))
3324 rhaas 965 CBC 124 : continue;
966 :
967 : /* read once, it's ok if it increases while we're checking */
968 114 : SpinLockAcquire(&s->mutex);
969 114 : restart_lsn = s->data.restart_lsn;
2 andres 970 GNC 114 : invalidated = s->data.invalidated != RS_INVAL_NONE;
3324 rhaas 971 GIC 114 : SpinLockRelease(&s->mutex);
972 :
973 : /* invalidated slots need not apply */
2 andres 974 GNC 114 : if (invalidated)
2 andres 975 UNC 0 : continue;
976 :
1097 alvherre 977 CBC 114 : if (restart_lsn == InvalidXLogRecPtr)
1097 alvherre 978 LBC 0 : continue;
979 :
3324 rhaas 980 GIC 114 : if (result == InvalidXLogRecPtr ||
3324 rhaas 981 ECB : restart_lsn < result)
3324 rhaas 982 CBC 98 : result = restart_lsn;
3324 rhaas 983 ECB : }
984 :
3324 rhaas 985 GIC 4726 : LWLockRelease(ReplicationSlotControlLock);
986 :
3324 rhaas 987 CBC 4726 : return result;
3324 rhaas 988 EUB : }
989 :
3324 rhaas 990 ECB : /*
3324 rhaas 991 EUB : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
992 : * passed database oid.
3324 rhaas 993 ECB : *
994 : * Returns true if there are any slots referencing the database. *nslots will
995 : * be set to the absolute number of slots in the database, *nactive to ones
996 : * currently active.
997 : */
998 : bool
3324 rhaas 999 GIC 21 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
3324 rhaas 1000 ECB : {
1001 : int i;
1002 :
3324 rhaas 1003 GIC 21 : *nslots = *nactive = 0;
1004 :
1005 21 : if (max_replication_slots <= 0)
3324 rhaas 1006 UIC 0 : return false;
1007 :
3324 rhaas 1008 GIC 21 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1009 218 : for (i = 0; i < max_replication_slots; i++)
1010 : {
1011 : ReplicationSlot *s;
3324 rhaas 1012 ECB :
3324 rhaas 1013 GIC 197 : s = &ReplicationSlotCtl->replication_slots[i];
1014 :
1015 : /* cannot change while ReplicationSlotCtlLock is held */
3324 rhaas 1016 CBC 197 : if (!s->in_use)
3324 rhaas 1017 GIC 183 : continue;
3324 rhaas 1018 ECB :
2798 andres 1019 EUB : /* only logical slots are database specific, skip */
2798 andres 1020 GIC 14 : if (!SlotIsLogical(s))
3320 bruce 1021 CBC 8 : continue;
3324 rhaas 1022 ECB :
1023 : /* not our database, skip */
3324 rhaas 1024 GIC 6 : if (s->data.database != dboid)
1025 3 : continue;
3324 rhaas 1026 ECB :
1027 : /* NB: intentionally counting invalidated slots */
1028 :
1029 : /* count slots with spinlock held */
3324 rhaas 1030 GIC 3 : SpinLockAcquire(&s->mutex);
3324 rhaas 1031 CBC 3 : (*nslots)++;
2910 andres 1032 3 : if (s->active_pid != 0)
3324 rhaas 1033 GIC 1 : (*nactive)++;
1034 3 : SpinLockRelease(&s->mutex);
3324 rhaas 1035 ECB : }
3324 rhaas 1036 CBC 21 : LWLockRelease(ReplicationSlotControlLock);
1037 :
3324 rhaas 1038 GIC 21 : if (*nslots > 0)
3324 rhaas 1039 CBC 3 : return true;
1040 18 : return false;
1041 : }
1042 :
1043 : /*
1044 : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
2203 simon 1045 ECB : * passed database oid. The caller should hold an exclusive lock on the
1046 : * pg_database oid for the database to prevent creation of new slots on the db
1047 : * or replay from existing slots.
1048 : *
1049 : * Another session that concurrently acquires an existing slot on the target DB
1050 : * (most likely to drop it) may cause this function to ERROR. If that happens
1051 : * it may have dropped some but not all slots.
1052 : *
2195 andres 1053 : * This routine isn't as efficient as it could be - but we don't drop
1054 : * databases often, especially databases with lots of slots.
2203 simon 1055 : */
1056 : void
2203 simon 1057 GIC 29 : ReplicationSlotsDropDBSlots(Oid dboid)
1058 : {
1059 : int i;
1060 :
1061 29 : if (max_replication_slots <= 0)
2203 simon 1062 UIC 0 : return;
1063 :
2203 simon 1064 GIC 29 : restart:
1065 34 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1066 303 : for (i = 0; i < max_replication_slots; i++)
1067 : {
1068 : ReplicationSlot *s;
1069 : char *slotname;
1070 : int active_pid;
1071 :
2203 simon 1072 CBC 274 : s = &ReplicationSlotCtl->replication_slots[i];
1073 :
1074 : /* cannot change while ReplicationSlotCtlLock is held */
2203 simon 1075 GIC 274 : if (!s->in_use)
2203 simon 1076 CBC 252 : continue;
2203 simon 1077 EUB :
1078 : /* only logical slots are database specific, skip */
2203 simon 1079 CBC 22 : if (!SlotIsLogical(s))
1080 10 : continue;
2203 simon 1081 ECB :
1082 : /* not our database, skip */
2203 simon 1083 GIC 12 : if (s->data.database != dboid)
1084 7 : continue;
1085 :
1086 : /* NB: intentionally including invalidated slots */
1087 :
1088 : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
2203 simon 1089 CBC 5 : SpinLockAcquire(&s->mutex);
1090 : /* can't change while ReplicationSlotControlLock is held */
2195 andres 1091 GIC 5 : slotname = NameStr(s->data.name);
2203 simon 1092 CBC 5 : active_pid = s->active_pid;
1093 5 : if (active_pid == 0)
1094 : {
2203 simon 1095 GIC 5 : MyReplicationSlot = s;
2203 simon 1096 CBC 5 : s->active_pid = MyProcPid;
2203 simon 1097 ECB : }
2203 simon 1098 GIC 5 : SpinLockRelease(&s->mutex);
1099 :
2203 simon 1100 ECB : /*
2195 andres 1101 : * Even though we hold an exclusive lock on the database object a
1102 : * logical slot for that DB can still be active, e.g. if it's
1103 : * concurrently being dropped by a backend connected to another DB.
1104 : *
1105 : * That's fairly unlikely in practice, so we'll just bail out.
2203 simon 1106 : */
2203 simon 1107 GIC 5 : if (active_pid)
2195 andres 1108 LBC 0 : ereport(ERROR,
2195 andres 1109 ECB : (errcode(ERRCODE_OBJECT_IN_USE),
1110 : errmsg("replication slot \"%s\" is active for PID %d",
1111 : slotname, active_pid)));
2203 simon 1112 :
1113 : /*
1114 : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
2195 andres 1115 : * holding ReplicationSlotControlLock over filesystem operations,
1116 : * release ReplicationSlotControlLock and use
1117 : * ReplicationSlotDropAcquired.
1118 : *
1119 : * As that means the set of slots could change, restart scan from the
1120 : * beginning each time we release the lock.
1121 : */
2203 simon 1122 GIC 5 : LWLockRelease(ReplicationSlotControlLock);
1123 5 : ReplicationSlotDropAcquired();
2203 simon 1124 CBC 5 : goto restart;
2203 simon 1125 EUB : }
2203 simon 1126 GIC 29 : LWLockRelease(ReplicationSlotControlLock);
1127 : }
1128 :
1129 :
1130 : /*
1131 : * Check whether the server's configuration supports using replication
1132 : * slots.
1133 : */
1134 : void
3355 rhaas 1135 1320 : CheckSlotRequirements(void)
1136 : {
1137 : /*
1138 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1621 andres 1139 ECB : * needs the same check.
1140 : */
1141 :
3355 rhaas 1142 GIC 1320 : if (max_replication_slots == 0)
3355 rhaas 1143 LBC 0 : ereport(ERROR,
1144 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1145 : errmsg("replication slots can only be used if max_replication_slots > 0")));
1146 :
2596 peter_e 1147 GIC 1320 : if (wal_level < WAL_LEVEL_REPLICA)
3355 rhaas 1148 UIC 0 : ereport(ERROR,
1149 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1150 : errmsg("replication slots can only be used if wal_level >= replica")));
3355 rhaas 1151 GIC 1320 : }
3355 rhaas 1152 ECB :
1153 : /*
1154 : * Check whether the user has privilege to use replication slots.
1155 : */
1156 : void
572 michael 1157 GIC 441 : CheckSlotPermissions(void)
1158 : {
24 peter 1159 GNC 441 : if (!has_rolreplication(GetUserId()))
572 michael 1160 GBC 4 : ereport(ERROR,
1161 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1162 : errmsg("permission denied to use replication slots"),
1163 : errdetail("Only roles with the %s attribute may use replication slots.",
1164 : "REPLICATION")));
572 michael 1165 GIC 437 : }
572 michael 1166 ECB :
2798 andres 1167 EUB : /*
1168 : * Reserve WAL for the currently active slot.
1169 : *
2798 andres 1170 ECB : * Compute and set restart_lsn in a manner that's appropriate for the type of
1171 : * the slot and concurrency safe.
1172 : */
1173 : void
2798 andres 1174 GIC 442 : ReplicationSlotReserveWal(void)
1175 : {
2798 andres 1176 CBC 442 : ReplicationSlot *slot = MyReplicationSlot;
1177 :
1178 442 : Assert(slot != NULL);
1179 442 : Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
1180 :
1181 : /*
1182 : * The replication slot mechanism is used to prevent removal of required
1183 : * WAL. As there is no interlock between this routine and checkpoints, WAL
2798 andres 1184 ECB : * segments could concurrently be removed when a now stale return value of
1185 : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1186 : * this happens we'll just retry.
1187 : */
1188 : while (true)
2798 andres 1189 UIC 0 : {
1190 : XLogSegNo segno;
1191 : XLogRecPtr restart_lsn;
1192 :
2798 andres 1193 ECB : /*
1194 : * For logical slots log a standby snapshot and start logical decoding
1195 : * at exactly that position. That allows the slot to start up more
1196 : * quickly. But on a standby we cannot do WAL writes, so just use the
1197 : * replay pointer; effectively, an attempt to create a logical slot on
1198 : * standby will cause it to wait for an xl_running_xact record to be
1199 : * logged independently on the primary, so that a snapshot can be
1200 : * built using the record.
1201 : *
1202 : * None of this is needed (or indeed helpful) for physical slots as
1203 : * they'll start replay at the last logged checkpoint anyway. Instead
1204 : * return the location of the last redo LSN. While that slightly
1205 : * increases the chance that we have to retry, it's where a base
1206 : * backup has to start replay at.
1207 : */
1 andres 1208 GNC 442 : if (SlotIsPhysical(slot))
1 andres 1209 GIC 106 : restart_lsn = GetRedoRecPtr();
1 andres 1210 GNC 336 : else if (RecoveryInProgress())
1211 21 : restart_lsn = GetXLogReplayRecPtr(NULL);
1212 : else
1762 michael 1213 315 : restart_lsn = GetXLogInsertRecPtr();
1214 :
1 andres 1215 442 : SpinLockAcquire(&slot->mutex);
1216 442 : slot->data.restart_lsn = restart_lsn;
1217 442 : SpinLockRelease(&slot->mutex);
2798 andres 1218 ECB :
1219 : /* prevent WAL removal as fast as possible */
2798 andres 1220 CBC 442 : ReplicationSlotsComputeRequiredLSN();
2798 andres 1221 ECB :
1222 : /*
1223 : * If all required WAL is still there, great, otherwise retry. The
1224 : * slot should prevent further removal of WAL, unless there's a
1225 : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1226 : * the new restart_lsn above, so normally we should never need to loop
1227 : * more than twice.
1228 : */
2028 andres 1229 GIC 442 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
2798 andres 1230 CBC 442 : if (XLogGetLastRemovedSegno() < segno)
2798 andres 1231 GIC 442 : break;
1232 : }
1233 :
1 andres 1234 GNC 442 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1235 : {
1236 : XLogRecPtr flushptr;
1237 :
1238 : /* make sure we have enough information to start */
1239 315 : flushptr = LogStandbySnapshot();
1240 :
1241 : /* and make sure it's fsynced to disk */
1242 315 : XLogFlush(flushptr);
1243 : }
2798 andres 1244 GIC 442 : }
1245 :
1246 : /*
1247 : * Report that replication slot needs to be invalidated
1248 : */
1249 : static void
2 andres 1250 GNC 20 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1251 : bool terminating,
1252 : int pid,
1253 : NameData slotname,
1254 : XLogRecPtr restart_lsn,
1255 : XLogRecPtr oldestLSN,
1256 : TransactionId snapshotConflictHorizon)
1257 : {
1258 : StringInfoData err_detail;
1259 20 : bool hint = false;
1260 :
1261 20 : initStringInfo(&err_detail);
1262 :
1263 20 : switch (cause)
1264 : {
1265 5 : case RS_INVAL_WAL_REMOVED:
1266 5 : hint = true;
1267 5 : appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
1268 5 : LSN_FORMAT_ARGS(restart_lsn),
1269 5 : (unsigned long long) (oldestLSN - restart_lsn));
1270 5 : break;
1271 12 : case RS_INVAL_HORIZON:
1272 12 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1273 : snapshotConflictHorizon);
1274 12 : break;
1275 :
1276 3 : case RS_INVAL_WAL_LEVEL:
1277 3 : appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
1278 3 : break;
2 andres 1279 UNC 0 : case RS_INVAL_NONE:
1280 0 : pg_unreachable();
1281 : }
1282 :
2 andres 1283 GNC 20 : ereport(LOG,
1284 : terminating ?
1285 : errmsg("terminating process %d to release replication slot \"%s\"",
1286 : pid, NameStr(slotname)) :
1287 : errmsg("invalidating obsolete replication slot \"%s\"",
1288 : NameStr(slotname)),
1289 : errdetail_internal("%s", err_detail.data),
1290 : hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
1291 :
1292 20 : pfree(err_detail.data);
1293 20 : }
1294 :
1295 : /*
1296 : * Helper for InvalidateObsoleteReplicationSlots
1297 : *
1298 : * Acquires the given slot and mark it invalid, if necessary and possible.
1299 : *
667 alvherre 1300 ECB : * Returns whether ReplicationSlotControlLock was released in the interim (and
1301 : * in that case we're not holding the lock at return, otherwise we are).
1302 : *
1303 : * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1304 : *
1305 : * This is inherently racy, because we release the LWLock
1306 : * for syscalls, so caller must restart if we return true.
1307 : */
1308 : static bool
2 andres 1309 GNC 201 : InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
1310 : ReplicationSlot *s,
1311 : XLogRecPtr oldestLSN,
1312 : Oid dboid, TransactionId snapshotConflictHorizon,
632 alvherre 1313 ECB : bool *invalidated)
1314 : {
667 alvherre 1315 GIC 201 : int last_signaled_pid = 0;
667 alvherre 1316 CBC 201 : bool released_lock = false;
1317 :
667 alvherre 1318 ECB : for (;;)
1097 alvherre 1319 GIC 7 : {
1320 : XLogRecPtr restart_lsn;
1321 : NameData slotname;
667 1322 208 : int active_pid = 0;
2 andres 1323 GNC 208 : ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
1324 :
667 alvherre 1325 CBC 208 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1326 :
1097 alvherre 1327 GIC 208 : if (!s->in_use)
1328 : {
667 alvherre 1329 UIC 0 : if (released_lock)
1330 0 : LWLockRelease(ReplicationSlotControlLock);
1331 0 : break;
1332 : }
1333 :
667 alvherre 1334 ECB : /*
1335 : * Check if the slot needs to be invalidated. If it needs to be
1336 : * invalidated, and is not currently acquired, acquire it and mark it
1337 : * as having been invalidated. We do this with the spinlock held to
1338 : * avoid race conditions -- for example the restart_lsn could move
1339 : * forward, or the slot could be dropped.
1340 : */
1097 alvherre 1341 CBC 208 : SpinLockAcquire(&s->mutex);
667 alvherre 1342 ECB :
1097 alvherre 1343 CBC 208 : restart_lsn = s->data.restart_lsn;
1024 fujii 1344 ECB :
667 alvherre 1345 : /*
1346 : * If the slot is already invalid or is a non conflicting slot, we
1347 : * don't need to do anything.
1348 : */
2 andres 1349 GNC 208 : if (s->data.invalidated == RS_INVAL_NONE)
1350 : {
1351 174 : switch (cause)
1352 : {
1353 117 : case RS_INVAL_WAL_REMOVED:
1354 117 : if (s->data.restart_lsn != InvalidXLogRecPtr &&
1355 108 : s->data.restart_lsn < oldestLSN)
1356 5 : conflict = cause;
1357 117 : break;
1358 52 : case RS_INVAL_HORIZON:
1359 52 : if (!SlotIsLogical(s))
1360 22 : break;
1361 : /* invalid DB oid signals a shared relation */
1362 30 : if (dboid != InvalidOid && dboid != s->data.database)
2 andres 1363 UNC 0 : break;
2 andres 1364 GNC 30 : if (TransactionIdIsValid(s->effective_xmin) &&
2 andres 1365 UNC 0 : TransactionIdPrecedesOrEquals(s->effective_xmin,
1366 : snapshotConflictHorizon))
1367 0 : conflict = cause;
2 andres 1368 GNC 60 : else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
1369 30 : TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
1370 : snapshotConflictHorizon))
1371 12 : conflict = cause;
1372 30 : break;
1373 5 : case RS_INVAL_WAL_LEVEL:
1374 5 : if (SlotIsLogical(s))
1375 3 : conflict = cause;
1376 5 : break;
2 andres 1377 UNC 0 : case RS_INVAL_NONE:
1378 0 : pg_unreachable();
1379 : }
1380 : }
1381 :
1382 : /* if there's no conflict, we're done */
2 andres 1383 GNC 208 : if (conflict == RS_INVAL_NONE)
1384 : {
667 alvherre 1385 CBC 188 : SpinLockRelease(&s->mutex);
1386 188 : if (released_lock)
667 alvherre 1387 LBC 0 : LWLockRelease(ReplicationSlotControlLock);
667 alvherre 1388 GBC 188 : break;
667 alvherre 1389 EUB : }
1390 :
667 alvherre 1391 GIC 20 : slotname = s->data.name;
667 alvherre 1392 CBC 20 : active_pid = s->active_pid;
1393 :
1394 : /*
1395 : * If the slot can be acquired, do so and mark it invalidated
1396 : * immediately. Otherwise we'll signal the owning process, below, and
1397 : * retry.
1398 : */
667 alvherre 1399 GIC 20 : if (active_pid == 0)
1400 : {
667 alvherre 1401 CBC 13 : MyReplicationSlot = s;
1402 13 : s->active_pid = MyProcPid;
2 andres 1403 GNC 13 : s->data.invalidated = conflict;
1404 :
1405 : /*
1406 : * XXX: We should consider not overwriting restart_lsn and instead
1407 : * just rely on .invalidated.
1408 : */
1409 13 : if (conflict == RS_INVAL_WAL_REMOVED)
1410 3 : s->data.restart_lsn = InvalidXLogRecPtr;
1411 :
1412 : /* Let caller know */
632 alvherre 1413 GIC 13 : *invalidated = true;
1414 : }
1415 :
667 1416 20 : SpinLockRelease(&s->mutex);
1417 :
1418 20 : if (active_pid != 0)
1419 : {
1420 : /*
1421 : * Prepare the sleep on the slot's condition variable before
1422 : * releasing the lock, to close a possible race condition if the
1423 : * slot is released before the sleep below.
1024 fujii 1424 ECB : */
667 alvherre 1425 GIC 7 : ConditionVariablePrepareToSleep(&s->active_cv);
1426 :
1427 7 : LWLockRelease(ReplicationSlotControlLock);
1428 7 : released_lock = true;
1429 :
1024 fujii 1430 ECB : /*
667 alvherre 1431 : * Signal to terminate the process that owns the slot, if we
1432 : * haven't already signalled it. (Avoidance of repeated
1433 : * signalling is the only reason for there to be a loop in this
1434 : * routine; otherwise we could rely on caller's restart loop.)
1435 : *
1436 : * There is the race condition that other process may own the slot
1437 : * after its current owner process is terminated and before this
1438 : * process owns it. To handle that, we signal only if the PID of
1439 : * the owning process has changed from the previous time. (This
1440 : * logic assumes that the same PID is not reused very quickly.)
1441 : */
667 alvherre 1442 CBC 7 : if (last_signaled_pid != active_pid)
1443 : {
2 andres 1444 GNC 7 : ReportSlotInvalidation(conflict, true, active_pid,
1445 : slotname, restart_lsn,
1446 : oldestLSN, snapshotConflictHorizon);
1447 :
1448 7 : if (MyBackendType == B_STARTUP)
1449 5 : (void) SendProcSignal(active_pid,
1450 : PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
1451 : InvalidBackendId);
1452 : else
1453 2 : (void) kill(active_pid, SIGTERM);
1454 :
667 alvherre 1455 GIC 7 : last_signaled_pid = active_pid;
1456 : }
1457 :
1458 : /* Wait until the slot is released. */
1459 7 : ConditionVariableSleep(&s->active_cv,
1460 : WAIT_EVENT_REPLICATION_SLOT_DROP);
1461 :
667 alvherre 1462 ECB : /*
1463 : * Re-acquire lock and start over; we expect to invalidate the
1464 : * slot next time (unless another process acquires the slot in the
1465 : * meantime).
1466 : */
667 alvherre 1467 GIC 7 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1468 7 : continue;
1469 : }
667 alvherre 1470 ECB : else
1471 : {
1472 : /*
1473 : * We hold the slot now and have already invalidated it; flush it
1474 : * to ensure that state persists.
1475 : *
1476 : * Don't want to hold ReplicationSlotControlLock across file
1477 : * system operations, so release it now but be sure to tell caller
1478 : * to restart from scratch.
1479 : */
667 alvherre 1480 CBC 13 : LWLockRelease(ReplicationSlotControlLock);
1481 13 : released_lock = true;
1482 :
667 alvherre 1483 ECB : /* Make sure the invalidated state persists across server restart */
667 alvherre 1484 GBC 13 : ReplicationSlotMarkDirty();
667 alvherre 1485 CBC 13 : ReplicationSlotSave();
667 alvherre 1486 GBC 13 : ReplicationSlotRelease();
2 andres 1487 GNC 13 : pgstat_drop_replslot(s);
1488 :
1489 13 : ReportSlotInvalidation(conflict, false, active_pid,
1490 : slotname, restart_lsn,
1491 : oldestLSN, snapshotConflictHorizon);
1097 alvherre 1492 ECB :
667 1493 : /* done with this slot for now */
667 alvherre 1494 CBC 13 : break;
667 alvherre 1495 ECB : }
1496 : }
1017 1497 :
667 alvherre 1498 GBC 201 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1097 alvherre 1499 EUB :
667 alvherre 1500 GIC 201 : return released_lock;
1501 : }
1502 :
1503 : /*
1504 : * Invalidate slots that require resources about to be removed.
667 alvherre 1505 ECB : *
632 1506 : * Returns true when any slot have got invalidated.
632 alvherre 1507 EUB : *
1508 : * Whether a slot needs to be invalidated depends on the cause. A slot is
1509 : * removed if it:
1510 : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1511 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1512 : * db; dboid may be InvalidOid for shared relations
1513 : * - RS_INVAL_WAL_LEVEL: is logical
1514 : *
667 alvherre 1515 ECB : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1516 : */
1517 : bool
2 andres 1518 GNC 2383 : InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
1519 : XLogSegNo oldestSegno, Oid dboid,
1520 : TransactionId snapshotConflictHorizon)
667 alvherre 1521 ECB : {
1522 : XLogRecPtr oldestLSN;
632 alvherre 1523 GIC 2383 : bool invalidated = false;
1524 :
2 andres 1525 GNC 2383 : Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
1526 2383 : Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
1527 2383 : Assert(cause != RS_INVAL_NONE);
1528 :
1529 2383 : if (max_replication_slots == 0)
2 andres 1530 UNC 0 : return invalidated;
1531 :
667 alvherre 1532 GIC 2383 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1533 :
1534 2396 : restart:
667 alvherre 1535 CBC 2396 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
667 alvherre 1536 GIC 25961 : for (int i = 0; i < max_replication_slots; i++)
667 alvherre 1537 ECB : {
667 alvherre 1538 CBC 23578 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
667 alvherre 1539 ECB :
667 alvherre 1540 GIC 23578 : if (!s->in_use)
1541 23377 : continue;
1542 :
2 andres 1543 GNC 201 : if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
1544 : snapshotConflictHorizon,
1545 : &invalidated))
1546 : {
667 alvherre 1547 ECB : /* if the lock was released, start from scratch */
667 alvherre 1548 CBC 13 : goto restart;
1549 : }
1550 : }
1097 1551 2383 : LWLockRelease(ReplicationSlotControlLock);
1552 :
1553 : /*
632 alvherre 1554 ECB : * If any slots have been invalidated, recalculate the resource limits.
1555 : */
632 alvherre 1556 CBC 2383 : if (invalidated)
1557 : {
632 alvherre 1558 GIC 8 : ReplicationSlotsComputeRequiredXmin(false);
1559 8 : ReplicationSlotsComputeRequiredLSN();
1560 : }
1561 :
1562 2383 : return invalidated;
1097 alvherre 1563 ECB : }
1564 :
3355 rhaas 1565 : /*
1566 : * Flush all replication slots to disk.
1567 : *
1568 : * This needn't actually be part of a checkpoint, but it's a convenient
1569 : * location.
1570 : */
1571 : void
3355 rhaas 1572 GIC 2363 : CheckPointReplicationSlots(void)
1573 : {
1574 : int i;
1575 :
3071 peter_e 1576 2363 : elog(DEBUG1, "performing replication slot checkpoint");
1577 :
1578 : /*
1579 : * Prevent any slot from being created/dropped while we're active. As we
3355 rhaas 1580 ECB : * explicitly do *not* want to block iterating over replication_slots or
1581 : * acquiring a slot we cannot take the control lock - but that's OK,
3260 bruce 1582 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
1583 : * enough to guarantee that nobody can change the in_use bits on us.
1584 : */
3355 rhaas 1585 GIC 2363 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
3355 rhaas 1586 ECB :
3355 rhaas 1587 CBC 25839 : for (i = 0; i < max_replication_slots; i++)
1588 : {
3355 rhaas 1589 GIC 23476 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1590 : char path[MAXPGPATH];
3355 rhaas 1591 ECB :
3355 rhaas 1592 GIC 23476 : if (!s->in_use)
3355 rhaas 1593 CBC 23357 : continue;
1594 :
1595 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
3355 rhaas 1596 GIC 119 : sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
3355 rhaas 1597 CBC 119 : SaveSlotToPath(s, path, LOG);
1598 : }
3355 rhaas 1599 GIC 2363 : LWLockRelease(ReplicationSlotAllocationLock);
1600 2363 : }
1601 :
1602 : /*
1603 : * Load all replication slots from disk into memory at server startup. This
1604 : * needs to be run before we start crash recovery.
3355 rhaas 1605 ECB : */
1606 : void
3223 andres 1607 GIC 1176 : StartupReplicationSlots(void)
1608 : {
1609 : DIR *replication_dir;
1610 : struct dirent *replication_de;
1611 :
3071 peter_e 1612 1176 : elog(DEBUG1, "starting up replication slots");
1613 :
1614 : /* restore all slots by iterating over all on-disk entries */
3355 rhaas 1615 1176 : replication_dir = AllocateDir("pg_replslot");
1616 3562 : while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1617 : {
2189 peter_e 1618 ECB : char path[MAXPGPATH + 12];
1619 : PGFileType de_type;
1620 :
3355 rhaas 1621 GIC 2386 : if (strcmp(replication_de->d_name, ".") == 0 ||
3355 rhaas 1622 CBC 1210 : strcmp(replication_de->d_name, "..") == 0)
1623 2352 : continue;
3355 rhaas 1624 ECB :
2189 peter_e 1625 CBC 34 : snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
219 michael 1626 GNC 34 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
1627 :
3355 rhaas 1628 ECB : /* we're only creating directories here, skip if it's not our's */
219 michael 1629 GNC 34 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
3355 rhaas 1630 UIC 0 : continue;
1631 :
1632 : /* we crashed while a slot was being setup or deleted, clean up */
3018 andres 1633 CBC 34 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
1634 : {
3355 rhaas 1635 UIC 0 : if (!rmtree(path, true))
1636 : {
3355 rhaas 1637 LBC 0 : ereport(WARNING,
1638 : (errmsg("could not remove directory \"%s\"",
1678 michael 1639 ECB : path)));
3355 rhaas 1640 UIC 0 : continue;
1641 : }
1642 0 : fsync_fname("pg_replslot", true);
1643 0 : continue;
1644 : }
1645 :
1646 : /* looks like a slot in a normal state, restore */
3355 rhaas 1647 GIC 34 : RestoreSlotFromDisk(replication_de->d_name);
1648 : }
1649 1176 : FreeDir(replication_dir);
1650 :
1651 : /* currently no slots exist, we're done. */
1652 1176 : if (max_replication_slots <= 0)
3355 rhaas 1653 UIC 0 : return;
1654 :
1655 : /* Now that we have recovered all the data, compute replication xmin */
3324 rhaas 1656 GIC 1176 : ReplicationSlotsComputeRequiredXmin(false);
3355 rhaas 1657 CBC 1176 : ReplicationSlotsComputeRequiredLSN();
1658 : }
1659 :
/* ----
 * Manipulation of on-disk state of replication slots
 *
 * NB: none of the routines below should take any notice of whether a slot is
 * the current one or not; that's all handled a layer above.
 * ----
 */
1667 : static void
3355 rhaas 1668 CBC 469 : CreateSlotOnDisk(ReplicationSlot *slot)
3355 rhaas 1669 EUB : {
1670 : char tmppath[MAXPGPATH];
3355 rhaas 1671 ECB : char path[MAXPGPATH];
1672 : struct stat st;
1673 :
1674 : /*
1675 : * No need to take out the io_in_progress_lock, nobody else can see this
1676 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1677 : * takes out the lock, if we'd take the lock here, we'd deadlock.
1678 : */
1679 :
3355 rhaas 1680 CBC 469 : sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
3355 rhaas 1681 GIC 469 : sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
3355 rhaas 1682 ECB :
1683 : /*
1684 : * It's just barely possible that some previous effort to create or drop a
1685 : * slot with this name left a temp directory lying around. If that seems
1686 : * to be the case, try to remove it. If the rmtree() fails, we'll error
1828 sfrost 1687 : * out at the MakePGDirectory() below, so we don't bother checking
1688 : * success.
1689 : */
3355 rhaas 1690 CBC 469 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
3355 rhaas 1691 UIC 0 : rmtree(tmppath, true);
1692 :
1693 : /* Create and fsync the temporary slot directory. */
1828 sfrost 1694 GIC 469 : if (MakePGDirectory(tmppath) < 0)
3355 rhaas 1695 LBC 0 : ereport(ERROR,
1696 : (errcode_for_file_access(),
3355 rhaas 1697 ECB : errmsg("could not create directory \"%s\": %m",
1698 : tmppath)));
3355 rhaas 1699 GIC 469 : fsync_fname(tmppath, true);
1700 :
3355 rhaas 1701 ECB : /* Write the actual state file. */
3260 bruce 1702 GIC 469 : slot->dirty = true; /* signal that we really need to write */
3355 rhaas 1703 469 : SaveSlotToPath(slot, tmppath, ERROR);
1704 :
1705 : /* Rename the directory into place. */
1706 469 : if (rename(tmppath, path) != 0)
3355 rhaas 1707 UIC 0 : ereport(ERROR,
1708 : (errcode_for_file_access(),
1709 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1710 : tmppath, path)));
3355 rhaas 1711 ECB :
1712 : /*
1713 : * If we'd now fail - really unlikely - we wouldn't know whether this slot
1714 : * would persist after an OS crash or not - so, force a restart. The
2620 1715 : * restart would try to fsync this again till it works.
1716 : */
3355 rhaas 1717 GIC 469 : START_CRIT_SECTION();
1718 :
1719 469 : fsync_fname(path, true);
1720 469 : fsync_fname("pg_replslot", true);
1721 :
1722 469 : END_CRIT_SECTION();
1723 469 : }
3355 rhaas 1724 ECB :
1725 : /*
1726 : * Shared functionality between saving and creating a replication slot.
1727 : */
1728 : static void
3355 rhaas 1729 GIC 1559 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
1730 : {
3355 rhaas 1731 ECB : char tmppath[MAXPGPATH];
1732 : char path[MAXPGPATH];
1733 : int fd;
1734 : ReplicationSlotOnDisk cp;
1735 : bool was_dirty;
1736 :
1737 : /* first check whether there's something to write out */
2742 rhaas 1738 CBC 1559 : SpinLockAcquire(&slot->mutex);
1739 1559 : was_dirty = slot->dirty;
2742 rhaas 1740 GIC 1559 : slot->just_dirtied = false;
1741 1559 : SpinLockRelease(&slot->mutex);
1742 :
1743 : /* and don't do anything if there's nothing to write */
3355 1744 1559 : if (!was_dirty)
1745 79 : return;
3355 rhaas 1746 ECB :
2627 rhaas 1747 GIC 1480 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
1748 :
1749 : /* silence valgrind :( */
3355 1750 1480 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
3355 rhaas 1751 ECB :
3355 rhaas 1752 GIC 1480 : sprintf(tmppath, "%s/state.tmp", dir);
1753 1480 : sprintf(path, "%s/state", dir);
3355 rhaas 1754 ECB :
2024 peter_e 1755 CBC 1480 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
3355 rhaas 1756 GIC 1480 : if (fd < 0)
1757 : {
1758 : /*
1759 : * If not an ERROR, then release the lock before returning. In case
1109 peter 1760 ECB : * of an ERROR, the error recovery path automatically releases the
1099 1761 : * lock, but no harm in explicitly releasing even in that case. Note
1762 : * that LWLockRelease() could affect errno.
1763 : */
1099 peter 1764 LBC 0 : int save_errno = errno;
1099 peter 1765 ECB :
1109 peter 1766 UIC 0 : LWLockRelease(&slot->io_in_progress_lock);
1099 1767 0 : errno = save_errno;
3355 rhaas 1768 LBC 0 : ereport(elevel,
3355 rhaas 1769 EUB : (errcode_for_file_access(),
1770 : errmsg("could not create file \"%s\": %m",
1771 : tmppath)));
3355 rhaas 1772 LBC 0 : return;
1773 : }
3355 rhaas 1774 EUB :
3355 rhaas 1775 GIC 1480 : cp.magic = SLOT_MAGIC;
3078 heikki.linnakangas 1776 GBC 1480 : INIT_CRC32C(cp.checksum);
3070 andres 1777 GIC 1480 : cp.version = SLOT_VERSION;
1778 1480 : cp.length = ReplicationSlotOnDiskV2Size;
3355 rhaas 1779 EUB :
3355 rhaas 1780 GIC 1480 : SpinLockAcquire(&slot->mutex);
3355 rhaas 1781 EUB :
3355 rhaas 1782 GBC 1480 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1783 :
3355 rhaas 1784 GIC 1480 : SpinLockRelease(&slot->mutex);
1785 :
3078 heikki.linnakangas 1786 CBC 1480 : COMP_CRC32C(cp.checksum,
1787 : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
501 akapila 1788 ECB : ReplicationSlotOnDiskChecksummedSize);
3070 andres 1789 GIC 1480 : FIN_CRC32C(cp.checksum);
1790 :
1708 michael 1791 CBC 1480 : errno = 0;
2213 rhaas 1792 GBC 1480 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
3355 rhaas 1793 GIC 1480 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1794 : {
3260 bruce 1795 LBC 0 : int save_errno = errno;
3260 bruce 1796 ECB :
2213 rhaas 1797 UIC 0 : pgstat_report_wait_end();
3355 1798 0 : CloseTransientFile(fd);
1109 peter 1799 0 : LWLockRelease(&slot->io_in_progress_lock);
1800 :
1801 : /* if write didn't set errno, assume problem is no disk space */
1749 michael 1802 0 : errno = save_errno ? save_errno : ENOSPC;
3355 rhaas 1803 0 : ereport(elevel,
1804 : (errcode_for_file_access(),
1805 : errmsg("could not write to file \"%s\": %m",
1806 : tmppath)));
3355 rhaas 1807 LBC 0 : return;
1808 : }
2213 rhaas 1809 GIC 1480 : pgstat_report_wait_end();
1810 :
1811 : /* fsync the temporary file */
1812 1480 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
3355 1813 1480 : if (pg_fsync(fd) != 0)
1814 : {
3260 bruce 1815 UIC 0 : int save_errno = errno;
1816 :
2213 rhaas 1817 0 : pgstat_report_wait_end();
3355 1818 0 : CloseTransientFile(fd);
1109 peter 1819 LBC 0 : LWLockRelease(&slot->io_in_progress_lock);
3355 rhaas 1820 0 : errno = save_errno;
3355 rhaas 1821 UIC 0 : ereport(elevel,
1822 : (errcode_for_file_access(),
1823 : errmsg("could not fsync file \"%s\": %m",
1824 : tmppath)));
1825 0 : return;
1826 : }
2213 rhaas 1827 GIC 1480 : pgstat_report_wait_end();
1828 :
1373 peter 1829 CBC 1480 : if (CloseTransientFile(fd) != 0)
1453 michael 1830 EUB : {
1099 peter 1831 UIC 0 : int save_errno = errno;
1832 :
1109 peter 1833 LBC 0 : LWLockRelease(&slot->io_in_progress_lock);
1099 peter 1834 UBC 0 : errno = save_errno;
1492 michael 1835 UIC 0 : ereport(elevel,
1836 : (errcode_for_file_access(),
1837 : errmsg("could not close file \"%s\": %m",
1492 michael 1838 ECB : tmppath)));
1453 michael 1839 UIC 0 : return;
1840 : }
3355 rhaas 1841 ECB :
1842 : /* rename to permanent file, fsync file and directory */
3355 rhaas 1843 GIC 1480 : if (rename(tmppath, path) != 0)
1844 : {
1099 peter 1845 LBC 0 : int save_errno = errno;
1099 peter 1846 EUB :
1109 peter 1847 UIC 0 : LWLockRelease(&slot->io_in_progress_lock);
1099 1848 0 : errno = save_errno;
3355 rhaas 1849 0 : ereport(elevel,
1850 : (errcode_for_file_access(),
1851 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1852 : tmppath, path)));
1853 0 : return;
1854 : }
1855 :
1378 michael 1856 ECB : /*
1857 : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
1858 : */
3355 rhaas 1859 CBC 1480 : START_CRIT_SECTION();
1860 :
1861 1480 : fsync_fname(path, false);
2587 andres 1862 1480 : fsync_fname(dir, true);
3355 rhaas 1863 GIC 1480 : fsync_fname("pg_replslot", true);
1864 :
1865 1480 : END_CRIT_SECTION();
1866 :
1867 : /*
3355 rhaas 1868 ECB : * Successfully wrote, unset dirty bit, unless somebody dirtied again
1869 : * already.
1870 : */
2742 rhaas 1871 GIC 1480 : SpinLockAcquire(&slot->mutex);
1872 1480 : if (!slot->just_dirtied)
1873 1479 : slot->dirty = false;
1874 1480 : SpinLockRelease(&slot->mutex);
1875 :
2627 1876 1480 : LWLockRelease(&slot->io_in_progress_lock);
3355 rhaas 1877 ECB : }
1878 :
1879 : /*
1880 : * Load a single slot from disk into memory.
1881 : */
1882 : static void
3355 rhaas 1883 CBC 34 : RestoreSlotFromDisk(const char *name)
3355 rhaas 1884 ECB : {
1885 : ReplicationSlotOnDisk cp;
1886 : int i;
1887 : char slotdir[MAXPGPATH + 12];
1888 : char path[MAXPGPATH + 22];
1889 : int fd;
3355 rhaas 1890 GIC 34 : bool restored = false;
3355 rhaas 1891 ECB : int readBytes;
2917 heikki.linnakangas 1892 : pg_crc32c checksum;
1893 :
3355 rhaas 1894 : /* no need to lock here, no concurrent access allowed yet */
1895 :
1896 : /* delete temp file if it exists */
1680 michael 1897 GIC 34 : sprintf(slotdir, "pg_replslot/%s", name);
1898 34 : sprintf(path, "%s/state.tmp", slotdir);
3355 rhaas 1899 34 : if (unlink(path) < 0 && errno != ENOENT)
3355 rhaas 1900 UIC 0 : ereport(PANIC,
1901 : (errcode_for_file_access(),
1902 : errmsg("could not remove file \"%s\": %m", path)));
3355 rhaas 1903 EUB :
1680 michael 1904 GIC 34 : sprintf(path, "%s/state", slotdir);
3355 rhaas 1905 EUB :
3355 rhaas 1906 GBC 34 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
3355 rhaas 1907 EUB :
1908 : /* on some operating systems fsyncing a file requires O_RDWR */
1283 andres 1909 GIC 34 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1910 :
3355 rhaas 1911 EUB : /*
1912 : * We do not need to handle this as we are rename()ing the directory into
1913 : * place only after we fsync()ed the state file.
3355 rhaas 1914 ECB : */
3355 rhaas 1915 CBC 34 : if (fd < 0)
3355 rhaas 1916 LBC 0 : ereport(PANIC,
3355 rhaas 1917 ECB : (errcode_for_file_access(),
1918 : errmsg("could not open file \"%s\": %m", path)));
1919 :
1920 : /*
1921 : * Sync state file before we're reading from it. We might have crashed
1922 : * while it wasn't synced yet and we shouldn't continue on that basis.
1923 : */
2213 rhaas 1924 GIC 34 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
3355 rhaas 1925 CBC 34 : if (pg_fsync(fd) != 0)
3355 rhaas 1926 UIC 0 : ereport(PANIC,
1927 : (errcode_for_file_access(),
3355 rhaas 1928 ECB : errmsg("could not fsync file \"%s\": %m",
1929 : path)));
2213 rhaas 1930 CBC 34 : pgstat_report_wait_end();
3355 rhaas 1931 ECB :
1932 : /* Also sync the parent directory */
3355 rhaas 1933 GIC 34 : START_CRIT_SECTION();
1680 michael 1934 GBC 34 : fsync_fname(slotdir, true);
3355 rhaas 1935 GIC 34 : END_CRIT_SECTION();
3355 rhaas 1936 EUB :
1937 : /* read part of statefile that's guaranteed to be version independent */
2213 rhaas 1938 GBC 34 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
3355 rhaas 1939 GIC 34 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2213 1940 34 : pgstat_report_wait_end();
3355 rhaas 1941 GBC 34 : if (readBytes != ReplicationSlotOnDiskConstantSize)
3355 rhaas 1942 EUB : {
1726 michael 1943 UIC 0 : if (readBytes < 0)
1944 0 : ereport(PANIC,
1945 : (errcode_for_file_access(),
1726 michael 1946 EUB : errmsg("could not read file \"%s\": %m", path)));
1947 : else
1726 michael 1948 LBC 0 : ereport(PANIC,
1949 : (errcode(ERRCODE_DATA_CORRUPTED),
1950 : errmsg("could not read file \"%s\": read %d of %zu",
1726 michael 1951 ECB : path, readBytes,
1952 : (Size) ReplicationSlotOnDiskConstantSize)));
1953 : }
3355 rhaas 1954 EUB :
1955 : /* verify magic */
3355 rhaas 1956 GBC 34 : if (cp.magic != SLOT_MAGIC)
3355 rhaas 1957 UBC 0 : ereport(PANIC,
1678 michael 1958 EUB : (errcode(ERRCODE_DATA_CORRUPTED),
2720 peter_e 1959 : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
3355 rhaas 1960 : path, cp.magic, SLOT_MAGIC)));
1961 :
1962 : /* verify version */
3355 rhaas 1963 GIC 34 : if (cp.version != SLOT_VERSION)
3355 rhaas 1964 UBC 0 : ereport(PANIC,
1965 : (errcode(ERRCODE_DATA_CORRUPTED),
2118 tgl 1966 ECB : errmsg("replication slot file \"%s\" has unsupported version %u",
1967 : path, cp.version)));
3355 rhaas 1968 :
1969 : /* boundary check on length */
3070 andres 1970 GBC 34 : if (cp.length != ReplicationSlotOnDiskV2Size)
3355 rhaas 1971 UIC 0 : ereport(PANIC,
1678 michael 1972 EUB : (errcode(ERRCODE_DATA_CORRUPTED),
2118 tgl 1973 : errmsg("replication slot file \"%s\" has corrupted length %u",
1974 : path, cp.length)));
1975 :
1976 : /* Now that we know the size, read the entire file */
2213 rhaas 1977 GIC 34 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
3355 rhaas 1978 GBC 68 : readBytes = read(fd,
1979 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
3355 rhaas 1980 GIC 34 : cp.length);
2213 1981 34 : pgstat_report_wait_end();
3355 rhaas 1982 CBC 34 : if (readBytes != cp.length)
1983 : {
1726 michael 1984 UBC 0 : if (readBytes < 0)
1726 michael 1985 UIC 0 : ereport(PANIC,
1726 michael 1986 EUB : (errcode_for_file_access(),
1987 : errmsg("could not read file \"%s\": %m", path)));
1988 : else
1726 michael 1989 UIC 0 : ereport(PANIC,
1990 : (errcode(ERRCODE_DATA_CORRUPTED),
1991 : errmsg("could not read file \"%s\": read %d of %zu",
1726 michael 1992 EUB : path, readBytes, (Size) cp.length)));
1993 : }
1994 :
1373 peter 1995 GIC 34 : if (CloseTransientFile(fd) != 0)
1492 michael 1996 UIC 0 : ereport(PANIC,
1997 : (errcode_for_file_access(),
1492 michael 1998 ECB : errmsg("could not close file \"%s\": %m", path)));
1999 :
3078 heikki.linnakangas 2000 : /* now verify the CRC */
3078 heikki.linnakangas 2001 CBC 34 : INIT_CRC32C(checksum);
2002 34 : COMP_CRC32C(checksum,
2003 : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
501 akapila 2004 ECB : ReplicationSlotOnDiskChecksummedSize);
3070 andres 2005 GIC 34 : FIN_CRC32C(checksum);
2006 :
3078 heikki.linnakangas 2007 34 : if (!EQ_CRC32C(checksum, cp.checksum))
3355 rhaas 2008 UIC 0 : ereport(PANIC,
2009 : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
3355 rhaas 2010 ECB : path, checksum, cp.checksum)));
2011 :
3181 andres 2012 : /*
2013 : * If we crashed with an ephemeral slot active, don't restore but delete
2014 : * it.
2015 : */
3181 andres 2016 GIC 34 : if (cp.slotdata.persistency != RS_PERSISTENT)
2017 : {
1680 michael 2018 UIC 0 : if (!rmtree(slotdir, true))
2019 : {
3181 andres 2020 0 : ereport(WARNING,
2021 : (errmsg("could not remove directory \"%s\"",
1678 michael 2022 ECB : slotdir)));
2023 : }
3181 andres 2024 UIC 0 : fsync_fname("pg_replslot", true);
2025 0 : return;
2026 : }
2027 :
2028 : /*
1621 andres 2029 ECB : * Verify that requirements for the specific slot type are met. That's
2030 : * important because if these aren't met we're not guaranteed to retain
2031 : * all the necessary resources for the slot.
2032 : *
2033 : * NB: We have to do so *after* the above checks for ephemeral slots,
2034 : * because otherwise a slot that shouldn't exist anymore could prevent
2035 : * restarts.
2036 : *
2037 : * NB: Changing the requirements here also requires adapting
2038 : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
1621 andres 2039 EUB : */
1621 andres 2040 GIC 34 : if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
1621 andres 2041 UIC 0 : ereport(FATAL,
2042 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1620 andres 2043 ECB : errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
2044 : NameStr(cp.slotdata.name)),
1616 2045 : errhint("Change wal_level to be logical or higher.")));
1621 andres 2046 GIC 34 : else if (wal_level < WAL_LEVEL_REPLICA)
1621 andres 2047 UIC 0 : ereport(FATAL,
1621 andres 2048 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2049 : errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
2050 : NameStr(cp.slotdata.name)),
2051 : errhint("Change wal_level to be replica or higher.")));
2052 :
2053 : /* nothing can be active yet, don't lock anything */
3355 rhaas 2054 CBC 50 : for (i = 0; i < max_replication_slots; i++)
3355 rhaas 2055 EUB : {
2056 : ReplicationSlot *slot;
2057 :
3355 rhaas 2058 GIC 50 : slot = &ReplicationSlotCtl->replication_slots[i];
2059 :
2060 50 : if (slot->in_use)
2061 16 : continue;
2062 :
3355 rhaas 2063 ECB : /* restore the entire set of persistent data */
3355 rhaas 2064 CBC 34 : memcpy(&slot->data, &cp.slotdata,
3355 rhaas 2065 EUB : sizeof(ReplicationSlotPersistentData));
2066 :
2067 : /* initialize in memory state */
3355 rhaas 2068 GIC 34 : slot->effective_xmin = cp.slotdata.xmin;
3324 rhaas 2069 CBC 34 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2070 :
3324 rhaas 2071 GIC 34 : slot->candidate_catalog_xmin = InvalidTransactionId;
3324 rhaas 2072 CBC 34 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2073 34 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2074 34 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2075 :
3355 rhaas 2076 GIC 34 : slot->in_use = true;
2910 andres 2077 CBC 34 : slot->active_pid = 0;
3355 rhaas 2078 ECB :
3355 rhaas 2079 CBC 34 : restored = true;
2080 34 : break;
2081 : }
3355 rhaas 2082 EUB :
3355 rhaas 2083 GBC 34 : if (!restored)
1619 michael 2084 UIC 0 : ereport(FATAL,
2085 : (errmsg("too many replication slots active before shutdown"),
2086 : errhint("Increase max_replication_slots and try again.")));
3355 rhaas 2087 EUB : }
|