TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
15 : * * A facility to store and persist replication progress in an
16 : * efficient and durable manner.
17 : *
18 : * Replication origins consist of a descriptive, user-defined, external
19 : * name and a short, thus space-efficient, internal 2-byte one. This split
20 : * exists because replication origins have to be stored in WAL and shared
21 : * memory, and long descriptors would be inefficient. For now we only use 2
22 : * bytes for the internal id of a replication origin, as it seems unlikely
23 : * that there will soon be more than 65k nodes in one replication setup; and
24 : * using only two bytes allows us to be more space efficient.
25 : *
26 : * Replication progress is tracked in a shared memory table
27 : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : * ('slots') in this table are identified by the internal id. That's the case
29 : * because it allows to increase replication progress during crash
30 : * recovery. To allow doing so we store the original LSN (from the originating
31 : * system) of a transaction in the commit record. That allows to recover the
32 : * precise replayed state after crash recovery; without requiring synchronous
33 : * commits. Allowing logical replication to use asynchronous commit is
34 : * generally good for performance, but especially important as it allows a
35 : * single threaded replay process to keep up with a source that has multiple
36 : * backends generating changes concurrently. For efficiency and simplicity
37 : * reasons a backend can set up one replication origin that is then used as
38 : * the source of changes produced by the backend, until reset again.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
47 : * * To create and drop replication origins an exclusive lock on
48 : * pg_replication_origin is required for the duration. That allows us to
49 : * safely and conflict free assign new origins using a dirty snapshot.
50 : *
51 : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : * LWLock has to be held exclusively; when iterating over the replication
53 : * progress a shared lock has to be held; the same applies when advancing the
54 : * replication progress of an individual origin that has not been set up as
55 : * the session's replication origin.
56 : *
57 : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : * replication progress slot that slot's lwlock has to be held. That's
59 : * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60 : * all our platforms, but it also simplifies memory ordering concerns
61 : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : * so it's less harmful to hold the lock over a WAL write
63 : * (cf. AdvanceReplicationProgress).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/logical.h"
86 : #include "replication/origin.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/copydir.h"
89 : #include "storage/fd.h"
90 : #include "storage/ipc.h"
91 : #include "storage/lmgr.h"
92 : #include "utils/builtins.h"
93 : #include "utils/fmgroids.h"
94 : #include "utils/pg_lsn.h"
95 : #include "utils/rel.h"
96 : #include "utils/snapmgr.h"
97 : #include "utils/syscache.h"
98 :
/*
 * Replay progress of a single remote node.
 *
 * Entries live in a shared-memory array of max_replication_slots elements
 * (see ReplicationOriginShmemSize()).  An entry whose roident is
 * InvalidRepOriginId is unused and may be claimed for a new origin.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node; InvalidRepOriginId marks a free
	 * entry.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.
	 */
	int			acquired_by;

	/*
	 * Condition variable that's signaled when acquired_by changes.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;
136 :
/*
 * On disk version of ReplicationState.
 *
 * Only the origin id and the replayed remote LSN are persisted.  local_lsn
 * is flushed with XLogFlush() at checkpoint time rather than stored, and the
 * remaining ReplicationState fields are runtime-only.  The struct is written
 * to and read from the checkpoint file verbatim, including any padding (the
 * writer zeroes it first), so changing this layout changes the on-disk
 * format.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;
145 :
146 :
/*
 * Header of the shared-memory block that holds all replication states;
 * allocated and initialized by ReplicationOriginShmemInit().
 */
