Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
15 : * * A facility to efficiently store and persist replication progress in an
16 : * efficient and durable manner.
17 : *
18 : * Replication origins consist of a descriptive, user-defined, external
19 : * name and a short, thus space-efficient, internal 2-byte one. This split
20 : * exists because replication origins have to be stored in WAL and shared
21 : * memory, and long descriptors would be inefficient. For now we only use 2
22 : * bytes for the internal id of a replication origin, as it seems unlikely that
23 : * there will soon be more than 65k nodes in one replication setup; using only
24 : * two bytes allows us to be more space efficient.
25 : *
26 : * Replication progress is tracked in a shared memory table
27 : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : * ('slots') in this table are identified by the internal id. That's the case
29 : * because it allows us to increase replication progress during crash
30 : * recovery. To allow doing so we store the original LSN (from the originating
31 : * system) of a transaction in the commit record. That allows us to recover the
32 : * precise replayed state after crash recovery; without requiring synchronous
33 : * commits. Allowing logical replication to use asynchronous commit is
34 : * generally good for performance, but especially important as it allows a
35 : * single threaded replay process to keep up with a source that has multiple
36 : * backends generating changes concurrently. For efficiency and simplicity
37 : * reasons a backend can set up one replication origin that is from then on used as
38 : * the source of changes produced by the backend, until reset again.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
47 : * * To create replication origins an exclusive lock on pg_replication_origin
48 : * is held for the duration (dropping takes a row-exclusive lock). That allows
49 : * us to safely and conflict-free assign new origins using a dirty snapshot.
50 : *
51 : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : * LWLock has to be held exclusively; when iterating over the replication
53 : * progress a shared lock has to be held, the same when advancing the
54 : * replication progress of an individual backend that has not been set up as the
55 : * session's replication origin.
56 : *
57 : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : * replication progress slot that slot's lwlock has to be held. That's
59 : * primarily because we do not assume 8-byte writes (of the LSN) are atomic on
60 : * all our platforms, but it also simplifies memory ordering concerns
61 : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : * so it's less harmful to hold the lock over a WAL write
63 : * (cf. AdvanceReplicationProgress).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/logical.h"
86 : #include "replication/origin.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/copydir.h"
89 : #include "storage/fd.h"
90 : #include "storage/ipc.h"
91 : #include "storage/lmgr.h"
92 : #include "utils/builtins.h"
93 : #include "utils/fmgroids.h"
94 : #include "utils/pg_lsn.h"
95 : #include "utils/rel.h"
96 : #include "utils/snapmgr.h"
97 : #include "utils/syscache.h"
98 :
/*
 * Replay progress of a single remote node.
 *
 * One entry of the shared-memory replication_states[] array. An entry whose
 * roident is InvalidRepOriginId is unused.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node; InvalidRepOriginId if this entry
	 * is free.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side (an LSN of the
	 * originating system).
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none. Dropping the origin
	 * has to wait (or fail, with nowait) while this is nonzero.
	 */
	int			acquired_by;

	/*
	 * Condition variable that's signaled when acquired_by changes; backends
	 * trying to drop a busy origin sleep on it.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;

/*
 * On disk version of ReplicationState.
 *
 * Instances are written verbatim (padding zeroed first) into the replication
 * origin checkpoint file and read back with sizeof()-sized reads, so this
 * struct's layout effectively is the on-disk format.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;


/*
 * Layout of the shared memory block holding all replication progress state;
 * its size is computed by ReplicationOriginShmemSize().
 */
typedef struct ReplicationStateCtl
{
	/* Tranche to use for per-origin LWLocks */
	int			tranche_id;
	/* Array of length max_replication_slots */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
154 :
/*
 * External (non-static) variables describing the replication origin of the
 * current session. replorigin_session_origin is the origin this backend
 * assumes as its identity; InvalidRepOriginId means none.
 * NOTE(review): the _lsn/_timestamp companions are presumably filled in by the
 * replay code for each remote transaction; their writers are not visible here.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_replication_slots. Set to replication_states_ctl->states by
 * ReplicationOriginShmemInit().
 *
 * XXX: Should we use a separate variable to size this rather than
 * max_replication_slots?
 */
static ReplicationState *replication_states;

/*
 * Actual shared memory block (replication_states[] is now part of this).
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * Backend-local, cached element from ReplicationState for use in a backend
 * replaying remote commits, so we don't have to search ReplicationState for
 * the backend's current RepOriginId.
 */
static ReplicationState *session_replication_state = NULL;

/*
 * Magic number written at the start of the replication origin checkpoint
 * file and verified when reading it back at startup.
 */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
183 :
184 : static void
2902 andres 185 GIC 31 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
2902 andres 186 ECB : {
2902 andres 187 GIC 31 : if (check_slots && max_replication_slots == 0)
2902 andres 188 LBC 0 : ereport(ERROR,
2902 andres 189 EUB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190 : errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
191 :
2902 andres 192 GIC 31 : if (!recoveryOK && RecoveryInProgress())
2902 andres 193 LBC 0 : ereport(ERROR,
2902 andres 194 EUB : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
195 : errmsg("cannot manipulate replication origins during recovery")));
2902 andres 196 GIC 31 : }
2902 andres 197 ECB :
198 :
199 : /*
200 : * IsReservedOriginName
201 : * True iff name is either "none" or "any".
202 : */
203 : static bool
262 akapila 204 GNC 7 : IsReservedOriginName(const char *name)
205 : {
206 13 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
207 6 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
208 : }
209 :
210 : /* ---------------------------------------------------------------------------
211 : * Functions for working with replication origins themselves.
212 : * ---------------------------------------------------------------------------
213 : */
214 :
215 : /*
2902 andres 216 ECB : * Check for a persistent replication origin identified by name.
217 : *
218 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
219 : */
220 : RepOriginId
668 peter 221 GIC 694 : replorigin_by_name(const char *roname, bool missing_ok)
222 : {
223 : Form_pg_replication_origin ident;
2878 bruce 224 694 : Oid roident = InvalidOid;
225 : HeapTuple tuple;
226 : Datum roname_d;
227 :
2902 andres 228 694 : roname_d = CStringGetTextDatum(roname);
229 :
230 694 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
231 694 : if (HeapTupleIsValid(tuple))
232 : {
2902 andres 233 CBC 390 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
2902 andres 234 GIC 390 : roident = ident->roident;
235 390 : ReleaseSysCache(tuple);
2902 andres 236 ECB : }
2902 andres 237 GIC 304 : else if (!missing_ok)
2012 rhaas 238 4 : ereport(ERROR,
239 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2012 rhaas 240 ECB : errmsg("replication origin \"%s\" does not exist",
241 : roname)));
2902 andres 242 :
2902 andres 243 CBC 690 : return roident;
244 : }
2902 andres 245 ECB :
246 : /*
247 : * Create a replication origin.
248 : *
249 : * Needs to be called in a transaction.
250 : */
251 : RepOriginId
668 peter 252 GIC 271 : replorigin_create(const char *roname)
253 : {
254 : Oid roident;
2878 bruce 255 CBC 271 : HeapTuple tuple = NULL;
256 : Relation rel;
257 : Datum roname_d;
258 : SnapshotData SnapshotDirty;
259 : SysScanDesc scan;
260 : ScanKeyData key;
261 :
2902 andres 262 GIC 271 : roname_d = CStringGetTextDatum(roname);
263 :
2902 andres 264 CBC 271 : Assert(IsTransactionState());
265 :
266 : /*
2902 andres 267 ECB : * We need the numeric replication origin to be 16bit wide, so we cannot
268 : * rely on the normal oid allocation. Instead we simply scan
269 : * pg_replication_origin for the first unused id. That's not particularly
270 : * efficient, but this should be a fairly infrequent operation - we can
271 : * easily spend a bit more code on this when it turns out it needs to be
272 : * faster.
273 : *
274 : * We handle concurrency by taking an exclusive lock (allowing reads!)
275 : * over the table for the duration of the search. Because we use a "dirty
276 : * snapshot" we can read rows that other in-progress sessions have
277 : * written, even though they would be invisible with normal snapshots. Due
278 : * to the exclusive lock there's no danger that new rows can appear while
279 : * we're checking.
280 : */
2902 andres 281 GIC 271 : InitDirtySnapshot(SnapshotDirty);
282 :
1539 283 271 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
284 :
2901 285 492 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
286 : {
287 : bool nulls[Natts_pg_replication_origin];
288 : Datum values[Natts_pg_replication_origin];
289 : bool collides;
290 :
2902 291 492 : CHECK_FOR_INTERRUPTS();
292 :
2902 andres 293 CBC 492 : ScanKeyInit(&key,
294 : Anum_pg_replication_origin_roident,
2902 andres 295 ECB : BTEqualStrategyNumber, F_OIDEQ,
296 : ObjectIdGetDatum(roident));
297 :
2902 andres 298 GIC 492 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
299 : true /* indexOK */ ,
300 : &SnapshotDirty,
301 : 1, &key);
302 :
2902 andres 303 CBC 492 : collides = HeapTupleIsValid(systable_getnext(scan));
304 :
305 492 : systable_endscan(scan);
306 :
2902 andres 307 GIC 492 : if (!collides)
308 : {
309 : /*
2902 andres 310 ECB : * Ok, found an unused roident, insert the new row and do a CCI,
311 : * so our callers can look it up if they want to.
312 : */
2902 andres 313 GIC 271 : memset(&nulls, 0, sizeof(nulls));
314 :
2878 bruce 315 CBC 271 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
2902 andres 316 GIC 271 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
2902 andres 317 ECB :
2902 andres 318 GIC 271 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
2259 alvherre 319 CBC 271 : CatalogTupleInsert(rel, tuple);
2902 andres 320 GIC 270 : CommandCounterIncrement();
321 270 : break;
322 : }
323 : }
324 :
2878 bruce 325 ECB : /* now release lock again, */
1539 andres 326 GIC 270 : table_close(rel, ExclusiveLock);
2902 andres 327 ECB :
2902 andres 328 CBC 270 : if (tuple == NULL)
2902 andres 329 UIC 0 : ereport(ERROR,
2902 andres 330 ECB : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
234 john.naylor 331 : errmsg("could not find free replication origin ID")));
2902 andres 332 :
2902 andres 333 CBC 270 : heap_freetuple(tuple);
2902 andres 334 GIC 270 : return roident;
335 : }
336 :
/*
 * Helper for dropping a replication origin: clear the in-memory progress
 * slot belonging to 'roident', if there is one, after WAL-logging the drop.
 * The catalog row itself is removed by the caller.
 *
 * If the slot is currently acquired by some backend, either raise an ERROR
 * (nowait = true) or sleep on the slot's condition variable and rescan from
 * scratch. If no slot matches, nothing is WAL-logged or changed.
 */