typedef struct ReplicationStateCtl
{
	/* Tranche to use for per-origin LWLocks */
	int			tranche_id;
	/* Array of length max_replication_slots */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
154 :
/*
 * external variables
 *
 * NOTE(review): these describe the origin the current session is replaying
 * for; they are assigned by code outside this chunk -- confirm semantics
 * there.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_replication_slots.  Points at replication_states_ctl->states once
 * ReplicationOriginShmemInit() has run.
 *
 * XXX: Should we use a separate variable to size this rather than
 * max_replication_slots?
 */
static ReplicationState *replication_states;

/*
 * Actual shared memory block (replication_states[] is now part of this).
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * Backend-local, cached element from ReplicationState for use in a backend
 * replaying remote commits, so we don't have to search ReplicationState for
 * the backend's current RepOriginId.
 */
static ReplicationState *session_replication_state = NULL;

/*
 * Magic for on disk files.  Written first in the checkpoint file and
 * verified by StartupReplicationOrigin().
 */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
183 :
184 : static void
185 GIC 31 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
186 ECB : {
187 GIC 31 : if (check_slots && max_replication_slots == 0)
188 LBC 0 : ereport(ERROR,
189 EUB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190 : errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
191 :
192 GIC 31 : if (!recoveryOK && RecoveryInProgress())
193 LBC 0 : ereport(ERROR,
194 EUB : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
195 : errmsg("cannot manipulate replication origins during recovery")));
196 GIC 31 : }
197 ECB :
198 :
199 : /*
200 : * IsReservedOriginName
201 : * True iff name is either "none" or "any".
202 : */
203 : static bool
204 GNC 7 : IsReservedOriginName(const char *name)
205 : {
206 13 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
207 6 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
208 : }
209 :
210 : /* ---------------------------------------------------------------------------
211 : * Functions for working with replication origins themselves.
212 : * ---------------------------------------------------------------------------
213 : */
214 :
215 : /*
216 ECB : * Check for a persistent replication origin identified by name.
217 : *
218 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
219 : */
220 : RepOriginId
221 GIC 694 : replorigin_by_name(const char *roname, bool missing_ok)
222 : {
223 : Form_pg_replication_origin ident;
224 694 : Oid roident = InvalidOid;
225 : HeapTuple tuple;
226 : Datum roname_d;
227 :
228 694 : roname_d = CStringGetTextDatum(roname);
229 :
230 694 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
231 694 : if (HeapTupleIsValid(tuple))
232 : {
233 CBC 390 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
234 GIC 390 : roident = ident->roident;
235 390 : ReleaseSysCache(tuple);
236 ECB : }
237 GIC 304 : else if (!missing_ok)
238 4 : ereport(ERROR,
239 : (errcode(ERRCODE_UNDEFINED_OBJECT),
240 ECB : errmsg("replication origin \"%s\" does not exist",
241 : roname)));
242 :
243 CBC 690 : return roident;
244 : }
245 ECB :
246 : /*
247 : * Create a replication origin.
248 : *
249 : * Needs to be called in a transaction.
250 : */
251 : RepOriginId
252 GIC 271 : replorigin_create(const char *roname)
253 : {
254 : Oid roident;
255 CBC 271 : HeapTuple tuple = NULL;
256 : Relation rel;
257 : Datum roname_d;
258 : SnapshotData SnapshotDirty;
259 : SysScanDesc scan;
260 : ScanKeyData key;
261 :
262 GIC 271 : roname_d = CStringGetTextDatum(roname);
263 :
264 CBC 271 : Assert(IsTransactionState());
265 :
266 : /*
267 ECB : * We need the numeric replication origin to be 16bit wide, so we cannot
268 : * rely on the normal oid allocation. Instead we simply scan
269 : * pg_replication_origin for the first unused id. That's not particularly
270 : * efficient, but this should be a fairly infrequent operation - we can
271 : * easily spend a bit more code on this when it turns out it needs to be
272 : * faster.
273 : *
274 : * We handle concurrency by taking an exclusive lock (allowing reads!)
275 : * over the table for the duration of the search. Because we use a "dirty
276 : * snapshot" we can read rows that other in-progress sessions have
277 : * written, even though they would be invisible with normal snapshots. Due
278 : * to the exclusive lock there's no danger that new rows can appear while
279 : * we're checking.
280 : */
281 GIC 271 : InitDirtySnapshot(SnapshotDirty);
282 :
283 271 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
284 :
285 492 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
286 : {
287 : bool nulls[Natts_pg_replication_origin];
288 : Datum values[Natts_pg_replication_origin];
289 : bool collides;
290 :
291 492 : CHECK_FOR_INTERRUPTS();
292 :
293 CBC 492 : ScanKeyInit(&key,
294 : Anum_pg_replication_origin_roident,
295 ECB : BTEqualStrategyNumber, F_OIDEQ,
296 : ObjectIdGetDatum(roident));
297 :
298 GIC 492 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
299 : true /* indexOK */ ,
300 : &SnapshotDirty,
301 : 1, &key);
302 :
303 CBC 492 : collides = HeapTupleIsValid(systable_getnext(scan));
304 :
305 492 : systable_endscan(scan);
306 :
307 GIC 492 : if (!collides)
308 : {
309 : /*
310 ECB : * Ok, found an unused roident, insert the new row and do a CCI,
311 : * so our callers can look it up if they want to.
312 : */
313 GIC 271 : memset(&nulls, 0, sizeof(nulls));
314 :
315 CBC 271 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
316 GIC 271 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
317 ECB :
318 GIC 271 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
319 CBC 271 : CatalogTupleInsert(rel, tuple);
320 GIC 270 : CommandCounterIncrement();
321 270 : break;
322 : }
323 : }
324 :
325 ECB : /* now release lock again, */
326 GIC 270 : table_close(rel, ExclusiveLock);
327 ECB :
328 CBC 270 : if (tuple == NULL)
329 UIC 0 : ereport(ERROR,
330 ECB : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
331 : errmsg("could not find free replication origin ID")));
332 :
333 CBC 270 : heap_freetuple(tuple);
334 GIC 270 : return roident;
335 : }
336 :
/*
 * Helper function to drop a replication origin.
 *
 * Clears the shared-memory progress slot for "roident", if one exists: a
 * XLOG_REPLORIGIN_DROP record is WAL-logged, then the slot's roident,
 * remote_lsn and local_lsn are reset.  If the slot is currently acquired by
 * a backend, raise an ERROR when nowait is true; otherwise sleep on the
 * slot's condition variable and retry from scratch.  The catalog row is not
 * touched here; replorigin_drop_by_name() deletes it.
 */
static void
replorigin_state_clear(RepOriginId roident, bool nowait)
{
	int			i;

	/*
	 * Clean up the slot state info, if there is any matching slot.
	 */
restart:
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationState *state = &replication_states[i];

		if (state->roident == roident)
		{
			/* found our slot, is it busy? */
			if (state->acquired_by != 0)
			{
				ConditionVariable *cv;

				if (nowait)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_IN_USE),
							 errmsg("could not drop replication origin with ID %d, in use by PID %d",
									state->roident,
									state->acquired_by)));

				/*
				 * We must wait and then retry. Since we don't know which CV
				 * to wait on until here, we can't readily use
				 * ConditionVariablePrepareToSleep (calling it here would be
				 * wrong, since we could miss the signal if we did so); just
				 * use ConditionVariableSleep directly.
				 */
				cv = &state->origin_cv;

				LWLockRelease(ReplicationOriginLock);

				ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);

				/* The slot may have changed while we slept; rescan. */
				goto restart;
			}

			/* first make a WAL log entry */
			{
				xl_replorigin_drop xlrec;

				xlrec.node_id = roident;
				XLogBeginInsert();
				XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
				XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
			}

			/* then clear the in-memory slot */
			state->roident = InvalidRepOriginId;
			state->remote_lsn = InvalidXLogRecPtr;
			state->local_lsn = InvalidXLogRecPtr;
			break;
		}
	}
	LWLockRelease(ReplicationOriginLock);

	/*
	 * Undo any CV wait-list registration left over from ConditionVariableSleep
	 * above; harmless if we never slept.
	 */
	ConditionVariableCancelSleep();
}
404 :
/*
 * Drop replication origin (by name).
 *
 * Needs to be called in a transaction.
 *
 * If the origin does not exist, raise an ERROR unless missing_ok is true, in
 * which case return silently.  nowait is passed on to
 * replorigin_state_clear(): when true, fail instead of waiting if a backend
 * currently has the origin's progress slot acquired.
 */
void
replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
{
	RepOriginId roident;
	Relation	rel;
	HeapTuple	tuple;

	Assert(IsTransactionState());

	rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);

	/* InvalidOid if the name is unknown and missing_ok; ERROR otherwise */
	roident = replorigin_by_name(name, missing_ok);

	/* Lock the origin to prevent concurrent drops. */
	LockSharedObject(ReplicationOriginRelationId, roident, 0,
					 AccessExclusiveLock);

	/*
	 * Re-fetch the row now that we hold the lock: the origin may have been
	 * dropped concurrently before we got it, or roident may be InvalidOid
	 * because the name was not found.
	 */
	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
	if (!HeapTupleIsValid(tuple))
	{
		if (!missing_ok)
			elog(ERROR, "cache lookup failed for replication origin with ID %d",
				 roident);

		/*
		 * We don't need to retain the locks if the origin is already dropped.
		 */
		UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
						   AccessExclusiveLock);
		table_close(rel, RowExclusiveLock);
		return;
	}

	/* Clear the shared-memory progress slot (WAL-logs the drop). */
	replorigin_state_clear(roident, nowait);

	/*
	 * Now, we can delete the catalog entry.
	 */
	CatalogTupleDelete(rel, &tuple->t_self);
	ReleaseSysCache(tuple);

	CommandCounterIncrement();

	/* We keep the lock on pg_replication_origin until commit */
	table_close(rel, NoLock);
}
456 :
457 : /*
458 : * Lookup replication origin via its oid and return the name.
459 : *
460 : * The external name is palloc'd in the calling context.
461 : *
462 : * Returns true if the origin is known, false otherwise.
463 : */
464 : bool
465 CBC 9 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
466 : {
467 ECB : HeapTuple tuple;
468 : Form_pg_replication_origin ric;
469 :
470 CBC 9 : Assert(OidIsValid((Oid) roident));
471 GIC 9 : Assert(roident != InvalidRepOriginId);
472 9 : Assert(roident != DoNotReplicateId);
473 :
474 9 : tuple = SearchSysCache1(REPLORIGIDENT,
475 : ObjectIdGetDatum((Oid) roident));
476 :
477 9 : if (HeapTupleIsValid(tuple))
478 : {
479 9 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
480 9 : *roname = text_to_cstring(&ric->roname);
481 CBC 9 : ReleaseSysCache(tuple);
482 :
483 GIC 9 : return true;
484 : }
485 : else
486 ECB : {
487 LBC 0 : *roname = NULL;
488 ECB :
489 UIC 0 : if (!missing_ok)
490 LBC 0 : ereport(ERROR,
491 : (errcode(ERRCODE_UNDEFINED_OBJECT),
492 : errmsg("replication origin with ID %d does not exist",
493 ECB : roident)));
494 :
495 LBC 0 : return false;
496 ECB : }
497 : }
498 :
499 :
500 : /* ---------------------------------------------------------------------------
501 : * Functions for handling replication progress.
502 : * ---------------------------------------------------------------------------
503 EUB : */
504 :
505 : Size
506 GBC 6390 : ReplicationOriginShmemSize(void)
507 : {
508 GIC 6390 : Size size = 0;
509 :
510 : /*
511 EUB : * XXX: max_replication_slots is arguably the wrong thing to use, as here
512 : * we keep the replay state of *remote* transactions. But for now it seems
513 : * sufficient to reuse it, rather than introduce a separate GUC.
514 : */
515 GIC 6390 : if (max_replication_slots == 0)
516 UIC 0 : return size;
517 :
518 GIC 6390 : size = add_size(size, offsetof(ReplicationStateCtl, states));
519 :
520 6390 : size = add_size(size,
521 : mul_size(max_replication_slots, sizeof(ReplicationState)));
522 CBC 6390 : return size;
523 : }
524 ECB :
525 : void
526 GIC 1826 : ReplicationOriginShmemInit(void)
527 : {
528 : bool found;
529 :
530 1826 : if (max_replication_slots == 0)
531 LBC 0 : return;
532 EUB :
533 GIC 1826 : replication_states_ctl = (ReplicationStateCtl *)
534 CBC 1826 : ShmemInitStruct("ReplicationOriginState",
535 : ReplicationOriginShmemSize(),
536 ECB : &found);
537 GIC 1826 : replication_states = replication_states_ctl->states;
538 ECB :
539 GIC 1826 : if (!found)
540 : {
541 : int i;
542 ECB :
543 GIC 130639 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
544 :
545 1826 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
546 ECB :
547 GBC 19967 : for (i = 0; i < max_replication_slots; i++)
548 : {
549 CBC 18141 : LWLockInitialize(&replication_states[i].lock,
550 18141 : replication_states_ctl->tranche_id);
551 GIC 18141 : ConditionVariableInit(&replication_states[i].origin_cv);
552 : }
553 ECB : }
554 : }
555 :
556 : /* ---------------------------------------------------------------------------
557 : * Perform a checkpoint of each replication origin's progress with respect to
558 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
559 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
560 : * if the transactions were originally committed asynchronously.
561 : *
562 : * We store checkpoints in the following format:
563 : * +-------+------------------------+------------------+-----+--------+
564 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
565 : * +-------+------------------------+------------------+-----+--------+
566 : *
567 : * So its just the magic, followed by the statically sized
568 : * ReplicationStateOnDisk structs. Note that the maximum number of
569 : * ReplicationState is determined by max_replication_slots.
570 : * ---------------------------------------------------------------------------
571 : */
572 : void
573 GIC 2363 : CheckPointReplicationOrigin(void)
574 : {
575 2363 : const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
576 2363 : const char *path = "pg_logical/replorigin_checkpoint";
577 : int tmpfd;
578 : int i;
579 2363 : uint32 magic = REPLICATION_STATE_MAGIC;
580 : pg_crc32c crc;
581 :
582 2363 : if (max_replication_slots == 0)
583 UIC 0 : return;
584 :
585 GIC 2363 : INIT_CRC32C(crc);
586 :
587 : /* make sure no old temp file is remaining */
588 2363 : if (unlink(tmppath) < 0 && errno != ENOENT)
589 LBC 0 : ereport(PANIC,
590 : (errcode_for_file_access(),
591 ECB : errmsg("could not remove file \"%s\": %m",
592 : tmppath)));
593 :
594 : /*
595 : * no other backend can perform this at the same time; only one checkpoint
596 : * can happen at a time.
597 : */
598 CBC 2363 : tmpfd = OpenTransientFile(tmppath,
599 EUB : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
600 GIC 2363 : if (tmpfd < 0)
601 LBC 0 : ereport(PANIC,
602 : (errcode_for_file_access(),
603 : errmsg("could not create file \"%s\": %m",
604 ECB : tmppath)));
605 EUB :
606 : /* write magic */
607 GIC 2363 : errno = 0;
608 2363 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
609 : {
610 : /* if write didn't set errno, assume problem is no disk space */
611 UIC 0 : if (errno == 0)
612 0 : errno = ENOSPC;
613 0 : ereport(PANIC,
614 ECB : (errcode_for_file_access(),
615 : errmsg("could not write to file \"%s\": %m",
616 : tmppath)));
617 EUB : }
618 GIC 2363 : COMP_CRC32C(crc, &magic, sizeof(magic));
619 :
620 : /* prevent concurrent creations/drops */
621 2363 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
622 :
623 ECB : /* write actual data */
624 CBC 25839 : for (i = 0; i < max_replication_slots; i++)
625 : {
626 : ReplicationStateOnDisk disk_state;
627 GBC 23476 : ReplicationState *curstate = &replication_states[i];
628 EUB : XLogRecPtr local_lsn;
629 :
630 GIC 23476 : if (curstate->roident == InvalidRepOriginId)
631 23449 : continue;
632 :
633 : /* zero, to avoid uninitialized padding bytes */
634 CBC 27 : memset(&disk_state, 0, sizeof(disk_state));
635 :
636 GIC 27 : LWLockAcquire(&curstate->lock, LW_SHARED);
637 ECB :
638 GIC 27 : disk_state.roident = curstate->roident;
639 :
640 CBC 27 : disk_state.remote_lsn = curstate->remote_lsn;
641 GIC 27 : local_lsn = curstate->local_lsn;
642 :
643 CBC 27 : LWLockRelease(&curstate->lock);
644 :
645 : /* make sure we only write out a commit that's persistent */
646 27 : XLogFlush(local_lsn);
647 ECB :
648 GIC 27 : errno = 0;
649 27 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
650 ECB : sizeof(disk_state))
651 : {
652 : /* if write didn't set errno, assume problem is no disk space */
653 UIC 0 : if (errno == 0)
654 LBC 0 : errno = ENOSPC;
655 UIC 0 : ereport(PANIC,
656 ECB : (errcode_for_file_access(),
657 : errmsg("could not write to file \"%s\": %m",
658 : tmppath)));
659 : }
660 :
661 GIC 27 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
662 ECB : }
663 :
664 CBC 2363 : LWLockRelease(ReplicationOriginLock);
665 ECB :
666 : /* write out the CRC */
667 GIC 2363 : FIN_CRC32C(crc);
668 2363 : errno = 0;
669 GBC 2363 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
670 EUB : {
671 : /* if write didn't set errno, assume problem is no disk space */
672 UIC 0 : if (errno == 0)
673 0 : errno = ENOSPC;
674 0 : ereport(PANIC,
675 : (errcode_for_file_access(),
676 : errmsg("could not write to file \"%s\": %m",
677 ECB : tmppath)));
678 : }
679 :
680 CBC 2363 : if (CloseTransientFile(tmpfd) != 0)
681 UIC 0 : ereport(PANIC,
682 : (errcode_for_file_access(),
683 ECB : errmsg("could not close file \"%s\": %m",
684 : tmppath)));
685 :
686 : /* fsync, rename to permanent file, fsync file and directory */
687 GIC 2363 : durable_rename(tmppath, path, PANIC);
688 EUB : }
689 :
690 : /*
691 : * Recover replication replay status from checkpoint data saved earlier by
692 : * CheckPointReplicationOrigin.
693 : *
694 : * This only needs to be called at startup and *not* during every checkpoint
695 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
696 ECB : * state thereafter can be recovered by looking at commit records.
697 EUB : */
698 : void
699 GIC 1176 : StartupReplicationOrigin(void)
700 : {
701 1176 : const char *path = "pg_logical/replorigin_checkpoint";
702 : int fd;
703 ECB : int readBytes;
704 GIC 1176 : uint32 magic = REPLICATION_STATE_MAGIC;
705 1176 : int last_state = 0;
706 : pg_crc32c file_crc;
707 : pg_crc32c crc;
708 :
709 : /* don't want to overwrite already existing state */
710 : #ifdef USE_ASSERT_CHECKING
711 : static bool already_started = false;
712 :
713 1176 : Assert(!already_started);
714 1176 : already_started = true;
715 ECB : #endif
716 :
717 CBC 1176 : if (max_replication_slots == 0)
718 GIC 305 : return;
719 :
720 CBC 1176 : INIT_CRC32C(crc);
721 ECB :
722 GIC 1176 : elog(DEBUG2, "starting up replication origin progress state");
723 :
724 1176 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
725 :
726 : /*
727 : * might have had max_replication_slots == 0 last run, or we just brought
728 : * up a standby.
729 ECB : */
730 CBC 1176 : if (fd < 0 && errno == ENOENT)
731 GIC 305 : return;
732 871 : else if (fd < 0)
733 LBC 0 : ereport(PANIC,
734 ECB : (errcode_for_file_access(),
735 : errmsg("could not open file \"%s\": %m",
736 : path)));
737 :
738 : /* verify magic, that is written even if nothing was active */
739 GIC 871 : readBytes = read(fd, &magic, sizeof(magic));
740 CBC 871 : if (readBytes != sizeof(magic))
741 : {
742 UIC 0 : if (readBytes < 0)
743 0 : ereport(PANIC,
744 : (errcode_for_file_access(),
745 : errmsg("could not read file \"%s\": %m",
746 ECB : path)));
747 : else
748 LBC 0 : ereport(PANIC,
749 EUB : (errcode(ERRCODE_DATA_CORRUPTED),
750 : errmsg("could not read file \"%s\": read %d of %zu",
751 : path, readBytes, sizeof(magic))));
752 : }
753 GIC 871 : COMP_CRC32C(crc, &magic, sizeof(magic));
754 :
755 CBC 871 : if (magic != REPLICATION_STATE_MAGIC)
756 LBC 0 : ereport(PANIC,
757 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
758 EUB : magic, REPLICATION_STATE_MAGIC)));
759 :
760 : /* we can skip locking here, no other access is possible */
761 :
762 : /* recover individual states, until there are no more to be found */
763 : while (true)
764 GBC 3 : {
765 : ReplicationStateOnDisk disk_state;
766 :
767 GIC 874 : readBytes = read(fd, &disk_state, sizeof(disk_state));
768 :
769 ECB : /* no further data */
770 GIC 874 : if (readBytes == sizeof(crc))
771 ECB : {
772 EUB : /* not pretty, but simple ... */
773 GIC 871 : file_crc = *(pg_crc32c *) &disk_state;
774 871 : break;
775 : }
776 :
777 3 : if (readBytes < 0)
778 : {
779 UIC 0 : ereport(PANIC,
780 ECB : (errcode_for_file_access(),
781 : errmsg("could not read file \"%s\": %m",
782 : path)));
783 : }
784 :
785 GIC 3 : if (readBytes != sizeof(disk_state))
786 ECB : {
787 UIC 0 : ereport(PANIC,
788 : (errcode_for_file_access(),
789 ECB : errmsg("could not read file \"%s\": read %d of %zu",
790 : path, readBytes, sizeof(disk_state))));
791 : }
792 :
793 CBC 3 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
794 :
795 GBC 3 : if (last_state == max_replication_slots)
796 UIC 0 : ereport(PANIC,
797 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
798 : errmsg("could not find free replication state, increase max_replication_slots")));
799 :
800 : /* copy data to shared memory */
801 CBC 3 : replication_states[last_state].roident = disk_state.roident;
802 GIC 3 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
803 GBC 3 : last_state++;
804 :
805 GIC 3 : ereport(LOG,
806 : (errmsg("recovered replication state of node %d to %X/%X",
807 : disk_state.roident,
808 : LSN_FORMAT_ARGS(disk_state.remote_lsn))));
809 ECB : }
810 :
811 : /* now check checksum */
812 GBC 871 : FIN_CRC32C(crc);
813 GIC 871 : if (file_crc != crc)
814 UIC 0 : ereport(PANIC,
815 : (errcode(ERRCODE_DATA_CORRUPTED),
816 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
817 ECB : crc, file_crc)));
818 :
819 CBC 871 : if (CloseTransientFile(fd) != 0)
820 UIC 0 : ereport(PANIC,
821 ECB : (errcode_for_file_access(),
822 : errmsg("could not close file \"%s\": %m",
823 : path)));
824 : }
825 :
826 : void
827 GIC 6 : replorigin_redo(XLogReaderState *record)
828 ECB : {
829 CBC 6 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
830 EUB :
831 GIC 6 : switch (info)
832 : {
833 3 : case XLOG_REPLORIGIN_SET:
834 : {
835 CBC 3 : xl_replorigin_set *xlrec =
836 GBC 3 : (xl_replorigin_set *) XLogRecGetData(record);
837 :
838 GIC 3 : replorigin_advance(xlrec->node_id,
839 : xlrec->remote_lsn, record->EndRecPtr,
840 3 : xlrec->force /* backward */ ,
841 : false /* WAL log */ );
842 3 : break;
843 ECB : }
844 GIC 3 : case XLOG_REPLORIGIN_DROP:
845 ECB : {
846 : xl_replorigin_drop *xlrec;
847 : int i;
848 :
849 CBC 3 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
850 :
851 4 : for (i = 0; i < max_replication_slots; i++)
852 ECB : {
853 GIC 4 : ReplicationState *state = &replication_states[i];
854 ECB :
855 : /* found our slot */
856 CBC 4 : if (state->roident == xlrec->node_id)
857 : {
858 ECB : /* reset entry */
859 GIC 3 : state->roident = InvalidRepOriginId;
860 CBC 3 : state->remote_lsn = InvalidXLogRecPtr;
861 GIC 3 : state->local_lsn = InvalidXLogRecPtr;
862 3 : break;
863 : }
864 : }
865 CBC 3 : break;
866 : }
867 LBC 0 : default:
868 UIC 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
869 ECB : }
870 GIC 6 : }
871 :
872 ECB :
873 : /*
874 : * Tell the replication origin progress machinery that a commit from 'node'
875 : * that originated at the LSN remote_commit on the remote node was replayed
876 : * successfully and that we don't need to do so again. In combination with
877 : * setting up replorigin_session_origin_lsn and replorigin_session_origin
878 : * that ensures we won't lose knowledge about that after a crash if the
879 : * transaction had a persistent effect (think of asynchronous commits).
880 : *
881 : * local_commit needs to be a local LSN of the commit so that we can make sure
882 : * upon a checkpoint that enough WAL has been persisted to disk.
883 EUB : *
884 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
885 : * unless running in recovery.
886 ECB : */
887 : void
888 GIC 200 : replorigin_advance(RepOriginId node,
889 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
890 : bool go_backward, bool wal_log)
891 : {
892 : int i;
893 200 : ReplicationState *replication_state = NULL;
894 200 : ReplicationState *free_state = NULL;
895 :
896 200 : Assert(node != InvalidRepOriginId);
897 :
898 : /* we don't track DoNotReplicateId */
899 200 : if (node == DoNotReplicateId)
900 UIC 0 : return;
901 :
902 : /*
903 : * XXX: For the case where this is called by WAL replay, it'd be more
904 ECB : * efficient to restore into a backend local hashtable and only dump into
905 : * shmem after recovery is finished. Let's wait with implementing that
906 : * till it's shown to be a measurable expense
907 : */
908 :
909 : /* Lock exclusively, as we may have to create a new table entry. */
910 CBC 200 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
911 :
912 ECB : /*
913 : * Search for either an existing slot for the origin, or a free one we can
914 : * use.
915 : */
916 GBC 1749 : for (i = 0; i < max_replication_slots; i++)
917 : {
918 GIC 1595 : ReplicationState *curstate = &replication_states[i];
919 :
920 : /* remember where to insert if necessary */
921 1595 : if (curstate->roident == InvalidRepOriginId &&
922 : free_state == NULL)
923 : {
924 156 : free_state = curstate;
925 156 : continue;
926 ECB : }
927 :
928 : /* not our slot */
929 GIC 1439 : if (curstate->roident != node)
930 : {
931 1393 : continue;
932 ECB : }
933 :
934 : /* ok, found slot */
935 GIC 46 : replication_state = curstate;
936 :
937 CBC 46 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
938 :
939 : /* Make sure it's not used by somebody else */
940 46 : if (replication_state->acquired_by != 0)
941 ECB : {
942 UIC 0 : ereport(ERROR,
943 : (errcode(ERRCODE_OBJECT_IN_USE),
944 : errmsg("replication origin with ID %d is already active for PID %d",
945 ECB : replication_state->roident,
946 : replication_state->acquired_by)));
947 : }
948 :
949 GIC 46 : break;
950 : }
951 ECB :
952 GIC 200 : if (replication_state == NULL && free_state == NULL)
953 LBC 0 : ereport(ERROR,
954 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
955 : errmsg("could not find free replication state slot for replication origin with ID %d",
956 ECB : node),
957 : errhint("Increase max_replication_slots and try again.")));
958 EUB :
959 GIC 200 : if (replication_state == NULL)
960 : {
961 : /* initialize new slot */
962 154 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
963 154 : replication_state = free_state;
964 154 : Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
965 CBC 154 : Assert(replication_state->local_lsn == InvalidXLogRecPtr);
966 GIC 154 : replication_state->roident = node;
967 : }
968 ECB :
969 GBC 200 : Assert(replication_state->roident != InvalidRepOriginId);
970 :
971 : /*
972 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
973 : * and the standby gets the message. Primarily this will be called during
974 : * WAL replay (of commit records) where no WAL logging is necessary.
975 ECB : */
976 GIC 200 : if (wal_log)
977 : {
978 ECB : xl_replorigin_set xlrec;
979 :
980 CBC 155 : xlrec.remote_lsn = remote_commit;
981 155 : xlrec.node_id = node;
982 155 : xlrec.force = go_backward;
983 :
984 GIC 155 : XLogBeginInsert();
985 CBC 155 : XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
986 :
987 GIC 155 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
988 : }
989 :
990 : /*
991 : * Due to - harmless - race conditions during a checkpoint we could see
992 ECB : * values here that are older than the ones we already have in memory. We
993 : * could also see older values for prepared transactions when the prepare
994 : * is sent at a later point of time along with commit prepared and there
995 : * are other transactions commits between prepare and commit prepared. See
996 : * ReorderBufferFinishPrepared. Don't overwrite those.
997 : */
998 CBC 200 : if (go_backward || replication_state->remote_lsn < remote_commit)
999 GIC 189 : replication_state->remote_lsn = remote_commit;
1000 CBC 200 : if (local_commit != InvalidXLogRecPtr &&
1001 42 : (go_backward || replication_state->local_lsn < local_commit))
1002 GIC 45 : replication_state->local_lsn = local_commit;
1003 CBC 200 : LWLockRelease(&replication_state->lock);
1004 :
1005 : /*
1006 : * Release *after* changing the LSNs, slot isn't acquired and thus could
1007 : * otherwise be dropped anytime.
1008 : */
1009 GIC 200 : LWLockRelease(ReplicationOriginLock);
1010 : }
1011 :
1012 :
1013 : XLogRecPtr
1014 CBC 8 : replorigin_get_progress(RepOriginId node, bool flush)
1015 ECB : {
1016 : int i;
1017 CBC 8 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1018 8 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1019 ECB :
1020 : /* prevent slots from being concurrently dropped */
1021 GIC 8 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1022 :
1023 38 : for (i = 0; i < max_replication_slots; i++)
1024 : {
1025 ECB : ReplicationState *state;
1026 :
1027 GIC 35 : state = &replication_states[i];
1028 :
1029 35 : if (state->roident == node)
1030 ECB : {
1031 GIC 5 : LWLockAcquire(&state->lock, LW_SHARED);
1032 :
1033 CBC 5 : remote_lsn = state->remote_lsn;
1034 5 : local_lsn = state->local_lsn;
1035 :
1036 GIC 5 : LWLockRelease(&state->lock);
1037 ECB :
1038 GIC 5 : break;
1039 ECB : }
1040 : }
1041 :
1042 GIC 8 : LWLockRelease(ReplicationOriginLock);
1043 ECB :
1044 GIC 8 : if (flush && local_lsn != InvalidXLogRecPtr)
1045 CBC 1 : XLogFlush(local_lsn);
1046 :
1047 8 : return remote_lsn;
1048 : }
1049 ECB :
1050 : /*
1051 : * Tear down a (possibly) configured session replication origin during process
1052 : * exit.
1053 : */
1054 : static void
1055 GIC 314 : ReplicationOriginExitCleanup(int code, Datum arg)
1056 : {
1057 314 : ConditionVariable *cv = NULL;
1058 ECB :
1059 GIC 314 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1060 ECB :
1061 CBC 314 : if (session_replication_state != NULL &&
1062 GIC 164 : session_replication_state->acquired_by == MyProcPid)
1063 ECB : {
1064 GIC 154 : cv = &session_replication_state->origin_cv;
1065 :
1066 154 : session_replication_state->acquired_by = 0;
1067 154 : session_replication_state = NULL;
1068 : }
1069 :
1070 314 : LWLockRelease(ReplicationOriginLock);
1071 ECB :
1072 GIC 314 : if (cv)
1073 CBC 154 : ConditionVariableBroadcast(cv);
1074 GIC 314 : }
1075 ECB :
1076 : /*
1077 : * Setup a replication origin in the shared memory struct if it doesn't
1078 : * already exist and cache access to the specific ReplicationSlot so the
1079 : * array doesn't have to be searched when calling
1080 : * replorigin_session_advance().
1081 : *
1082 : * Normally only one such cached origin can exist per process so the cached
1083 : * value can only be set again after the previous value is torn down with
1084 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1085 : * (meaning the slot is not allowed to be already acquired by another process).
1086 : *
1087 : * However, sometimes multiple processes can safely re-use the same origin slot
1088 : * (for example, multiple parallel apply processes can safely use the same
1089 : * origin, provided they maintain commit order by allowing only one process to
1090 : * commit at a time). For this case the first process must pass acquired_by =
1091 : * 0, and then the other processes sharing that same origin can pass
1092 : * acquired_by = PID of the first process.
1093 : */
1094 : void
1095 GNC 316 : replorigin_session_setup(RepOriginId node, int acquired_by)
1096 ECB : {
1097 : static bool registered_cleanup;
1098 : int i;
1099 GIC 316 : int free_slot = -1;
1100 :
1101 316 : if (!registered_cleanup)
1102 : {
1103 314 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1104 314 : registered_cleanup = true;
1105 : }
1106 :
1107 316 : Assert(max_replication_slots > 0);
1108 :
1109 316 : if (session_replication_state != NULL)
1110 1 : ereport(ERROR,
1111 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1112 : errmsg("cannot setup replication origin when one is already setup")));
1113 :
1114 : /* Lock exclusively, as we may have to create a new table entry. */
1115 315 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1116 :
1117 : /*
1118 : * Search for either an existing slot for the origin, or a free one we can
1119 ECB : * use.
1120 : */
1121 GIC 3453 : for (i = 0; i < max_replication_slots; i++)
1122 : {
1123 CBC 3138 : ReplicationState *curstate = &replication_states[i];
1124 :
1125 ECB : /* remember where to insert if necessary */
1126 GIC 3138 : if (curstate->roident == InvalidRepOriginId &&
1127 ECB : free_slot == -1)
1128 : {
1129 GIC 315 : free_slot = i;
1130 315 : continue;
1131 ECB : }
1132 :
1133 : /* not our slot */
1134 CBC 2823 : if (curstate->roident != node)
1135 GIC 2579 : continue;
1136 :
1137 GNC 244 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1138 : {
1139 LBC 0 : ereport(ERROR,
1140 : (errcode(ERRCODE_OBJECT_IN_USE),
1141 : errmsg("replication origin with ID %d is already active for PID %d",
1142 : curstate->roident, curstate->acquired_by)));
1143 : }
1144 :
1145 ECB : /* ok, found slot */
1146 GIC 244 : session_replication_state = curstate;
1147 ECB : }
1148 :
1149 :
1150 CBC 315 : if (session_replication_state == NULL && free_slot == -1)
1151 UIC 0 : ereport(ERROR,
1152 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1153 ECB : errmsg("could not find free replication state slot for replication origin with ID %d",
1154 : node),
1155 : errhint("Increase max_replication_slots and try again.")));
1156 GIC 315 : else if (session_replication_state == NULL)
1157 : {
1158 ECB : /* initialize new slot */
1159 CBC 71 : session_replication_state = &replication_states[free_slot];
1160 GIC 71 : Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1161 CBC 71 : Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1162 GIC 71 : session_replication_state->roident = node;
1163 EUB : }
1164 :
1165 :
1166 GIC 315 : Assert(session_replication_state->roident != InvalidRepOriginId);
1167 :
1168 GNC 315 : if (acquired_by == 0)
1169 305 : session_replication_state->acquired_by = MyProcPid;
1170 10 : else if (session_replication_state->acquired_by != acquired_by)
1171 UNC 0 : elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1172 : node, acquired_by);
1173 :
1174 CBC 315 : LWLockRelease(ReplicationOriginLock);
1175 :
1176 : /* probably this one is pointless */
1177 GIC 315 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1178 CBC 315 : }
1179 EUB :
1180 : /*
1181 : * Reset replay state previously setup in this session.
1182 : *
1183 : * This function may only be called if an origin was setup with
1184 ECB : * replorigin_session_setup().
1185 : */
1186 : void
1187 CBC 152 : replorigin_session_reset(void)
1188 ECB : {
1189 : ConditionVariable *cv;
1190 :
1191 GIC 152 : Assert(max_replication_slots != 0);
1192 :
1193 152 : if (session_replication_state == NULL)
1194 CBC 1 : ereport(ERROR,
1195 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1196 ECB : errmsg("no replication origin is configured")));
1197 :
1198 CBC 151 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1199 EUB :
1200 GIC 151 : session_replication_state->acquired_by = 0;
1201 151 : cv = &session_replication_state->origin_cv;
1202 CBC 151 : session_replication_state = NULL;
1203 :
1204 GIC 151 : LWLockRelease(ReplicationOriginLock);
1205 ECB :
1206 CBC 151 : ConditionVariableBroadcast(cv);
1207 GIC 151 : }
1208 :
1209 : /*
1210 : * Do the same work replorigin_advance() does, just on the session's
1211 : * configured origin.
1212 : *
1213 : * This is noticeably cheaper than using replorigin_advance().
1214 : */
1215 ECB : void
1216 GIC 912 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1217 : {
1218 912 : Assert(session_replication_state != NULL);
1219 CBC 912 : Assert(session_replication_state->roident != InvalidRepOriginId);
1220 :
1221 912 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1222 912 : if (session_replication_state->local_lsn < local_commit)
1223 GIC 912 : session_replication_state->local_lsn = local_commit;
1224 912 : if (session_replication_state->remote_lsn < remote_commit)
1225 440 : session_replication_state->remote_lsn = remote_commit;
1226 CBC 912 : LWLockRelease(&session_replication_state->lock);
1227 GIC 912 : }
1228 ECB :
1229 : /*
1230 : * Ask the machinery about the point up to which we successfully replayed
1231 : * changes from an already setup replication origin.
1232 : */
1233 : XLogRecPtr
1234 CBC 149 : replorigin_session_get_progress(bool flush)
1235 ECB : {
1236 : XLogRecPtr remote_lsn;
1237 : XLogRecPtr local_lsn;
1238 :
1239 GIC 149 : Assert(session_replication_state != NULL);
1240 :
1241 149 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1242 149 : remote_lsn = session_replication_state->remote_lsn;
1243 149 : local_lsn = session_replication_state->local_lsn;
1244 CBC 149 : LWLockRelease(&session_replication_state->lock);
1245 :
1246 149 : if (flush && local_lsn != InvalidXLogRecPtr)
1247 1 : XLogFlush(local_lsn);
1248 :
1249 149 : return remote_lsn;
1250 ECB : }
1251 :
1252 :
1253 :
1254 : /* ---------------------------------------------------------------------------
1255 : * SQL functions for working with replication origin.
1256 : *
1257 : * These mostly should be fairly short wrappers around more generic functions.
1258 : * ---------------------------------------------------------------------------
1259 : */
1260 :
1261 : /*
1262 : * Create replication origin for the passed in name, and return the assigned
1263 : * oid.
1264 : */
1265 : Datum
1266 GIC 8 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1267 ECB : {
1268 : char *name;
1269 : RepOriginId roident;
1270 :
1271 CBC 8 : replorigin_check_prerequisites(false, false);
1272 ECB :
1273 GIC 8 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1274 ECB :
1275 : /*
1276 : * Replication origins "any and "none" are reserved for system options.
1277 : * The origins "pg_xxx" are reserved for internal use.
1278 : */
1279 GNC 8 : if (IsReservedName(name) || IsReservedOriginName(name))
1280 CBC 3 : ereport(ERROR,
1281 : (errcode(ERRCODE_RESERVED_NAME),
1282 : errmsg("replication origin name \"%s\" is reserved",
1283 : name),
1284 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1285 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1286 :
1287 : /*
1288 : * If built with appropriate switch, whine when regression-testing
1289 : * conventions for replication origin names are violated.
1290 : */
1291 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1292 : if (strncmp(name, "regress_", 8) != 0)
1293 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1294 : #endif
1295 :
1296 GIC 5 : roident = replorigin_create(name);
1297 :
1298 CBC 4 : pfree(name);
1299 :
1300 GIC 4 : PG_RETURN_OID(roident);
1301 : }
1302 :
1303 ECB : /*
1304 : * Drop replication origin.
1305 : */
1306 : Datum
1307 GIC 5 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1308 : {
1309 : char *name;
1310 :
1311 CBC 5 : replorigin_check_prerequisites(false, false);
1312 ECB :
1313 GIC 5 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1314 :
1315 5 : replorigin_drop_by_name(name, false, true);
1316 :
1317 4 : pfree(name);
1318 :
1319 4 : PG_RETURN_VOID();
1320 : }
1321 :
1322 : /*
1323 : * Return oid of a replication origin.
1324 : */
1325 : Datum
1326 UIC 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1327 : {
1328 ECB : char *name;
1329 : RepOriginId roident;
1330 :
1331 UIC 0 : replorigin_check_prerequisites(false, false);
1332 ECB :
1333 UIC 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1334 0 : roident = replorigin_by_name(name, true);
1335 :
1336 0 : pfree(name);
1337 :
1338 0 : if (OidIsValid(roident))
1339 LBC 0 : PG_RETURN_OID(roident);
1340 UIC 0 : PG_RETURN_NULL();
1341 : }
1342 :
1343 ECB : /*
1344 : * Setup a replication origin for this session.
1345 : */
1346 : Datum
1347 CBC 5 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1348 : {
1349 ECB : char *name;
1350 : RepOriginId origin;
1351 :
1352 GIC 5 : replorigin_check_prerequisites(true, false);
1353 :
1354 5 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1355 5 : origin = replorigin_by_name(name, false);
1356 GNC 4 : replorigin_session_setup(origin, 0);
1357 :
1358 GBC 3 : replorigin_session_origin = origin;
1359 :
1360 GIC 3 : pfree(name);
1361 :
1362 3 : PG_RETURN_VOID();
1363 EUB : }
1364 :
1365 : /*
1366 : * Reset previously setup origin in this session
1367 : */
1368 : Datum
1369 GIC 4 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1370 EUB : {
1371 GBC 4 : replorigin_check_prerequisites(true, false);
1372 EUB :
1373 GIC 4 : replorigin_session_reset();
1374 :
1375 3 : replorigin_session_origin = InvalidRepOriginId;
1376 3 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1377 3 : replorigin_session_origin_timestamp = 0;
1378 :
1379 CBC 3 : PG_RETURN_VOID();
1380 : }
1381 :
1382 : /*
1383 : * Has a replication origin been setup for this session.
1384 ECB : */
1385 : Datum
1386 LBC 0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1387 ECB : {
1388 LBC 0 : replorigin_check_prerequisites(false, false);
1389 :
1390 0 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1391 : }
1392 ECB :
1393 :
1394 : /*
1395 : * Return the replication progress for origin setup in the current session.
1396 : *
1397 : * If 'flush' is set to true it is ensured that the returned value corresponds
1398 : * to a local transaction that has been flushed. This is useful if asynchronous
1399 : * commits are used when replaying replicated transactions.
1400 : */
1401 : Datum
1402 GIC 2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1403 ECB : {
1404 GIC 2 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1405 CBC 2 : bool flush = PG_GETARG_BOOL(0);
1406 :
1407 2 : replorigin_check_prerequisites(true, false);
1408 ECB :
1409 CBC 2 : if (session_replication_state == NULL)
1410 UIC 0 : ereport(ERROR,
1411 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1412 : errmsg("no replication origin is configured")));
1413 :
1414 GIC 2 : remote_lsn = replorigin_session_get_progress(flush);
1415 :
1416 2 : if (remote_lsn == InvalidXLogRecPtr)
1417 UIC 0 : PG_RETURN_NULL();
1418 EUB :
1419 GIC 2 : PG_RETURN_LSN(remote_lsn);
1420 EUB : }
1421 :
1422 : Datum
1423 GIC 1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1424 : {
1425 1 : XLogRecPtr location = PG_GETARG_LSN(0);
1426 :
1427 1 : replorigin_check_prerequisites(true, false);
1428 :
1429 1 : if (session_replication_state == NULL)
1430 UIC 0 : ereport(ERROR,
1431 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1432 : errmsg("no replication origin is configured")));
1433 :
1434 CBC 1 : replorigin_session_origin_lsn = location;
1435 GIC 1 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1436 ECB :
1437 CBC 1 : PG_RETURN_VOID();
1438 : }
1439 ECB :
1440 : Datum
1441 LBC 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1442 EUB : {
1443 UIC 0 : replorigin_check_prerequisites(true, false);
1444 :
1445 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1446 LBC 0 : replorigin_session_origin_timestamp = 0;
1447 :
1448 0 : PG_RETURN_VOID();
1449 EUB : }
1450 :
1451 ECB :
1452 : Datum
1453 GIC 1 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1454 : {
1455 CBC 1 : text *name = PG_GETARG_TEXT_PP(0);
1456 GIC 1 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1457 ECB : RepOriginId node;
1458 :
1459 CBC 1 : replorigin_check_prerequisites(true, false);
1460 :
1461 ECB : /* lock to prevent the replication origin from vanishing */
1462 GBC 1 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1463 :
1464 GIC 1 : node = replorigin_by_name(text_to_cstring(name), false);
1465 :
1466 ECB : /*
1467 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1468 : * xact hasn't committed yet. This is why this function should be used to
1469 : * set up the initial replication state, but not for replay.
1470 : */
1471 UIC 0 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1472 : true /* go backward */ , true /* WAL log */ );
1473 EUB :
1474 UIC 0 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1475 EUB :
1476 UIC 0 : PG_RETURN_VOID();
1477 EUB : }
1478 :
1479 :
1480 : /*
1481 : * Return the replication progress for an individual replication origin.
1482 : *
1483 : * If 'flush' is set to true it is ensured that the returned value corresponds
1484 : * to a local transaction that has been flushed. This is useful if asynchronous
1485 ECB : * commits are used when replaying replicated transactions.
1486 : */
1487 : Datum
1488 CBC 3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1489 : {
1490 : char *name;
1491 ECB : bool flush;
1492 : RepOriginId roident;
1493 GIC 3 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1494 ECB :
1495 GIC 3 : replorigin_check_prerequisites(true, true);
1496 ECB :
1497 GIC 3 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1498 3 : flush = PG_GETARG_BOOL(1);
1499 :
1500 3 : roident = replorigin_by_name(name, false);
1501 2 : Assert(OidIsValid(roident));
1502 :
1503 GBC 2 : remote_lsn = replorigin_get_progress(roident, flush);
1504 :
1505 GIC 2 : if (remote_lsn == InvalidXLogRecPtr)
1506 UBC 0 : PG_RETURN_NULL();
1507 :
1508 GBC 2 : PG_RETURN_LSN(remote_lsn);
1509 : }
1510 :
1511 :
1512 : Datum
1513 GIC 2 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1514 : {
1515 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1516 : int i;
1517 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1518 :
1519 : /* we want to return 0 rows if slot is set to zero */
1520 CBC 2 : replorigin_check_prerequisites(false, true);
1521 :
1522 GIC 2 : InitMaterializedSRF(fcinfo, 0);
1523 :
1524 : /* prevent slots from being concurrently dropped */
1525 CBC 2 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1526 :
1527 ECB : /*
1528 : * Iterate through all possible replication_states, display if they are
1529 : * filled. Note that we do not take any locks, so slightly corrupted/out
1530 : * of date values are a possibility.
1531 : */
1532 CBC 10 : for (i = 0; i < max_replication_slots; i++)
1533 ECB : {
1534 : ReplicationState *state;
1535 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1536 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1537 : char *roname;
1538 EUB :
1539 GIC 8 : state = &replication_states[i];
1540 ECB :
1541 : /* unused slot, nothing to display */
1542 GIC 8 : if (state->roident == InvalidRepOriginId)
1543 6 : continue;
1544 :
1545 CBC 2 : memset(values, 0, sizeof(values));
1546 GIC 2 : memset(nulls, 1, sizeof(nulls));
1547 ECB :
1548 GIC 2 : values[0] = ObjectIdGetDatum(state->roident);
1549 2 : nulls[0] = false;
1550 :
1551 : /*
1552 ECB : * We're not preventing the origin to be dropped concurrently, so
1553 : * silently accept that it might be gone.
1554 : */
1555 GIC 2 : if (replorigin_by_oid(state->roident, true,
1556 : &roname))
1557 ECB : {
1558 GIC 2 : values[1] = CStringGetTextDatum(roname);
1559 2 : nulls[1] = false;
1560 : }
1561 :
1562 2 : LWLockAcquire(&state->lock, LW_SHARED);
1563 :
1564 CBC 2 : values[2] = LSNGetDatum(state->remote_lsn);
1565 GIC 2 : nulls[2] = false;
1566 :
1567 2 : values[3] = LSNGetDatum(state->local_lsn);
1568 2 : nulls[3] = false;
1569 :
1570 2 : LWLockRelease(&state->lock);
1571 ECB :
1572 GIC 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1573 : values, nulls);
1574 ECB : }
1575 :
1576 GIC 2 : LWLockRelease(ReplicationOriginLock);
1577 ECB :
1578 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1579 :
1580 CBC 2 : return (Datum) 0;
1581 ECB : }
|