static void
replorigin_state_clear(RepOriginId roident, bool nowait)
{
	int			i;

	/*
	 * Clean up the slot state info, if there is any matching slot.
	 */
restart:
	/* exclusive: we may modify slot contents below */
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationState *state = &replication_states[i];

		if (state->roident == roident)
		{
			/* found our slot, is it busy? */
			if (state->acquired_by != 0)
			{
				ConditionVariable *cv;

				if (nowait)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_IN_USE),
							 errmsg("could not drop replication origin with ID %d, in use by PID %d",
									state->roident,
									state->acquired_by)));

				/*
				 * We must wait and then retry. Since we don't know which CV
				 * to wait on until here, we can't readily use
				 * ConditionVariablePrepareToSleep (calling it here would be
				 * wrong, since we could miss the signal if we did so); just
				 * use ConditionVariableSleep directly.
				 */
				cv = &state->origin_cv;

				LWLockRelease(ReplicationOriginLock);

				ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
				/* the slot array may have changed meanwhile; rescan it */
				goto restart;
			}

			/* first make a WAL log entry */
			{
				xl_replorigin_drop xlrec;

				xlrec.node_id = roident;
				XLogBeginInsert();
				XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
				XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
			}

			/* then clear the in-memory slot */
			state->roident = InvalidRepOriginId;
			state->remote_lsn = InvalidXLogRecPtr;
			state->local_lsn = InvalidXLogRecPtr;
			break;
		}
	}
	LWLockRelease(ReplicationOriginLock);
	/* finish any ConditionVariableSleep() performed above */
	ConditionVariableCancelSleep();
}
404 :
405 : /*
406 : * Drop replication origin (by name).
407 : *
788 akapila 408 ECB : * Needs to be called in a transaction.
409 : */
410 : void
668 peter 411 GIC 367 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
412 : {
413 : RepOriginId roident;
788 akapila 414 ECB : Relation rel;
415 : HeapTuple tuple;
416 :
788 akapila 417 CBC 367 : Assert(IsTransactionState());
418 :
65 akapila 419 GNC 367 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
788 akapila 420 EUB :
788 akapila 421 GIC 367 : roident = replorigin_by_name(name, missing_ok);
422 :
423 : /* Lock the origin to prevent concurrent drops. */
65 akapila 424 GNC 366 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
425 : AccessExclusiveLock);
426 :
427 366 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
428 366 : if (!HeapTupleIsValid(tuple))
429 : {
430 145 : if (!missing_ok)
65 akapila 431 UNC 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
432 : roident);
433 :
434 : /*
435 : * We don't need to retain the locks if the origin is already dropped.
436 : */
65 akapila 437 GNC 145 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
438 : AccessExclusiveLock);
439 145 : table_close(rel, RowExclusiveLock);
440 145 : return;
441 : }
442 :
443 221 : replorigin_state_clear(roident, nowait);
444 :
445 : /*
446 : * Now, we can delete the catalog entry.
447 : */
448 221 : CatalogTupleDelete(rel, &tuple->t_self);
449 221 : ReleaseSysCache(tuple);
450 :
451 221 : CommandCounterIncrement();
452 :
788 akapila 453 ECB : /* We keep the lock on pg_replication_origin until commit */
788 akapila 454 GIC 221 : table_close(rel, NoLock);
788 akapila 455 ECB : }
2902 andres 456 :
457 : /*
458 : * Lookup replication origin via its oid and return the name.
459 : *
460 : * The external name is palloc'd in the calling context.
461 : *
462 : * Returns true if the origin is known, false otherwise.
463 : */
464 : bool
2902 andres 465 CBC 9 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
466 : {
2878 bruce 467 ECB : HeapTuple tuple;
468 : Form_pg_replication_origin ric;
469 :
2902 andres 470 CBC 9 : Assert(OidIsValid((Oid) roident));
2902 andres 471 GIC 9 : Assert(roident != InvalidRepOriginId);
472 9 : Assert(roident != DoNotReplicateId);
473 :
474 9 : tuple = SearchSysCache1(REPLORIGIDENT,
475 : ObjectIdGetDatum((Oid) roident));
476 :
477 9 : if (HeapTupleIsValid(tuple))
478 : {
479 9 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
480 9 : *roname = text_to_cstring(&ric->roname);
2902 andres 481 CBC 9 : ReleaseSysCache(tuple);
482 :
2902 andres 483 GIC 9 : return true;
484 : }
485 : else
2902 andres 486 ECB : {
2902 andres 487 LBC 0 : *roname = NULL;
2902 andres 488 ECB :
2902 andres 489 UIC 0 : if (!missing_ok)
2012 rhaas 490 LBC 0 : ereport(ERROR,
491 : (errcode(ERRCODE_UNDEFINED_OBJECT),
492 : errmsg("replication origin with ID %d does not exist",
2012 rhaas 493 ECB : roident)));
494 :
2902 andres 495 LBC 0 : return false;
2902 andres 496 ECB : }
497 : }
498 :
499 :
500 : /* ---------------------------------------------------------------------------
501 : * Functions for handling replication progress.
502 : * ---------------------------------------------------------------------------
2902 andres 503 EUB : */
504 :
505 : Size
2902 andres 506 GBC 6390 : ReplicationOriginShmemSize(void)
507 : {
2902 andres 508 GIC 6390 : Size size = 0;
509 :
510 : /*
2802 andres 511 EUB : * XXX: max_replication_slots is arguably the wrong thing to use, as here
512 : * we keep the replay state of *remote* transactions. But for now it seems
513 : * sufficient to reuse it, rather than introduce a separate GUC.
514 : */
2902 andres 515 GIC 6390 : if (max_replication_slots == 0)
2902 andres 516 UIC 0 : return size;
517 :
2902 andres 518 GIC 6390 : size = add_size(size, offsetof(ReplicationStateCtl, states));
519 :
520 6390 : size = add_size(size,
521 : mul_size(max_replication_slots, sizeof(ReplicationState)));
2902 andres 522 CBC 6390 : return size;
523 : }
2902 andres 524 ECB :
525 : void
2902 andres 526 GIC 1826 : ReplicationOriginShmemInit(void)
527 : {
528 : bool found;
529 :
530 1826 : if (max_replication_slots == 0)
2902 andres 531 LBC 0 : return;
2902 andres 532 EUB :
2902 andres 533 GIC 1826 : replication_states_ctl = (ReplicationStateCtl *)
2902 andres 534 CBC 1826 : ShmemInitStruct("ReplicationOriginState",
535 : ReplicationOriginShmemSize(),
2902 andres 536 ECB : &found);
2878 bruce 537 GIC 1826 : replication_states = replication_states_ctl->states;
2902 andres 538 ECB :
2902 andres 539 GIC 1826 : if (!found)
540 : {
541 : int i;
2902 andres 542 ECB :
1059 tgl 543 GIC 130639 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
544 :
545 1826 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
2902 andres 546 ECB :
2902 andres 547 GBC 19967 : for (i = 0; i < max_replication_slots; i++)
548 : {
2902 andres 549 CBC 18141 : LWLockInitialize(&replication_states[i].lock,
550 18141 : replication_states_ctl->tranche_id);
2070 alvherre 551 GIC 18141 : ConditionVariableInit(&replication_states[i].origin_cv);
552 : }
2902 andres 553 ECB : }
554 : }
555 :
556 : /* ---------------------------------------------------------------------------
557 : * Perform a checkpoint of each replication origin's progress with respect to
558 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
559 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
560 : * if the transactions were originally committed asynchronously.
561 : *
562 : * We store checkpoints in the following format:
563 : * +-------+------------------------+------------------+-----+--------+
564 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
565 : * +-------+------------------------+------------------+-----+--------+
566 : *
567 : * So its just the magic, followed by the statically sized
568 : * ReplicationStateOnDisk structs. Note that the maximum number of
569 : * ReplicationState is determined by max_replication_slots.
570 : * ---------------------------------------------------------------------------
571 : */
572 : void
2902 andres 573 GIC 2363 : CheckPointReplicationOrigin(void)
574 : {
575 2363 : const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
576 2363 : const char *path = "pg_logical/replorigin_checkpoint";
577 : int tmpfd;
578 : int i;
579 2363 : uint32 magic = REPLICATION_STATE_MAGIC;
580 : pg_crc32c crc;
581 :
582 2363 : if (max_replication_slots == 0)
2902 andres 583 UIC 0 : return;
584 :
2902 andres 585 GIC 2363 : INIT_CRC32C(crc);
586 :
587 : /* make sure no old temp file is remaining */
588 2363 : if (unlink(tmppath) < 0 && errno != ENOENT)
2902 andres 589 LBC 0 : ereport(PANIC,
590 : (errcode_for_file_access(),
2902 andres 591 ECB : errmsg("could not remove file \"%s\": %m",
2802 592 : tmppath)));
593 :
594 : /*
697 tgl 595 : * no other backend can perform this at the same time; only one checkpoint
596 : * can happen at a time.
597 : */
2024 peter_e 598 CBC 2363 : tmpfd = OpenTransientFile(tmppath,
2024 peter_e 599 EUB : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2902 andres 600 GIC 2363 : if (tmpfd < 0)
2902 andres 601 LBC 0 : ereport(PANIC,
602 : (errcode_for_file_access(),
603 : errmsg("could not create file \"%s\": %m",
2902 andres 604 ECB : tmppath)));
2902 andres 605 EUB :
606 : /* write magic */
1708 michael 607 GIC 2363 : errno = 0;
2902 andres 608 2363 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
609 : {
610 : /* if write didn't set errno, assume problem is no disk space */
1453 michael 611 UIC 0 : if (errno == 0)
612 0 : errno = ENOSPC;
2902 andres 613 0 : ereport(PANIC,
2902 andres 614 ECB : (errcode_for_file_access(),
615 : errmsg("could not write to file \"%s\": %m",
616 : tmppath)));
2902 andres 617 EUB : }
2902 andres 618 GIC 2363 : COMP_CRC32C(crc, &magic, sizeof(magic));
619 :
620 : /* prevent concurrent creations/drops */
621 2363 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
622 :
2902 andres 623 ECB : /* write actual data */
2902 andres 624 CBC 25839 : for (i = 0; i < max_replication_slots; i++)
625 : {
626 : ReplicationStateOnDisk disk_state;
2902 andres 627 GBC 23476 : ReplicationState *curstate = &replication_states[i];
2878 bruce 628 EUB : XLogRecPtr local_lsn;
2902 andres 629 :
2902 andres 630 GIC 23476 : if (curstate->roident == InvalidRepOriginId)
631 23449 : continue;
632 :
633 : /* zero, to avoid uninitialized padding bytes */
2177 andres 634 CBC 27 : memset(&disk_state, 0, sizeof(disk_state));
635 :
2902 andres 636 GIC 27 : LWLockAcquire(&curstate->lock, LW_SHARED);
2902 andres 637 ECB :
2902 andres 638 GIC 27 : disk_state.roident = curstate->roident;
639 :
2902 andres 640 CBC 27 : disk_state.remote_lsn = curstate->remote_lsn;
2902 andres 641 GIC 27 : local_lsn = curstate->local_lsn;
642 :
2902 andres 643 CBC 27 : LWLockRelease(&curstate->lock);
644 :
645 : /* make sure we only write out a commit that's persistent */
646 27 : XLogFlush(local_lsn);
2902 andres 647 ECB :
1708 michael 648 GIC 27 : errno = 0;
2902 andres 649 27 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
2902 andres 650 ECB : sizeof(disk_state))
651 : {
1749 michael 652 : /* if write didn't set errno, assume problem is no disk space */
1453 michael 653 UIC 0 : if (errno == 0)
1453 michael 654 LBC 0 : errno = ENOSPC;
2902 andres 655 UIC 0 : ereport(PANIC,
2902 andres 656 ECB : (errcode_for_file_access(),
657 : errmsg("could not write to file \"%s\": %m",
658 : tmppath)));
659 : }
660 :
2902 andres 661 GIC 27 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
2902 andres 662 ECB : }
663 :
2902 andres 664 CBC 2363 : LWLockRelease(ReplicationOriginLock);
2902 andres 665 ECB :
666 : /* write out the CRC */
2902 andres 667 GIC 2363 : FIN_CRC32C(crc);
1708 michael 668 2363 : errno = 0;
2902 andres 669 GBC 2363 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
2902 andres 670 EUB : {
1749 michael 671 : /* if write didn't set errno, assume problem is no disk space */
1453 michael 672 UIC 0 : if (errno == 0)
673 0 : errno = ENOSPC;
2902 andres 674 0 : ereport(PANIC,
675 : (errcode_for_file_access(),
676 : errmsg("could not write to file \"%s\": %m",
2902 andres 677 ECB : tmppath)));
678 : }
679 :
1373 peter 680 CBC 2363 : if (CloseTransientFile(tmpfd) != 0)
1492 michael 681 UIC 0 : ereport(PANIC,
682 : (errcode_for_file_access(),
1492 michael 683 ECB : errmsg("could not close file \"%s\": %m",
684 : tmppath)));
2902 andres 685 :
686 : /* fsync, rename to permanent file, fsync file and directory */
2587 andres 687 GIC 2363 : durable_rename(tmppath, path, PANIC);
2902 andres 688 EUB : }
689 :
690 : /*
691 : * Recover replication replay status from checkpoint data saved earlier by
692 : * CheckPointReplicationOrigin.
693 : *
694 : * This only needs to be called at startup and *not* during every checkpoint
695 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
2902 andres 696 ECB : * state thereafter can be recovered by looking at commit records.
2902 andres 697 EUB : */
698 : void
2902 andres 699 GIC 1176 : StartupReplicationOrigin(void)
700 : {
701 1176 : const char *path = "pg_logical/replorigin_checkpoint";
702 : int fd;
2878 bruce 703 ECB : int readBytes;
2878 bruce 704 GIC 1176 : uint32 magic = REPLICATION_STATE_MAGIC;
705 1176 : int last_state = 0;
706 : pg_crc32c file_crc;
707 : pg_crc32c crc;
708 :
709 : /* don't want to overwrite already existing state */
710 : #ifdef USE_ASSERT_CHECKING
711 : static bool already_started = false;
712 :
2902 andres 713 1176 : Assert(!already_started);
714 1176 : already_started = true;
2902 andres 715 ECB : #endif
716 :
2902 andres 717 CBC 1176 : if (max_replication_slots == 0)
2902 andres 718 GIC 305 : return;
719 :
2902 andres 720 CBC 1176 : INIT_CRC32C(crc);
2902 andres 721 ECB :
2902 andres 722 GIC 1176 : elog(DEBUG2, "starting up replication origin progress state");
723 :
2024 peter_e 724 1176 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
725 :
726 : /*
727 : * might have had max_replication_slots == 0 last run, or we just brought
728 : * up a standby.
2902 andres 729 ECB : */
2902 andres 730 CBC 1176 : if (fd < 0 && errno == ENOENT)
2902 andres 731 GIC 305 : return;
732 871 : else if (fd < 0)
2902 andres 733 LBC 0 : ereport(PANIC,
2902 andres 734 ECB : (errcode_for_file_access(),
735 : errmsg("could not open file \"%s\": %m",
736 : path)));
737 :
2624 magnus 738 : /* verify magic, that is written even if nothing was active */
2902 andres 739 GIC 871 : readBytes = read(fd, &magic, sizeof(magic));
2902 andres 740 CBC 871 : if (readBytes != sizeof(magic))
741 : {
1726 michael 742 UIC 0 : if (readBytes < 0)
743 0 : ereport(PANIC,
744 : (errcode_for_file_access(),
745 : errmsg("could not read file \"%s\": %m",
1726 michael 746 ECB : path)));
747 : else
1726 michael 748 LBC 0 : ereport(PANIC,
1721 michael 749 EUB : (errcode(ERRCODE_DATA_CORRUPTED),
750 : errmsg("could not read file \"%s\": read %d of %zu",
751 : path, readBytes, sizeof(magic))));
752 : }
2902 andres 753 GIC 871 : COMP_CRC32C(crc, &magic, sizeof(magic));
754 :
2902 andres 755 CBC 871 : if (magic != REPLICATION_STATE_MAGIC)
2902 andres 756 LBC 0 : ereport(PANIC,
757 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
2118 tgl 758 EUB : magic, REPLICATION_STATE_MAGIC)));
2902 andres 759 :
760 : /* we can skip locking here, no other access is possible */
761 :
762 : /* recover individual states, until there are no more to be found */
763 : while (true)
2902 andres 764 GBC 3 : {
765 : ReplicationStateOnDisk disk_state;
766 :
2902 andres 767 GIC 874 : readBytes = read(fd, &disk_state, sizeof(disk_state));
768 :
2902 andres 769 ECB : /* no further data */
2902 andres 770 GIC 874 : if (readBytes == sizeof(crc))
2902 andres 771 ECB : {
2902 andres 772 EUB : /* not pretty, but simple ... */
2878 bruce 773 GIC 871 : file_crc = *(pg_crc32c *) &disk_state;
2902 andres 774 871 : break;
775 : }
776 :
777 3 : if (readBytes < 0)
778 : {
2902 andres 779 UIC 0 : ereport(PANIC,
2902 andres 780 ECB : (errcode_for_file_access(),
781 : errmsg("could not read file \"%s\": %m",
782 : path)));
783 : }
784 :
2902 andres 785 GIC 3 : if (readBytes != sizeof(disk_state))
2902 andres 786 ECB : {
2902 andres 787 UIC 0 : ereport(PANIC,
788 : (errcode_for_file_access(),
2902 andres 789 ECB : errmsg("could not read file \"%s\": read %d of %zu",
790 : path, readBytes, sizeof(disk_state))));
791 : }
792 :
2902 andres 793 CBC 3 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
794 :
2902 andres 795 GBC 3 : if (last_state == max_replication_slots)
2902 andres 796 UIC 0 : ereport(PANIC,
797 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
798 : errmsg("could not find free replication state, increase max_replication_slots")));
799 :
800 : /* copy data to shared memory */
2902 andres 801 CBC 3 : replication_states[last_state].roident = disk_state.roident;
2902 andres 802 GIC 3 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
2902 andres 803 GBC 3 : last_state++;
804 :
856 peter 805 GIC 3 : ereport(LOG,
806 : (errmsg("recovered replication state of node %d to %X/%X",
807 : disk_state.roident,
808 : LSN_FORMAT_ARGS(disk_state.remote_lsn))));
2902 andres 809 ECB : }
810 :
811 : /* now check checksum */
2902 andres 812 GBC 871 : FIN_CRC32C(crc);
2902 andres 813 GIC 871 : if (file_crc != crc)
2902 andres 814 UIC 0 : ereport(PANIC,
815 : (errcode(ERRCODE_DATA_CORRUPTED),
816 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
2902 andres 817 ECB : crc, file_crc)));
818 :
1373 peter 819 CBC 871 : if (CloseTransientFile(fd) != 0)
1492 michael 820 UIC 0 : ereport(PANIC,
1492 michael 821 ECB : (errcode_for_file_access(),
822 : errmsg("could not close file \"%s\": %m",
823 : path)));
824 : }
825 :
826 : void
2902 andres 827 GIC 6 : replorigin_redo(XLogReaderState *record)
2902 andres 828 ECB : {
2902 andres 829 CBC 6 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
2902 andres 830 EUB :
2902 andres 831 GIC 6 : switch (info)
832 : {
833 3 : case XLOG_REPLORIGIN_SET:
834 : {
2902 andres 835 CBC 3 : xl_replorigin_set *xlrec =
2878 bruce 836 GBC 3 : (xl_replorigin_set *) XLogRecGetData(record);
837 :
2902 andres 838 GIC 3 : replorigin_advance(xlrec->node_id,
839 : xlrec->remote_lsn, record->EndRecPtr,
2878 bruce 840 3 : xlrec->force /* backward */ ,
841 : false /* WAL log */ );
2902 andres 842 3 : break;
2902 andres 843 ECB : }
2902 andres 844 GIC 3 : case XLOG_REPLORIGIN_DROP:
2902 andres 845 ECB : {
846 : xl_replorigin_drop *xlrec;
2878 bruce 847 : int i;
848 :
2902 andres 849 CBC 3 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
850 :
851 4 : for (i = 0; i < max_replication_slots; i++)
2902 andres 852 ECB : {
2902 andres 853 GIC 4 : ReplicationState *state = &replication_states[i];
2902 andres 854 ECB :
855 : /* found our slot */
2902 andres 856 CBC 4 : if (state->roident == xlrec->node_id)
857 : {
2902 andres 858 ECB : /* reset entry */
2902 andres 859 GIC 3 : state->roident = InvalidRepOriginId;
2902 andres 860 CBC 3 : state->remote_lsn = InvalidXLogRecPtr;
2902 andres 861 GIC 3 : state->local_lsn = InvalidXLogRecPtr;
862 3 : break;
863 : }
864 : }
2902 andres 865 CBC 3 : break;
866 : }
2902 andres 867 LBC 0 : default:
2902 andres 868 UIC 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
2902 andres 869 ECB : }
2902 andres 870 GIC 6 : }
871 :
2902 andres 872 ECB :
873 : /*
874 : * Tell the replication origin progress machinery that a commit from 'node'
875 : * that originated at the LSN remote_commit on the remote node was replayed
876 : * successfully and that we don't need to do so again. In combination with
2750 alvherre 877 : * setting up replorigin_session_origin_lsn and replorigin_session_origin
824 akapila 878 : * that ensures we won't lose knowledge about that after a crash if the
879 : * transaction had a persistent effect (think of asynchronous commits).
880 : *
2902 andres 881 : * local_commit needs to be a local LSN of the commit so that we can make sure
882 : * upon a checkpoint that enough WAL has been persisted to disk.
2902 andres 883 EUB : *
884 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
885 : * unless running in recovery.
2902 andres 886 ECB : */
887 : void
2902 andres 888 GIC 200 : replorigin_advance(RepOriginId node,
889 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
890 : bool go_backward, bool wal_log)
891 : {
892 : int i;
893 200 : ReplicationState *replication_state = NULL;
894 200 : ReplicationState *free_state = NULL;
895 :
896 200 : Assert(node != InvalidRepOriginId);
897 :
898 : /* we don't track DoNotReplicateId */
899 200 : if (node == DoNotReplicateId)
2902 andres 900 UIC 0 : return;
901 :
902 : /*
903 : * XXX: For the case where this is called by WAL replay, it'd be more
2902 andres 904 ECB : * efficient to restore into a backend local hashtable and only dump into
905 : * shmem after recovery is finished. Let's wait with implementing that
906 : * till it's shown to be a measurable expense
907 : */
908 :
909 : /* Lock exclusively, as we may have to create a new table entry. */
2902 andres 910 CBC 200 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
911 :
2902 andres 912 ECB : /*
913 : * Search for either an existing slot for the origin, or a free one we can
914 : * use.
915 : */
2902 andres 916 GBC 1749 : for (i = 0; i < max_replication_slots; i++)
917 : {
2902 andres 918 GIC 1595 : ReplicationState *curstate = &replication_states[i];
919 :
920 : /* remember where to insert if necessary */
921 1595 : if (curstate->roident == InvalidRepOriginId &&
922 : free_state == NULL)
923 : {
924 156 : free_state = curstate;
925 156 : continue;
2902 andres 926 ECB : }
927 :
928 : /* not our slot */
2902 andres 929 GIC 1439 : if (curstate->roident != node)
930 : {
931 1393 : continue;
2902 andres 932 ECB : }
933 :
934 : /* ok, found slot */
2902 andres 935 GIC 46 : replication_state = curstate;
936 :
2902 andres 937 CBC 46 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
938 :
939 : /* Make sure it's not used by somebody else */
940 46 : if (replication_state->acquired_by != 0)
2902 andres 941 ECB : {
2902 andres 942 UIC 0 : ereport(ERROR,
943 : (errcode(ERRCODE_OBJECT_IN_USE),
944 : errmsg("replication origin with ID %d is already active for PID %d",
2902 andres 945 ECB : replication_state->roident,
946 : replication_state->acquired_by)));
947 : }
948 :
2902 andres 949 GIC 46 : break;
950 : }
2902 andres 951 ECB :
2902 andres 952 GIC 200 : if (replication_state == NULL && free_state == NULL)
2902 andres 953 LBC 0 : ereport(ERROR,
954 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
955 : errmsg("could not find free replication state slot for replication origin with ID %d",
2902 andres 956 ECB : node),
957 : errhint("Increase max_replication_slots and try again.")));
2902 andres 958 EUB :
2902 andres 959 GIC 200 : if (replication_state == NULL)
960 : {
961 : /* initialize new slot */
962 154 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
963 154 : replication_state = free_state;
964 154 : Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
2902 andres 965 CBC 154 : Assert(replication_state->local_lsn == InvalidXLogRecPtr);
2902 andres 966 GIC 154 : replication_state->roident = node;
967 : }
2902 andres 968 ECB :
2902 andres 969 GBC 200 : Assert(replication_state->roident != InvalidRepOriginId);
970 :
971 : /*
972 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
973 : * and the standby gets the message. Primarily this will be called during
974 : * WAL replay (of commit records) where no WAL logging is necessary.
2902 andres 975 ECB : */
2902 andres 976 GIC 200 : if (wal_log)
977 : {
2902 andres 978 ECB : xl_replorigin_set xlrec;
2878 bruce 979 :
2902 andres 980 CBC 155 : xlrec.remote_lsn = remote_commit;
981 155 : xlrec.node_id = node;
982 155 : xlrec.force = go_backward;
983 :
2902 andres 984 GIC 155 : XLogBeginInsert();
2902 andres 985 CBC 155 : XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
986 :
2902 andres 987 GIC 155 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
988 : }
989 :
990 : /*
991 : * Due to - harmless - race conditions during a checkpoint we could see
634 akapila 992 ECB : * values here that are older than the ones we already have in memory. We
993 : * could also see older values for prepared transactions when the prepare
994 : * is sent at a later point of time along with commit prepared and there
995 : * are other transactions commits between prepare and commit prepared. See
996 : * ReorderBufferFinishPrepared. Don't overwrite those.
2902 andres 997 : */
2902 andres 998 CBC 200 : if (go_backward || replication_state->remote_lsn < remote_commit)
2902 andres 999 GIC 189 : replication_state->remote_lsn = remote_commit;
2902 andres 1000 CBC 200 : if (local_commit != InvalidXLogRecPtr &&
1001 42 : (go_backward || replication_state->local_lsn < local_commit))
2902 andres 1002 GIC 45 : replication_state->local_lsn = local_commit;
2902 andres 1003 CBC 200 : LWLockRelease(&replication_state->lock);
1004 :
1005 : /*
1006 : * Release *after* changing the LSNs, slot isn't acquired and thus could
1007 : * otherwise be dropped anytime.
1008 : */
2902 andres 1009 GIC 200 : LWLockRelease(ReplicationOriginLock);
1010 : }
1011 :
1012 :
1013 : XLogRecPtr
2902 andres 1014 CBC 8 : replorigin_get_progress(RepOriginId node, bool flush)
2902 andres 1015 ECB : {
1016 : int i;
2902 andres 1017 CBC 8 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1018 8 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
2902 andres 1019 ECB :
1020 : /* prevent slots from being concurrently dropped */
2902 andres 1021 GIC 8 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1022 :
1023 38 : for (i = 0; i < max_replication_slots; i++)
1024 : {
2902 andres 1025 ECB : ReplicationState *state;
1026 :
2902 andres 1027 GIC 35 : state = &replication_states[i];
1028 :
1029 35 : if (state->roident == node)
2902 andres 1030 ECB : {
2902 andres 1031 GIC 5 : LWLockAcquire(&state->lock, LW_SHARED);
1032 :
2902 andres 1033 CBC 5 : remote_lsn = state->remote_lsn;
1034 5 : local_lsn = state->local_lsn;
1035 :
2902 andres 1036 GIC 5 : LWLockRelease(&state->lock);
2902 andres 1037 ECB :
2902 andres 1038 GIC 5 : break;
2902 andres 1039 ECB : }
1040 : }
1041 :
2902 andres 1042 GIC 8 : LWLockRelease(ReplicationOriginLock);
2902 andres 1043 ECB :
2902 andres 1044 GIC 8 : if (flush && local_lsn != InvalidXLogRecPtr)
2902 andres 1045 CBC 1 : XLogFlush(local_lsn);
1046 :
1047 8 : return remote_lsn;
1048 : }
2902 andres 1049 ECB :
1050 : /*
1051 : * Tear down a (possibly) configured session replication origin during process
1052 : * exit.
1053 : */
1054 : static void
2902 andres 1055 GIC 314 : ReplicationOriginExitCleanup(int code, Datum arg)
1056 : {
2064 tgl 1057 314 : ConditionVariable *cv = NULL;
2070 alvherre 1058 ECB :
2902 andres 1059 GIC 314 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
2902 andres 1060 ECB :
2902 andres 1061 CBC 314 : if (session_replication_state != NULL &&
2902 andres 1062 GIC 164 : session_replication_state->acquired_by == MyProcPid)
2902 andres 1063 ECB : {
2070 alvherre 1064 GIC 154 : cv = &session_replication_state->origin_cv;
1065 :
2902 andres 1066 154 : session_replication_state->acquired_by = 0;
1067 154 : session_replication_state = NULL;
1068 : }
1069 :
1070 314 : LWLockRelease(ReplicationOriginLock);
2070 alvherre 1071 ECB :
2070 alvherre 1072 GIC 314 : if (cv)
2070 alvherre 1073 CBC 154 : ConditionVariableBroadcast(cv);
2902 andres 1074 GIC 314 : }
2902 andres 1075 ECB :
1076 : /*
1077 : * Setup a replication origin in the shared memory struct if it doesn't
338 michael 1078 : * already exist and cache access to the specific ReplicationSlot so the
1079 : * array doesn't have to be searched when calling
2902 andres 1080 : * replorigin_session_advance().
1081 : *
1082 : * Normally only one such cached origin can exist per process so the cached
1083 : * value can only be set again after the previous value is torn down with
1084 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1085 : * (meaning the slot is not allowed to be already acquired by another process).
1086 : *
1087 : * However, sometimes multiple processes can safely re-use the same origin slot
1088 : * (for example, multiple parallel apply processes can safely use the same
1089 : * origin, provided they maintain commit order by allowing only one process to
1090 : * commit at a time). For this case the first process must pass acquired_by =
1091 : * 0, and then the other processes sharing that same origin can pass
1092 : * acquired_by = PID of the first process.
1093 : */
1094 : void
90 akapila 1095 GNC 316 : replorigin_session_setup(RepOriginId node, int acquired_by)
2902 andres 1096 ECB : {
1097 : static bool registered_cleanup;
2878 bruce 1098 : int i;
2878 bruce 1099 GIC 316 : int free_slot = -1;
1100 :
2902 andres 1101 316 : if (!registered_cleanup)
1102 : {
1103 314 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1104 314 : registered_cleanup = true;
1105 : }
1106 :
1107 316 : Assert(max_replication_slots > 0);
1108 :
1109 316 : if (session_replication_state != NULL)
1110 1 : ereport(ERROR,
1111 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1112 : errmsg("cannot setup replication origin when one is already setup")));
1113 :
1114 : /* Lock exclusively, as we may have to create a new table entry. */
1115 315 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1116 :
1117 : /*
1118 : * Search for either an existing slot for the origin, or a free one we can
2902 andres 1119 ECB : * use.
1120 : */
2902 andres 1121 GIC 3453 : for (i = 0; i < max_replication_slots; i++)
1122 : {
2902 andres 1123 CBC 3138 : ReplicationState *curstate = &replication_states[i];
1124 :
2902 andres 1125 ECB : /* remember where to insert if necessary */
2902 andres 1126 GIC 3138 : if (curstate->roident == InvalidRepOriginId &&
2902 andres 1127 ECB : free_slot == -1)
1128 : {
2902 andres 1129 GIC 315 : free_slot = i;
1130 315 : continue;
2902 andres 1131 ECB : }
1132 :
1133 : /* not our slot */
2902 andres 1134 CBC 2823 : if (curstate->roident != node)
2902 andres 1135 GIC 2579 : continue;
1136 :
90 akapila 1137 GNC 244 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1138 : {
2902 andres 1139 LBC 0 : ereport(ERROR,
1140 : (errcode(ERRCODE_OBJECT_IN_USE),
1141 : errmsg("replication origin with ID %d is already active for PID %d",
1142 : curstate->roident, curstate->acquired_by)));
1143 : }
1144 :
2902 andres 1145 ECB : /* ok, found slot */
2902 andres 1146 GIC 244 : session_replication_state = curstate;
2902 andres 1147 ECB : }
1148 :
1149 :
2902 andres 1150 CBC 315 : if (session_replication_state == NULL && free_slot == -1)
2902 andres 1151 UIC 0 : ereport(ERROR,
1152 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
229 john.naylor 1153 ECB : errmsg("could not find free replication state slot for replication origin with ID %d",
2902 andres 1154 : node),
1155 : errhint("Increase max_replication_slots and try again.")));
2902 andres 1156 GIC 315 : else if (session_replication_state == NULL)
1157 : {
2902 andres 1158 ECB : /* initialize new slot */
2902 andres 1159 CBC 71 : session_replication_state = &replication_states[free_slot];
2902 andres 1160 GIC 71 : Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
2902 andres 1161 CBC 71 : Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
2902 andres 1162 GIC 71 : session_replication_state->roident = node;
2902 andres 1163 EUB : }
1164 :
1165 :
2902 andres 1166 GIC 315 : Assert(session_replication_state->roident != InvalidRepOriginId);
1167 :
90 akapila 1168 GNC 315 : if (acquired_by == 0)
1169 305 : session_replication_state->acquired_by = MyProcPid;
1170 10 : else if (session_replication_state->acquired_by != acquired_by)
90 akapila 1171 UNC 0 : elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1172 : node, acquired_by);
1173 :
2902 andres 1174 CBC 315 : LWLockRelease(ReplicationOriginLock);
1175 :
1176 : /* probably this one is pointless */
2070 alvherre 1177 GIC 315 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
2902 andres 1178 CBC 315 : }
2902 andres 1179 EUB :
1180 : /*
1181 : * Reset replay state previously setup in this session.
1182 : *
1183 : * This function may only be called if an origin was setup with
2902 andres 1184 ECB : * replorigin_session_setup().
1185 : */
1186 : void
2902 andres 1187 CBC 152 : replorigin_session_reset(void)
2902 andres 1188 ECB : {
2064 tgl 1189 : ConditionVariable *cv;
2070 alvherre 1190 :
2902 andres 1191 GIC 152 : Assert(max_replication_slots != 0);
1192 :
1193 152 : if (session_replication_state == NULL)
2902 andres 1194 CBC 1 : ereport(ERROR,
1195 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2902 andres 1196 ECB : errmsg("no replication origin is configured")));
1197 :
2902 andres 1198 CBC 151 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
2902 andres 1199 EUB :
2902 andres 1200 GIC 151 : session_replication_state->acquired_by = 0;
2070 alvherre 1201 151 : cv = &session_replication_state->origin_cv;
2902 andres 1202 CBC 151 : session_replication_state = NULL;
1203 :
2902 andres 1204 GIC 151 : LWLockRelease(ReplicationOriginLock);
2070 alvherre 1205 ECB :
2070 alvherre 1206 CBC 151 : ConditionVariableBroadcast(cv);
2902 andres 1207 GIC 151 : }
1208 :
1209 : /*
1210 : * Do the same work replorigin_advance() does, just on the session's
1211 : * configured origin.
1212 : *
1213 : * This is noticeably cheaper than using replorigin_advance().
1214 : */
2902 andres 1215 ECB : void
2902 andres 1216 GIC 912 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1217 : {
1218 912 : Assert(session_replication_state != NULL);
2902 andres 1219 CBC 912 : Assert(session_replication_state->roident != InvalidRepOriginId);
1220 :
1221 912 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1222 912 : if (session_replication_state->local_lsn < local_commit)
2902 andres 1223 GIC 912 : session_replication_state->local_lsn = local_commit;
1224 912 : if (session_replication_state->remote_lsn < remote_commit)
1225 440 : session_replication_state->remote_lsn = remote_commit;
2902 andres 1226 CBC 912 : LWLockRelease(&session_replication_state->lock);
2902 andres 1227 GIC 912 : }
2902 andres 1228 ECB :
1229 : /*
1230 : * Ask the machinery about the point up to which we successfully replayed
1231 : * changes from an already setup replication origin.
1232 : */
1233 : XLogRecPtr
2902 andres 1234 CBC 149 : replorigin_session_get_progress(bool flush)
2902 andres 1235 ECB : {
1236 : XLogRecPtr remote_lsn;
1237 : XLogRecPtr local_lsn;
1238 :
2902 andres 1239 GIC 149 : Assert(session_replication_state != NULL);
1240 :
1241 149 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1242 149 : remote_lsn = session_replication_state->remote_lsn;
1243 149 : local_lsn = session_replication_state->local_lsn;
2902 andres 1244 CBC 149 : LWLockRelease(&session_replication_state->lock);
1245 :
1246 149 : if (flush && local_lsn != InvalidXLogRecPtr)
1247 1 : XLogFlush(local_lsn);
1248 :
1249 149 : return remote_lsn;
2902 andres 1250 ECB : }
1251 :
1252 :
1253 :
1254 : /* ---------------------------------------------------------------------------
1255 : * SQL functions for working with replication origin.
1256 : *
1257 : * These mostly should be fairly short wrappers around more generic functions.
1258 : * ---------------------------------------------------------------------------
1259 : */
1260 :
1261 : /*
1262 : * Create replication origin for the passed in name, and return the assigned
1263 : * oid.
1264 : */
1265 : Datum
2902 andres 1266 GIC 8 : pg_replication_origin_create(PG_FUNCTION_ARGS)
2902 andres 1267 ECB : {
1268 : char *name;
1269 : RepOriginId roident;
1270 :
2902 andres 1271 CBC 8 : replorigin_check_prerequisites(false, false);
2902 andres 1272 ECB :
2902 andres 1273 GIC 8 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1380 tgl 1274 ECB :
1275 : /*
1276 : * Replication origins "any and "none" are reserved for system options.
1277 : * The origins "pg_xxx" are reserved for internal use.
1278 : */
262 akapila 1279 GNC 8 : if (IsReservedName(name) || IsReservedOriginName(name))
1380 tgl 1280 CBC 3 : ereport(ERROR,
1281 : (errcode(ERRCODE_RESERVED_NAME),
1282 : errmsg("replication origin name \"%s\" is reserved",
1283 : name),
1284 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1285 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1286 :
1287 : /*
1288 : * If built with appropriate switch, whine when regression-testing
1289 : * conventions for replication origin names are violated.
1290 : */
1291 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1292 : if (strncmp(name, "regress_", 8) != 0)
1293 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1294 : #endif
1295 :
2902 andres 1296 GIC 5 : roident = replorigin_create(name);
1297 :
2902 andres 1298 CBC 4 : pfree(name);
1299 :
2902 andres 1300 GIC 4 : PG_RETURN_OID(roident);
1301 : }
1302 :
2902 andres 1303 ECB : /*
1304 : * Drop replication origin.
1305 : */
1306 : Datum
2902 andres 1307 GIC 5 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1308 : {
1309 : char *name;
1310 :
2902 andres 1311 CBC 5 : replorigin_check_prerequisites(false, false);
2902 andres 1312 ECB :
2902 andres 1313 GIC 5 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1314 :
788 akapila 1315 5 : replorigin_drop_by_name(name, false, true);
1316 :
2902 andres 1317 4 : pfree(name);
1318 :
1319 4 : PG_RETURN_VOID();
1320 : }
1321 :
1322 : /*
1323 : * Return oid of a replication origin.
1324 : */
1325 : Datum
2902 andres 1326 UIC 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1327 : {
2878 bruce 1328 ECB : char *name;
1329 : RepOriginId roident;
2902 andres 1330 :
2902 andres 1331 UIC 0 : replorigin_check_prerequisites(false, false);
2902 andres 1332 ECB :
2902 andres 1333 UIC 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1334 0 : roident = replorigin_by_name(name, true);
1335 :
1336 0 : pfree(name);
1337 :
1338 0 : if (OidIsValid(roident))
2902 andres 1339 LBC 0 : PG_RETURN_OID(roident);
2902 andres 1340 UIC 0 : PG_RETURN_NULL();
1341 : }
1342 :
2902 andres 1343 ECB : /*
1344 : * Setup a replication origin for this session.
1345 : */
1346 : Datum
2902 andres 1347 CBC 5 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1348 : {
2878 bruce 1349 ECB : char *name;
1350 : RepOriginId origin;
2902 andres 1351 :
2902 andres 1352 GIC 5 : replorigin_check_prerequisites(true, false);
1353 :
1354 5 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1355 5 : origin = replorigin_by_name(name, false);
90 akapila 1356 GNC 4 : replorigin_session_setup(origin, 0);
1357 :
2750 alvherre 1358 GBC 3 : replorigin_session_origin = origin;
1359 :
2902 andres 1360 GIC 3 : pfree(name);
1361 :
1362 3 : PG_RETURN_VOID();
2902 andres 1363 EUB : }
1364 :
1365 : /*
1366 : * Reset previously setup origin in this session
1367 : */
1368 : Datum
2902 andres 1369 GIC 4 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
2902 andres 1370 EUB : {
2902 andres 1371 GBC 4 : replorigin_check_prerequisites(true, false);
2902 andres 1372 EUB :
2902 andres 1373 GIC 4 : replorigin_session_reset();
1374 :
2750 alvherre 1375 3 : replorigin_session_origin = InvalidRepOriginId;
1376 3 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1377 3 : replorigin_session_origin_timestamp = 0;
1378 :
2902 andres 1379 CBC 3 : PG_RETURN_VOID();
1380 : }
1381 :
1382 : /*
1383 : * Has a replication origin been setup for this session.
2902 andres 1384 ECB : */
1385 : Datum
2902 andres 1386 LBC 0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
2902 andres 1387 ECB : {
2902 andres 1388 LBC 0 : replorigin_check_prerequisites(false, false);
1389 :
2750 alvherre 1390 0 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1391 : }
2902 andres 1392 ECB :
1393 :
1394 : /*
1395 : * Return the replication progress for origin setup in the current session.
1396 : *
1397 : * If 'flush' is set to true it is ensured that the returned value corresponds
1398 : * to a local transaction that has been flushed. This is useful if asynchronous
1399 : * commits are used when replaying replicated transactions.
1400 : */
1401 : Datum
2902 andres 1402 GIC 2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
2902 andres 1403 ECB : {
2902 andres 1404 GIC 2 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
2902 andres 1405 CBC 2 : bool flush = PG_GETARG_BOOL(0);
1406 :
1407 2 : replorigin_check_prerequisites(true, false);
2902 andres 1408 ECB :
2902 andres 1409 CBC 2 : if (session_replication_state == NULL)
2902 andres 1410 UIC 0 : ereport(ERROR,
2902 andres 1411 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1412 : errmsg("no replication origin is configured")));
1413 :
2902 andres 1414 GIC 2 : remote_lsn = replorigin_session_get_progress(flush);
1415 :
1416 2 : if (remote_lsn == InvalidXLogRecPtr)
2902 andres 1417 UIC 0 : PG_RETURN_NULL();
2902 andres 1418 EUB :
2902 andres 1419 GIC 2 : PG_RETURN_LSN(remote_lsn);
2902 andres 1420 EUB : }
1421 :
1422 : Datum
2902 andres 1423 GIC 1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1424 : {
1425 1 : XLogRecPtr location = PG_GETARG_LSN(0);
1426 :
1427 1 : replorigin_check_prerequisites(true, false);
1428 :
1429 1 : if (session_replication_state == NULL)
2902 andres 1430 UIC 0 : ereport(ERROR,
1431 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1432 : errmsg("no replication origin is configured")));
1433 :
2750 alvherre 1434 CBC 1 : replorigin_session_origin_lsn = location;
2750 alvherre 1435 GIC 1 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
2902 andres 1436 ECB :
2902 andres 1437 CBC 1 : PG_RETURN_VOID();
1438 : }
2902 andres 1439 ECB :
1440 : Datum
2902 andres 1441 LBC 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
2902 andres 1442 EUB : {
2902 andres 1443 UIC 0 : replorigin_check_prerequisites(true, false);
1444 :
2750 alvherre 1445 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
2750 alvherre 1446 LBC 0 : replorigin_session_origin_timestamp = 0;
1447 :
2902 andres 1448 0 : PG_RETURN_VOID();
2902 andres 1449 EUB : }
1450 :
2902 andres 1451 ECB :
1452 : Datum
2902 andres 1453 GIC 1 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1454 : {
2219 noah 1455 CBC 1 : text *name = PG_GETARG_TEXT_PP(0);
2878 bruce 1456 GIC 1 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
2878 bruce 1457 ECB : RepOriginId node;
1458 :
2902 andres 1459 CBC 1 : replorigin_check_prerequisites(true, false);
1460 :
2902 andres 1461 ECB : /* lock to prevent the replication origin from vanishing */
2902 andres 1462 GBC 1 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1463 :
2902 andres 1464 GIC 1 : node = replorigin_by_name(text_to_cstring(name), false);
1465 :
2902 andres 1466 ECB : /*
1467 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1468 : * xact hasn't committed yet. This is why this function should be used to
2881 heikki.linnakangas 1469 : * set up the initial replication state, but not for replay.
1470 : */
2902 andres 1471 UIC 0 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1472 : true /* go backward */ , true /* WAL log */ );
2902 andres 1473 EUB :
2902 andres 1474 UIC 0 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
2902 andres 1475 EUB :
2902 andres 1476 UIC 0 : PG_RETURN_VOID();
2902 andres 1477 EUB : }
1478 :
1479 :
1480 : /*
1481 : * Return the replication progress for an individual replication origin.
1482 : *
1483 : * If 'flush' is set to true it is ensured that the returned value corresponds
1484 : * to a local transaction that has been flushed. This is useful if asynchronous
2902 andres 1485 ECB : * commits are used when replaying replicated transactions.
1486 : */
1487 : Datum
2902 andres 1488 CBC 3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1489 : {
1490 : char *name;
2902 andres 1491 ECB : bool flush;
1492 : RepOriginId roident;
2902 andres 1493 GIC 3 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
2902 andres 1494 ECB :
2902 andres 1495 GIC 3 : replorigin_check_prerequisites(true, true);
2902 andres 1496 ECB :
2902 andres 1497 GIC 3 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1498 3 : flush = PG_GETARG_BOOL(1);
1499 :
1500 3 : roident = replorigin_by_name(name, false);
1501 2 : Assert(OidIsValid(roident));
1502 :
2902 andres 1503 GBC 2 : remote_lsn = replorigin_get_progress(roident, flush);
1504 :
2902 andres 1505 GIC 2 : if (remote_lsn == InvalidXLogRecPtr)
2902 andres 1506 UBC 0 : PG_RETURN_NULL();
1507 :
2902 andres 1508 GBC 2 : PG_RETURN_LSN(remote_lsn);
1509 : }
1510 :
1511 :
1512 : Datum
2902 andres 1513 GIC 2 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1514 : {
1515 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1516 : int i;
1517 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1518 :
1519 : /* we want to return 0 rows if slot is set to zero */
2902 andres 1520 CBC 2 : replorigin_check_prerequisites(false, true);
1521 :
173 michael 1522 GIC 2 : InitMaterializedSRF(fcinfo, 0);
1523 :
1524 : /* prevent slots from being concurrently dropped */
2902 andres 1525 CBC 2 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1526 :
2902 andres 1527 ECB : /*
1528 : * Iterate through all possible replication_states, display if they are
1529 : * filled. Note that we do not take any locks, so slightly corrupted/out
1530 : * of date values are a possibility.
1531 : */
2902 andres 1532 CBC 10 : for (i = 0; i < max_replication_slots; i++)
2902 andres 1533 ECB : {
1534 : ReplicationState *state;
1535 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1536 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1537 : char *roname;
2902 andres 1538 EUB :
2902 andres 1539 GIC 8 : state = &replication_states[i];
2902 andres 1540 ECB :
1541 : /* unused slot, nothing to display */
2902 andres 1542 GIC 8 : if (state->roident == InvalidRepOriginId)
1543 6 : continue;
1544 :
2902 andres 1545 CBC 2 : memset(values, 0, sizeof(values));
2902 andres 1546 GIC 2 : memset(nulls, 1, sizeof(nulls));
2902 andres 1547 ECB :
2902 andres 1548 GIC 2 : values[0] = ObjectIdGetDatum(state->roident);
1549 2 : nulls[0] = false;
1550 :
1551 : /*
2902 andres 1552 ECB : * We're not preventing the origin to be dropped concurrently, so
1553 : * silently accept that it might be gone.
1554 : */
2902 andres 1555 GIC 2 : if (replorigin_by_oid(state->roident, true,
1556 : &roname))
2902 andres 1557 ECB : {
2902 andres 1558 GIC 2 : values[1] = CStringGetTextDatum(roname);
1559 2 : nulls[1] = false;
1560 : }
1561 :
1562 2 : LWLockAcquire(&state->lock, LW_SHARED);
1563 :
2878 bruce 1564 CBC 2 : values[2] = LSNGetDatum(state->remote_lsn);
2902 andres 1565 GIC 2 : nulls[2] = false;
1566 :
1567 2 : values[3] = LSNGetDatum(state->local_lsn);
1568 2 : nulls[3] = false;
1569 :
1570 2 : LWLockRelease(&state->lock);
2902 andres 1571 ECB :
398 michael 1572 GIC 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1573 : values, nulls);
2902 andres 1574 ECB : }
1575 :
2902 andres 1576 GIC 2 : LWLockRelease(ReplicationOriginLock);
2902 andres 1577 ECB :
1578 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1579 :
2902 andres 1580 CBC 2 : return (Datum) 0;
2902 andres 1581 ECB : }
|