Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * origin.c
4 : : * Logical replication progress tracking support.
5 : : *
6 : : * Copyright (c) 2013-2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/logical/origin.c
10 : : *
11 : : * NOTES
12 : : *
13 : : * This file provides the following:
14 : : * * An infrastructure to name nodes in a replication setup
15 : : * * A facility to efficiently store and persist replication progress in an
16 : : * efficient and durable manner.
17 : : *
18 : : * Replication origin consist out of a descriptive, user defined, external
19 : : * name and a short, thus space efficient, internal 2 byte one. This split
20 : : * exists because replication origin have to be stored in WAL and shared
21 : : * memory and long descriptors would be inefficient. For now only use 2 bytes
22 : : * for the internal id of a replication origin as it seems unlikely that there
23 : : * soon will be more than 65k nodes in one replication setup; and using only
24 : : * two bytes allow us to be more space efficient.
25 : : *
26 : : * Replication progress is tracked in a shared memory table
27 : : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : : * ('slots') in this table are identified by the internal id. That's the case
29 : : * because it allows to increase replication progress during crash
30 : : * recovery. To allow doing so we store the original LSN (from the originating
31 : : * system) of a transaction in the commit record. That allows to recover the
32 : : * precise replayed state after crash recovery; without requiring synchronous
33 : : * commits. Allowing logical replication to use asynchronous commit is
34 : : * generally good for performance, but especially important as it allows a
35 : : * single threaded replay process to keep up with a source that has multiple
36 : : * backends generating changes concurrently. For efficiency and simplicity
37 : : * reasons a backend can setup one replication origin that's from then used as
38 : : * the source of changes produced by the backend, until reset again.
39 : : *
40 : : * This infrastructure is intended to be used in cooperation with logical
41 : : * decoding. When replaying from a remote system the configured origin is
42 : : * provided to output plugins, allowing prevention of replication loops and
43 : : * other filtering.
44 : : *
45 : : * There are several levels of locking at work:
46 : : *
47 : : * * To create and drop replication origins an exclusive lock on
48 : : * pg_replication_slot is required for the duration. That allows us to
49 : : * safely and conflict free assign new origins using a dirty snapshot.
50 : : *
51 : : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : : * LWLock has to be held exclusively; when iterating over the replication
53 : : * progress a shared lock has to be held, the same when advancing the
54 : : * replication progress of an individual backend that has not setup as the
55 : : * session's replication origin.
56 : : *
57 : : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : : * replication progress slot that slot's lwlock has to be held. That's
59 : : * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60 : : * all our platforms, but it also simplifies memory ordering concerns
61 : : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : : * so it's less harmful to hold the lock over a WAL write
63 : : * (cf. AdvanceReplicationProgress).
64 : : *
65 : : * ---------------------------------------------------------------------------
66 : : */
67 : :
68 : : #include "postgres.h"
69 : :
70 : : #include <unistd.h>
71 : : #include <sys/stat.h>
72 : :
73 : : #include "access/genam.h"
74 : : #include "access/htup_details.h"
75 : : #include "access/table.h"
76 : : #include "access/xact.h"
77 : : #include "access/xloginsert.h"
78 : : #include "catalog/catalog.h"
79 : : #include "catalog/indexing.h"
80 : : #include "catalog/pg_subscription.h"
81 : : #include "funcapi.h"
82 : : #include "miscadmin.h"
83 : : #include "nodes/execnodes.h"
84 : : #include "pgstat.h"
85 : : #include "replication/origin.h"
86 : : #include "replication/slot.h"
87 : : #include "storage/condition_variable.h"
88 : : #include "storage/fd.h"
89 : : #include "storage/ipc.h"
90 : : #include "storage/lmgr.h"
91 : : #include "utils/builtins.h"
92 : : #include "utils/fmgroids.h"
93 : : #include "utils/pg_lsn.h"
94 : : #include "utils/rel.h"
95 : : #include "utils/snapmgr.h"
96 : : #include "utils/syscache.h"
97 : :
98 : : /*
99 : : * Replay progress of a single remote node.
100 : : */
101 : : typedef struct ReplicationState
102 : : {
103 : : /*
104 : : * Local identifier for the remote node.
105 : : */
106 : : RepOriginId roident;
107 : :
108 : : /*
109 : : * Location of the latest commit from the remote side.
110 : : */
111 : : XLogRecPtr remote_lsn;
112 : :
113 : : /*
114 : : * Remember the local lsn of the commit record so we can XLogFlush() to it
115 : : * during a checkpoint so we know the commit record actually is safe on
116 : : * disk.
117 : : */
118 : : XLogRecPtr local_lsn;
119 : :
120 : : /*
121 : : * PID of backend that's acquired slot, or 0 if none.
122 : : */
123 : : int acquired_by;
124 : :
125 : : /*
126 : : * Condition variable that's signaled when acquired_by changes.
127 : : */
128 : : ConditionVariable origin_cv;
129 : :
130 : : /*
131 : : * Lock protecting remote_lsn and local_lsn.
132 : : */
133 : : LWLock lock;
134 : : } ReplicationState;
135 : :
136 : : /*
137 : : * On disk version of ReplicationState.
138 : : */
139 : : typedef struct ReplicationStateOnDisk
140 : : {
141 : : RepOriginId roident;
142 : : XLogRecPtr remote_lsn;
143 : : } ReplicationStateOnDisk;
144 : :
145 : :
146 : : typedef struct ReplicationStateCtl
147 : : {
148 : : /* Tranche to use for per-origin LWLocks */
149 : : int tranche_id;
150 : : /* Array of length max_replication_slots */
151 : : ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
152 : : } ReplicationStateCtl;
153 : :
154 : : /* external variables */
155 : : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
156 : : XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
157 : : TimestampTz replorigin_session_origin_timestamp = 0;
158 : :
159 : : /*
160 : : * Base address into a shared memory array of replication states of size
161 : : * max_replication_slots.
162 : : *
163 : : * XXX: Should we use a separate variable to size this rather than
164 : : * max_replication_slots?
165 : : */
166 : : static ReplicationState *replication_states;
167 : :
168 : : /*
169 : : * Actual shared memory block (replication_states[] is now part of this).
170 : : */
171 : : static ReplicationStateCtl *replication_states_ctl;
172 : :
173 : : /*
174 : : * We keep a pointer to this backend's ReplicationState to avoid having to
175 : : * search the replication_states array in replorigin_session_advance for each
176 : : * remote commit. (Ownership of a backend's own entry can only be changed by
177 : : * that backend.)
178 : : */
179 : : static ReplicationState *session_replication_state = NULL;
180 : :
181 : : /* Magic for on disk files. */
182 : : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
183 : :
184 : : static void
3273 andres@anarazel.de 185 :CBC 42 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
186 : : {
187 [ + + - + ]: 42 : if (check_slots && max_replication_slots == 0)
3273 andres@anarazel.de 188 [ # # ]:UBC 0 : ereport(ERROR,
189 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190 : : errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
191 : :
3273 andres@anarazel.de 192 [ + + - + ]:CBC 42 : if (!recoveryOK && RecoveryInProgress())
3273 andres@anarazel.de 193 [ # # ]:UBC 0 : ereport(ERROR,
194 : : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
195 : : errmsg("cannot manipulate replication origins during recovery")));
3273 andres@anarazel.de 196 :CBC 42 : }
197 : :
198 : :
199 : : /*
200 : : * IsReservedOriginName
201 : : * True iff name is either "none" or "any".
202 : : */
203 : : static bool
633 akapila@postgresql.o 204 : 8 : IsReservedOriginName(const char *name)
205 : : {
206 [ + + + + ]: 15 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
207 : 7 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
208 : : }
209 : :
210 : : /* ---------------------------------------------------------------------------
211 : : * Functions for working with replication origins themselves.
212 : : * ---------------------------------------------------------------------------
213 : : */
214 : :
215 : : /*
216 : : * Check for a persistent replication origin identified by name.
217 : : *
218 : : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
219 : : */
220 : : RepOriginId
1039 peter@eisentraut.org 221 : 880 : replorigin_by_name(const char *roname, bool missing_ok)
222 : : {
223 : : Form_pg_replication_origin ident;
3249 bruce@momjian.us 224 : 880 : Oid roident = InvalidOid;
225 : : HeapTuple tuple;
226 : : Datum roname_d;
227 : :
3273 andres@anarazel.de 228 : 880 : roname_d = CStringGetTextDatum(roname);
229 : :
230 : 880 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
231 [ + + ]: 880 : if (HeapTupleIsValid(tuple))
232 : : {
233 : 545 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
234 : 545 : roident = ident->roident;
235 : 545 : ReleaseSysCache(tuple);
236 : : }
237 [ + + ]: 335 : else if (!missing_ok)
2383 rhaas@postgresql.org 238 [ + - ]: 4 : ereport(ERROR,
239 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
240 : : errmsg("replication origin \"%s\" does not exist",
241 : : roname)));
242 : :
3273 andres@anarazel.de 243 : 876 : return roident;
244 : : }
245 : :
246 : : /*
247 : : * Create a replication origin.
248 : : *
249 : : * Needs to be called in a transaction.
250 : : */
251 : : RepOriginId
1039 peter@eisentraut.org 252 : 317 : replorigin_create(const char *roname)
253 : : {
254 : : Oid roident;
3249 bruce@momjian.us 255 : 317 : HeapTuple tuple = NULL;
256 : : Relation rel;
257 : : Datum roname_d;
258 : : SnapshotData SnapshotDirty;
259 : : SysScanDesc scan;
260 : : ScanKeyData key;
261 : :
3273 andres@anarazel.de 262 : 317 : roname_d = CStringGetTextDatum(roname);
263 : :
264 [ - + ]: 317 : Assert(IsTransactionState());
265 : :
266 : : /*
267 : : * We need the numeric replication origin to be 16bit wide, so we cannot
268 : : * rely on the normal oid allocation. Instead we simply scan
269 : : * pg_replication_origin for the first unused id. That's not particularly
270 : : * efficient, but this should be a fairly infrequent operation - we can
271 : : * easily spend a bit more code on this when it turns out it needs to be
272 : : * faster.
273 : : *
274 : : * We handle concurrency by taking an exclusive lock (allowing reads!)
275 : : * over the table for the duration of the search. Because we use a "dirty
276 : : * snapshot" we can read rows that other in-progress sessions have
277 : : * written, even though they would be invisible with normal snapshots. Due
278 : : * to the exclusive lock there's no danger that new rows can appear while
279 : : * we're checking.
280 : : */
281 : 317 : InitDirtySnapshot(SnapshotDirty);
282 : :
1910 283 : 317 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
284 : :
3272 285 [ + - ]: 583 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
286 : : {
287 : : bool nulls[Natts_pg_replication_origin];
288 : : Datum values[Natts_pg_replication_origin];
289 : : bool collides;
290 : :
3273 291 [ - + ]: 583 : CHECK_FOR_INTERRUPTS();
292 : :
293 : 583 : ScanKeyInit(&key,
294 : : Anum_pg_replication_origin_roident,
295 : : BTEqualStrategyNumber, F_OIDEQ,
296 : : ObjectIdGetDatum(roident));
297 : :
298 : 583 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
299 : : true /* indexOK */ ,
300 : : &SnapshotDirty,
301 : : 1, &key);
302 : :
303 : 583 : collides = HeapTupleIsValid(systable_getnext(scan));
304 : :
305 : 583 : systable_endscan(scan);
306 : :
307 [ + + ]: 583 : if (!collides)
308 : : {
309 : : /*
310 : : * Ok, found an unused roident, insert the new row and do a CCI,
311 : : * so our callers can look it up if they want to.
312 : : */
313 : 317 : memset(&nulls, 0, sizeof(nulls));
314 : :
3249 bruce@momjian.us 315 : 317 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
3273 andres@anarazel.de 316 : 317 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
317 : :
318 : 317 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
2630 alvherre@alvh.no-ip. 319 : 317 : CatalogTupleInsert(rel, tuple);
3273 andres@anarazel.de 320 : 316 : CommandCounterIncrement();
321 : 316 : break;
322 : : }
323 : : }
324 : :
325 : : /* now release lock again, */
1910 326 : 316 : table_close(rel, ExclusiveLock);
327 : :
3273 328 [ - + ]: 316 : if (tuple == NULL)
3273 andres@anarazel.de 329 [ # # ]:UBC 0 : ereport(ERROR,
330 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
331 : : errmsg("could not find free replication origin ID")));
332 : :
3273 andres@anarazel.de 333 :CBC 316 : heap_freetuple(tuple);
334 : 316 : return roident;
335 : : }
336 : :
337 : : /*
338 : : * Helper function to drop a replication origin.
339 : : */
340 : : static void
436 akapila@postgresql.o 341 : 252 : replorigin_state_clear(RepOriginId roident, bool nowait)
342 : : {
343 : : int i;
344 : :
345 : : /*
346 : : * Clean up the slot state info, if there is any matching slot.
347 : : */
2441 alvherre@alvh.no-ip. 348 : 252 : restart:
3273 andres@anarazel.de 349 : 252 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
350 : :
351 [ + + ]: 831 : for (i = 0; i < max_replication_slots; i++)
352 : : {
353 : 795 : ReplicationState *state = &replication_states[i];
354 : :
355 [ + + ]: 795 : if (state->roident == roident)
356 : : {
357 : : /* found our slot, is it busy? */
358 [ - + ]: 216 : if (state->acquired_by != 0)
359 : : {
360 : : ConditionVariable *cv;
361 : :
2441 alvherre@alvh.no-ip. 362 [ # # ]:UBC 0 : if (nowait)
363 [ # # ]: 0 : ereport(ERROR,
364 : : (errcode(ERRCODE_OBJECT_IN_USE),
365 : : errmsg("could not drop replication origin with ID %d, in use by PID %d",
366 : : state->roident,
367 : : state->acquired_by)));
368 : :
369 : : /*
370 : : * We must wait and then retry. Since we don't know which CV
371 : : * to wait on until here, we can't readily use
372 : : * ConditionVariablePrepareToSleep (calling it here would be
373 : : * wrong, since we could miss the signal if we did so); just
374 : : * use ConditionVariableSleep directly.
375 : : */
376 : 0 : cv = &state->origin_cv;
377 : :
378 : 0 : LWLockRelease(ReplicationOriginLock);
379 : :
380 : 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
381 : 0 : goto restart;
382 : : }
383 : :
384 : : /* first make a WAL log entry */
385 : : {
386 : : xl_replorigin_drop xlrec;
387 : :
3273 andres@anarazel.de 388 :CBC 216 : xlrec.node_id = roident;
389 : 216 : XLogBeginInsert();
390 : 216 : XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
391 : 216 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
392 : : }
393 : :
394 : : /* then clear the in-memory slot */
395 : 216 : state->roident = InvalidRepOriginId;
396 : 216 : state->remote_lsn = InvalidXLogRecPtr;
397 : 216 : state->local_lsn = InvalidXLogRecPtr;
398 : 216 : break;
399 : : }
400 : : }
401 : 252 : LWLockRelease(ReplicationOriginLock);
2287 tgl@sss.pgh.pa.us 402 : 252 : ConditionVariableCancelSleep();
3273 andres@anarazel.de 403 : 252 : }
404 : :
405 : : /*
406 : : * Drop replication origin (by name).
407 : : *
408 : : * Needs to be called in a transaction.
409 : : */
410 : : void
1039 peter@eisentraut.org 411 : 415 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
412 : : {
413 : : RepOriginId roident;
414 : : Relation rel;
415 : : HeapTuple tuple;
416 : :
1159 akapila@postgresql.o 417 [ - + ]: 415 : Assert(IsTransactionState());
418 : :
436 419 : 415 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
420 : :
1159 421 : 415 : roident = replorigin_by_name(name, missing_ok);
422 : :
423 : : /* Lock the origin to prevent concurrent drops. */
436 424 : 414 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
425 : : AccessExclusiveLock);
426 : :
427 : 414 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
428 [ + + ]: 414 : if (!HeapTupleIsValid(tuple))
429 : : {
430 [ - + ]: 162 : if (!missing_ok)
436 akapila@postgresql.o 431 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
432 : : roident);
433 : :
434 : : /*
435 : : * We don't need to retain the locks if the origin is already dropped.
436 : : */
436 akapila@postgresql.o 437 :CBC 162 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
438 : : AccessExclusiveLock);
439 : 162 : table_close(rel, RowExclusiveLock);
440 : 162 : return;
441 : : }
442 : :
443 : 252 : replorigin_state_clear(roident, nowait);
444 : :
445 : : /*
446 : : * Now, we can delete the catalog entry.
447 : : */
448 : 252 : CatalogTupleDelete(rel, &tuple->t_self);
449 : 252 : ReleaseSysCache(tuple);
450 : :
451 : 252 : CommandCounterIncrement();
452 : :
453 : : /* We keep the lock on pg_replication_origin until commit */
1159 454 : 252 : table_close(rel, NoLock);
455 : : }
456 : :
457 : : /*
458 : : * Lookup replication origin via its oid and return the name.
459 : : *
460 : : * The external name is palloc'd in the calling context.
461 : : *
462 : : * Returns true if the origin is known, false otherwise.
463 : : */
464 : : bool
3273 andres@anarazel.de 465 : 14 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
466 : : {
467 : : HeapTuple tuple;
468 : : Form_pg_replication_origin ric;
469 : :
470 [ - + ]: 14 : Assert(OidIsValid((Oid) roident));
471 [ - + ]: 14 : Assert(roident != InvalidRepOriginId);
472 [ - + ]: 14 : Assert(roident != DoNotReplicateId);
473 : :
474 : 14 : tuple = SearchSysCache1(REPLORIGIDENT,
475 : : ObjectIdGetDatum((Oid) roident));
476 : :
477 [ + - ]: 14 : if (HeapTupleIsValid(tuple))
478 : : {
479 : 14 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
480 : 14 : *roname = text_to_cstring(&ric->roname);
481 : 14 : ReleaseSysCache(tuple);
482 : :
483 : 14 : return true;
484 : : }
485 : : else
486 : : {
3273 andres@anarazel.de 487 :UBC 0 : *roname = NULL;
488 : :
489 [ # # ]: 0 : if (!missing_ok)
2383 rhaas@postgresql.org 490 [ # # ]: 0 : ereport(ERROR,
491 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
492 : : errmsg("replication origin with ID %d does not exist",
493 : : roident)));
494 : :
3273 andres@anarazel.de 495 : 0 : return false;
496 : : }
497 : : }
498 : :
499 : :
500 : : /* ---------------------------------------------------------------------------
501 : : * Functions for handling replication progress.
502 : : * ---------------------------------------------------------------------------
503 : : */
504 : :
505 : : Size
3273 andres@anarazel.de 506 :CBC 3473 : ReplicationOriginShmemSize(void)
507 : : {
508 : 3473 : Size size = 0;
509 : :
510 : : /*
511 : : * XXX: max_replication_slots is arguably the wrong thing to use, as here
512 : : * we keep the replay state of *remote* transactions. But for now it seems
513 : : * sufficient to reuse it, rather than introduce a separate GUC.
514 : : */
515 [ + + ]: 3473 : if (max_replication_slots == 0)
3273 andres@anarazel.de 516 :GBC 2 : return size;
517 : :
3273 andres@anarazel.de 518 :CBC 3471 : size = add_size(size, offsetof(ReplicationStateCtl, states));
519 : :
520 : 3471 : size = add_size(size,
521 : : mul_size(max_replication_slots, sizeof(ReplicationState)));
522 : 3471 : return size;
523 : : }
524 : :
525 : : void
526 : 898 : ReplicationOriginShmemInit(void)
527 : : {
528 : : bool found;
529 : :
530 [ + + ]: 898 : if (max_replication_slots == 0)
3273 andres@anarazel.de 531 :GBC 1 : return;
532 : :
3273 andres@anarazel.de 533 :CBC 897 : replication_states_ctl = (ReplicationStateCtl *)
534 : 897 : ShmemInitStruct("ReplicationOriginState",
535 : : ReplicationOriginShmemSize(),
536 : : &found);
3249 bruce@momjian.us 537 : 897 : replication_states = replication_states_ctl->states;
538 : :
3273 andres@anarazel.de 539 [ + - ]: 897 : if (!found)
540 : : {
541 : : int i;
542 : :
1430 tgl@sss.pgh.pa.us 543 [ + - + - : 63268 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
+ - + - +
+ ]
544 : :
545 : 897 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
546 : :
3273 andres@anarazel.de 547 [ + + ]: 9679 : for (i = 0; i < max_replication_slots; i++)
548 : : {
549 : 8782 : LWLockInitialize(&replication_states[i].lock,
550 : 8782 : replication_states_ctl->tranche_id);
2441 alvherre@alvh.no-ip. 551 : 8782 : ConditionVariableInit(&replication_states[i].origin_cv);
552 : : }
553 : : }
554 : : }
555 : :
556 : : /* ---------------------------------------------------------------------------
557 : : * Perform a checkpoint of each replication origin's progress with respect to
558 : : * the replayed remote_lsn. Make sure that all transactions we refer to in the
559 : : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
560 : : * if the transactions were originally committed asynchronously.
561 : : *
562 : : * We store checkpoints in the following format:
563 : : * +-------+------------------------+------------------+-----+--------+
564 : : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
565 : : * +-------+------------------------+------------------+-----+--------+
566 : : *
567 : : * So its just the magic, followed by the statically sized
568 : : * ReplicationStateOnDisk structs. Note that the maximum number of
569 : : * ReplicationState is determined by max_replication_slots.
570 : : * ---------------------------------------------------------------------------
571 : : */
572 : : void
3273 andres@anarazel.de 573 : 1153 : CheckPointReplicationOrigin(void)
574 : : {
575 : 1153 : const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
576 : 1153 : const char *path = "pg_logical/replorigin_checkpoint";
577 : : int tmpfd;
578 : : int i;
579 : 1153 : uint32 magic = REPLICATION_STATE_MAGIC;
580 : : pg_crc32c crc;
581 : :
582 [ + + ]: 1153 : if (max_replication_slots == 0)
3273 andres@anarazel.de 583 :GBC 1 : return;
584 : :
3273 andres@anarazel.de 585 :CBC 1152 : INIT_CRC32C(crc);
586 : :
587 : : /* make sure no old temp file is remaining */
588 [ + - - + ]: 1152 : if (unlink(tmppath) < 0 && errno != ENOENT)
3273 andres@anarazel.de 589 [ # # ]:UBC 0 : ereport(PANIC,
590 : : (errcode_for_file_access(),
591 : : errmsg("could not remove file \"%s\": %m",
592 : : tmppath)));
593 : :
594 : : /*
595 : : * no other backend can perform this at the same time; only one checkpoint
596 : : * can happen at a time.
597 : : */
2395 peter_e@gmx.net 598 :CBC 1152 : tmpfd = OpenTransientFile(tmppath,
599 : : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
3273 andres@anarazel.de 600 [ - + ]: 1152 : if (tmpfd < 0)
3273 andres@anarazel.de 601 [ # # ]:UBC 0 : ereport(PANIC,
602 : : (errcode_for_file_access(),
603 : : errmsg("could not create file \"%s\": %m",
604 : : tmppath)));
605 : :
606 : : /* write magic */
2079 michael@paquier.xyz 607 :CBC 1152 : errno = 0;
3273 andres@anarazel.de 608 [ - + ]: 1152 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
609 : : {
610 : : /* if write didn't set errno, assume problem is no disk space */
1824 michael@paquier.xyz 611 [ # # ]:UBC 0 : if (errno == 0)
612 : 0 : errno = ENOSPC;
3273 andres@anarazel.de 613 [ # # ]: 0 : ereport(PANIC,
614 : : (errcode_for_file_access(),
615 : : errmsg("could not write to file \"%s\": %m",
616 : : tmppath)));
617 : : }
3273 andres@anarazel.de 618 :CBC 1152 : COMP_CRC32C(crc, &magic, sizeof(magic));
619 : :
620 : : /* prevent concurrent creations/drops */
621 : 1152 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
622 : :
623 : : /* write actual data */
624 [ + + ]: 12377 : for (i = 0; i < max_replication_slots; i++)
625 : : {
626 : : ReplicationStateOnDisk disk_state;
627 : 11225 : ReplicationState *curstate = &replication_states[i];
628 : : XLogRecPtr local_lsn;
629 : :
630 [ + + ]: 11225 : if (curstate->roident == InvalidRepOriginId)
631 : 11182 : continue;
632 : :
633 : : /* zero, to avoid uninitialized padding bytes */
2548 634 : 43 : memset(&disk_state, 0, sizeof(disk_state));
635 : :
3273 636 : 43 : LWLockAcquire(&curstate->lock, LW_SHARED);
637 : :
638 : 43 : disk_state.roident = curstate->roident;
639 : :
640 : 43 : disk_state.remote_lsn = curstate->remote_lsn;
641 : 43 : local_lsn = curstate->local_lsn;
642 : :
643 : 43 : LWLockRelease(&curstate->lock);
644 : :
645 : : /* make sure we only write out a commit that's persistent */
646 : 43 : XLogFlush(local_lsn);
647 : :
2079 michael@paquier.xyz 648 : 43 : errno = 0;
3273 andres@anarazel.de 649 [ - + ]: 43 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
650 : : sizeof(disk_state))
651 : : {
652 : : /* if write didn't set errno, assume problem is no disk space */
1824 michael@paquier.xyz 653 [ # # ]:UBC 0 : if (errno == 0)
654 : 0 : errno = ENOSPC;
3273 andres@anarazel.de 655 [ # # ]: 0 : ereport(PANIC,
656 : : (errcode_for_file_access(),
657 : : errmsg("could not write to file \"%s\": %m",
658 : : tmppath)));
659 : : }
660 : :
3273 andres@anarazel.de 661 :CBC 43 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
662 : : }
663 : :
664 : 1152 : LWLockRelease(ReplicationOriginLock);
665 : :
666 : : /* write out the CRC */
667 : 1152 : FIN_CRC32C(crc);
2079 michael@paquier.xyz 668 : 1152 : errno = 0;
3273 andres@anarazel.de 669 [ - + ]: 1152 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
670 : : {
671 : : /* if write didn't set errno, assume problem is no disk space */
1824 michael@paquier.xyz 672 [ # # ]:UBC 0 : if (errno == 0)
673 : 0 : errno = ENOSPC;
3273 andres@anarazel.de 674 [ # # ]: 0 : ereport(PANIC,
675 : : (errcode_for_file_access(),
676 : : errmsg("could not write to file \"%s\": %m",
677 : : tmppath)));
678 : : }
679 : :
1744 peter@eisentraut.org 680 [ - + ]:CBC 1152 : if (CloseTransientFile(tmpfd) != 0)
1863 michael@paquier.xyz 681 [ # # ]:UBC 0 : ereport(PANIC,
682 : : (errcode_for_file_access(),
683 : : errmsg("could not close file \"%s\": %m",
684 : : tmppath)));
685 : :
686 : : /* fsync, rename to permanent file, fsync file and directory */
2958 andres@anarazel.de 687 :CBC 1152 : durable_rename(tmppath, path, PANIC);
688 : : }
689 : :
690 : : /*
691 : : * Recover replication replay status from checkpoint data saved earlier by
692 : : * CheckPointReplicationOrigin.
693 : : *
694 : : * This only needs to be called at startup and *not* during every checkpoint
695 : : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
696 : : * state thereafter can be recovered by looking at commit records.
697 : : */
698 : : void
3273 699 : 823 : StartupReplicationOrigin(void)
700 : : {
701 : 823 : const char *path = "pg_logical/replorigin_checkpoint";
702 : : int fd;
703 : : int readBytes;
3249 bruce@momjian.us 704 : 823 : uint32 magic = REPLICATION_STATE_MAGIC;
705 : 823 : int last_state = 0;
706 : : pg_crc32c file_crc;
707 : : pg_crc32c crc;
708 : :
709 : : /* don't want to overwrite already existing state */
710 : : #ifdef USE_ASSERT_CHECKING
711 : : static bool already_started = false;
712 : :
3273 andres@anarazel.de 713 [ - + ]: 823 : Assert(!already_started);
714 : 823 : already_started = true;
715 : : #endif
716 : :
717 [ + + ]: 823 : if (max_replication_slots == 0)
3273 andres@anarazel.de 718 :GBC 40 : return;
719 : :
3273 andres@anarazel.de 720 :CBC 822 : INIT_CRC32C(crc);
721 : :
722 [ + + ]: 822 : elog(DEBUG2, "starting up replication origin progress state");
723 : :
2395 peter_e@gmx.net 724 : 822 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
725 : :
726 : : /*
727 : : * might have had max_replication_slots == 0 last run, or we just brought
728 : : * up a standby.
729 : : */
3273 andres@anarazel.de 730 [ + + + - ]: 822 : if (fd < 0 && errno == ENOENT)
731 : 39 : return;
732 [ - + ]: 783 : else if (fd < 0)
3273 andres@anarazel.de 733 [ # # ]:UBC 0 : ereport(PANIC,
734 : : (errcode_for_file_access(),
735 : : errmsg("could not open file \"%s\": %m",
736 : : path)));
737 : :
738 : : /* verify magic, that is written even if nothing was active */
3273 andres@anarazel.de 739 :CBC 783 : readBytes = read(fd, &magic, sizeof(magic));
740 [ - + ]: 783 : if (readBytes != sizeof(magic))
741 : : {
2097 michael@paquier.xyz 742 [ # # ]:UBC 0 : if (readBytes < 0)
743 [ # # ]: 0 : ereport(PANIC,
744 : : (errcode_for_file_access(),
745 : : errmsg("could not read file \"%s\": %m",
746 : : path)));
747 : : else
748 [ # # ]: 0 : ereport(PANIC,
749 : : (errcode(ERRCODE_DATA_CORRUPTED),
750 : : errmsg("could not read file \"%s\": read %d of %zu",
751 : : path, readBytes, sizeof(magic))));
752 : : }
3273 andres@anarazel.de 753 :CBC 783 : COMP_CRC32C(crc, &magic, sizeof(magic));
754 : :
755 [ - + ]: 783 : if (magic != REPLICATION_STATE_MAGIC)
3273 andres@anarazel.de 756 [ # # ]:UBC 0 : ereport(PANIC,
757 : : (errmsg("replication checkpoint has wrong magic %u instead of %u",
758 : : magic, REPLICATION_STATE_MAGIC)));
759 : :
760 : : /* we can skip locking here, no other access is possible */
761 : :
762 : : /* recover individual states, until there are no more to be found */
763 : : while (true)
3273 andres@anarazel.de 764 :CBC 13 : {
765 : : ReplicationStateOnDisk disk_state;
766 : :
767 : 796 : readBytes = read(fd, &disk_state, sizeof(disk_state));
768 : :
769 : : /* no further data */
770 [ + + ]: 796 : if (readBytes == sizeof(crc))
771 : : {
772 : : /* not pretty, but simple ... */
3249 bruce@momjian.us 773 : 783 : file_crc = *(pg_crc32c *) &disk_state;
3273 andres@anarazel.de 774 : 783 : break;
775 : : }
776 : :
777 [ - + ]: 13 : if (readBytes < 0)
778 : : {
3273 andres@anarazel.de 779 [ # # ]:UBC 0 : ereport(PANIC,
780 : : (errcode_for_file_access(),
781 : : errmsg("could not read file \"%s\": %m",
782 : : path)));
783 : : }
784 : :
3273 andres@anarazel.de 785 [ - + ]:CBC 13 : if (readBytes != sizeof(disk_state))
786 : : {
3273 andres@anarazel.de 787 [ # # ]:UBC 0 : ereport(PANIC,
788 : : (errcode_for_file_access(),
789 : : errmsg("could not read file \"%s\": read %d of %zu",
790 : : path, readBytes, sizeof(disk_state))));
791 : : }
792 : :
3273 andres@anarazel.de 793 :CBC 13 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
794 : :
795 [ - + ]: 13 : if (last_state == max_replication_slots)
3273 andres@anarazel.de 796 [ # # ]:UBC 0 : ereport(PANIC,
797 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
798 : : errmsg("could not find free replication state, increase max_replication_slots")));
799 : :
800 : : /* copy data to shared memory */
3273 andres@anarazel.de 801 :CBC 13 : replication_states[last_state].roident = disk_state.roident;
802 : 13 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
803 : 13 : last_state++;
804 : :
1227 peter@eisentraut.org 805 [ + - ]: 13 : ereport(LOG,
806 : : (errmsg("recovered replication state of node %d to %X/%X",
807 : : disk_state.roident,
808 : : LSN_FORMAT_ARGS(disk_state.remote_lsn))));
809 : : }
810 : :
811 : : /* now check checksum */
3273 andres@anarazel.de 812 : 783 : FIN_CRC32C(crc);
813 [ - + ]: 783 : if (file_crc != crc)
3273 andres@anarazel.de 814 [ # # ]:UBC 0 : ereport(PANIC,
815 : : (errcode(ERRCODE_DATA_CORRUPTED),
816 : : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
817 : : crc, file_crc)));
818 : :
1744 peter@eisentraut.org 819 [ - + ]:CBC 783 : if (CloseTransientFile(fd) != 0)
1863 michael@paquier.xyz 820 [ # # ]:UBC 0 : ereport(PANIC,
821 : : (errcode_for_file_access(),
822 : : errmsg("could not close file \"%s\": %m",
823 : : path)));
824 : : }
825 : :
826 : : void
3273 andres@anarazel.de 827 :CBC 6 : replorigin_redo(XLogReaderState *record)
828 : : {
829 : 6 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
830 : :
831 [ + + - ]: 6 : switch (info)
832 : : {
833 : 3 : case XLOG_REPLORIGIN_SET:
834 : : {
835 : 3 : xl_replorigin_set *xlrec =
331 tgl@sss.pgh.pa.us 836 : 3 : (xl_replorigin_set *) XLogRecGetData(record);
837 : :
3273 andres@anarazel.de 838 : 3 : replorigin_advance(xlrec->node_id,
839 : : xlrec->remote_lsn, record->EndRecPtr,
3249 bruce@momjian.us 840 : 3 : xlrec->force /* backward */ ,
841 : : false /* WAL log */ );
3273 andres@anarazel.de 842 : 3 : break;
843 : : }
844 : 3 : case XLOG_REPLORIGIN_DROP:
845 : : {
846 : : xl_replorigin_drop *xlrec;
847 : : int i;
848 : :
849 : 3 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
850 : :
851 [ + - ]: 4 : for (i = 0; i < max_replication_slots; i++)
852 : : {
853 : 4 : ReplicationState *state = &replication_states[i];
854 : :
855 : : /* found our slot */
856 [ + + ]: 4 : if (state->roident == xlrec->node_id)
857 : : {
858 : : /* reset entry */
859 : 3 : state->roident = InvalidRepOriginId;
860 : 3 : state->remote_lsn = InvalidXLogRecPtr;
861 : 3 : state->local_lsn = InvalidXLogRecPtr;
862 : 3 : break;
863 : : }
864 : : }
865 : 3 : break;
866 : : }
3273 andres@anarazel.de 867 :UBC 0 : default:
868 [ # # ]: 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
869 : : }
3273 andres@anarazel.de 870 :CBC 6 : }
871 : :
872 : :
873 : : /*
874 : : * Tell the replication origin progress machinery that a commit from 'node'
875 : : * that originated at the LSN remote_commit on the remote node was replayed
876 : : * successfully and that we don't need to do so again. In combination with
877 : : * setting up replorigin_session_origin_lsn and replorigin_session_origin
878 : : * that ensures we won't lose knowledge about that after a crash if the
879 : : * transaction had a persistent effect (think of asynchronous commits).
880 : : *
881 : : * local_commit needs to be a local LSN of the commit so that we can make sure
882 : : * upon a checkpoint that enough WAL has been persisted to disk.
883 : : *
884 : : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
885 : : * unless running in recovery.
886 : : */
887 : : void
888 : 217 : replorigin_advance(RepOriginId node,
889 : : XLogRecPtr remote_commit, XLogRecPtr local_commit,
890 : : bool go_backward, bool wal_log)
891 : : {
892 : : int i;
893 : 217 : ReplicationState *replication_state = NULL;
894 : 217 : ReplicationState *free_state = NULL;
895 : :
896 [ - + ]: 217 : Assert(node != InvalidRepOriginId);
897 : :
898 : : /* we don't track DoNotReplicateId */
899 [ - + ]: 217 : if (node == DoNotReplicateId)
3273 andres@anarazel.de 900 :UBC 0 : return;
901 : :
902 : : /*
903 : : * XXX: For the case where this is called by WAL replay, it'd be more
904 : : * efficient to restore into a backend local hashtable and only dump into
905 : : * shmem after recovery is finished. Let's wait with implementing that
906 : : * till it's shown to be a measurable expense
907 : : */
908 : :
909 : : /* Lock exclusively, as we may have to create a new table entry. */
3273 andres@anarazel.de 910 :CBC 217 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
911 : :
912 : : /*
913 : : * Search for either an existing slot for the origin, or a free one we can
914 : : * use.
915 : : */
916 [ + + ]: 1927 : for (i = 0; i < max_replication_slots; i++)
917 : : {
918 : 1757 : ReplicationState *curstate = &replication_states[i];
919 : :
920 : : /* remember where to insert if necessary */
921 [ + + + + ]: 1757 : if (curstate->roident == InvalidRepOriginId &&
922 : : free_state == NULL)
923 : : {
924 : 171 : free_state = curstate;
925 : 171 : continue;
926 : : }
927 : :
928 : : /* not our slot */
929 [ + + ]: 1586 : if (curstate->roident != node)
930 : : {
931 : 1539 : continue;
932 : : }
933 : :
934 : : /* ok, found slot */
935 : 47 : replication_state = curstate;
936 : :
937 : 47 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
938 : :
939 : : /* Make sure it's not used by somebody else */
940 [ - + ]: 47 : if (replication_state->acquired_by != 0)
941 : : {
3273 andres@anarazel.de 942 [ # # ]:UBC 0 : ereport(ERROR,
943 : : (errcode(ERRCODE_OBJECT_IN_USE),
944 : : errmsg("replication origin with ID %d is already active for PID %d",
945 : : replication_state->roident,
946 : : replication_state->acquired_by)));
947 : : }
948 : :
3273 andres@anarazel.de 949 :CBC 47 : break;
950 : : }
951 : :
952 [ + + - + ]: 217 : if (replication_state == NULL && free_state == NULL)
3273 andres@anarazel.de 953 [ # # ]:UBC 0 : ereport(ERROR,
954 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
955 : : errmsg("could not find free replication state slot for replication origin with ID %d",
956 : : node),
957 : : errhint("Increase max_replication_slots and try again.")));
958 : :
3273 andres@anarazel.de 959 [ + + ]:CBC 217 : if (replication_state == NULL)
960 : : {
961 : : /* initialize new slot */
962 : 170 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
963 : 170 : replication_state = free_state;
964 [ - + ]: 170 : Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
965 [ - + ]: 170 : Assert(replication_state->local_lsn == InvalidXLogRecPtr);
966 : 170 : replication_state->roident = node;
967 : : }
968 : :
969 [ - + ]: 217 : Assert(replication_state->roident != InvalidRepOriginId);
970 : :
971 : : /*
972 : : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
973 : : * and the standby gets the message. Primarily this will be called during
974 : : * WAL replay (of commit records) where no WAL logging is necessary.
975 : : */
976 [ + + ]: 217 : if (wal_log)
977 : : {
978 : : xl_replorigin_set xlrec;
979 : :
980 : 171 : xlrec.remote_lsn = remote_commit;
981 : 171 : xlrec.node_id = node;
982 : 171 : xlrec.force = go_backward;
983 : :
984 : 171 : XLogBeginInsert();
985 : 171 : XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
986 : :
987 : 171 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
988 : : }
989 : :
990 : : /*
991 : : * Due to - harmless - race conditions during a checkpoint we could see
992 : : * values here that are older than the ones we already have in memory. We
993 : : * could also see older values for prepared transactions when the prepare
994 : : * is sent at a later point of time along with commit prepared and there
995 : : * are other transactions commits between prepare and commit prepared. See
996 : : * ReorderBufferFinishPrepared. Don't overwrite those.
997 : : */
998 [ + + + + ]: 217 : if (go_backward || replication_state->remote_lsn < remote_commit)
999 : 206 : replication_state->remote_lsn = remote_commit;
1000 [ + + + + ]: 217 : if (local_commit != InvalidXLogRecPtr &&
1001 [ + - ]: 42 : (go_backward || replication_state->local_lsn < local_commit))
1002 : 45 : replication_state->local_lsn = local_commit;
1003 : 217 : LWLockRelease(&replication_state->lock);
1004 : :
1005 : : /*
1006 : : * Release *after* changing the LSNs, slot isn't acquired and thus could
1007 : : * otherwise be dropped anytime.
1008 : : */
1009 : 217 : LWLockRelease(ReplicationOriginLock);
1010 : : }
1011 : :
1012 : :
1013 : : XLogRecPtr
1014 : 8 : replorigin_get_progress(RepOriginId node, bool flush)
1015 : : {
1016 : : int i;
1017 : 8 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1018 : 8 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1019 : :
1020 : : /* prevent slots from being concurrently dropped */
1021 : 8 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1022 : :
1023 [ + + ]: 38 : for (i = 0; i < max_replication_slots; i++)
1024 : : {
1025 : : ReplicationState *state;
1026 : :
1027 : 35 : state = &replication_states[i];
1028 : :
1029 [ + + ]: 35 : if (state->roident == node)
1030 : : {
1031 : 5 : LWLockAcquire(&state->lock, LW_SHARED);
1032 : :
1033 : 5 : remote_lsn = state->remote_lsn;
1034 : 5 : local_lsn = state->local_lsn;
1035 : :
1036 : 5 : LWLockRelease(&state->lock);
1037 : :
1038 : 5 : break;
1039 : : }
1040 : : }
1041 : :
1042 : 8 : LWLockRelease(ReplicationOriginLock);
1043 : :
1044 [ + + + - ]: 8 : if (flush && local_lsn != InvalidXLogRecPtr)
1045 : 1 : XLogFlush(local_lsn);
1046 : :
1047 : 8 : return remote_lsn;
1048 : : }
1049 : :
1050 : : /*
1051 : : * Tear down a (possibly) configured session replication origin during process
1052 : : * exit.
1053 : : */
1054 : : static void
1055 : 440 : ReplicationOriginExitCleanup(int code, Datum arg)
1056 : : {
2435 tgl@sss.pgh.pa.us 1057 : 440 : ConditionVariable *cv = NULL;
1058 : :
90 alvherre@alvh.no-ip. 1059 [ + + ]:GNC 440 : if (session_replication_state == NULL)
1060 : 162 : return;
1061 : :
3273 andres@anarazel.de 1062 :CBC 278 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1063 : :
90 alvherre@alvh.no-ip. 1064 [ + + ]:GNC 278 : if (session_replication_state->acquired_by == MyProcPid)
1065 : : {
2441 alvherre@alvh.no-ip. 1066 :CBC 268 : cv = &session_replication_state->origin_cv;
1067 : :
3273 andres@anarazel.de 1068 : 268 : session_replication_state->acquired_by = 0;
1069 : 268 : session_replication_state = NULL;
1070 : : }
1071 : :
1072 : 278 : LWLockRelease(ReplicationOriginLock);
1073 : :
2441 alvherre@alvh.no-ip. 1074 [ + + ]: 278 : if (cv)
1075 : 268 : ConditionVariableBroadcast(cv);
3273 andres@anarazel.de 1076 :ECB (330) : }
1077 : :
1078 : : /*
1079 : : * Setup a replication origin in the shared memory struct if it doesn't
1080 : : * already exist and cache access to the specific ReplicationSlot so the
1081 : : * array doesn't have to be searched when calling
1082 : : * replorigin_session_advance().
1083 : : *
1084 : : * Normally only one such cached origin can exist per process so the cached
1085 : : * value can only be set again after the previous value is torn down with
1086 : : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1087 : : * (meaning the slot is not allowed to be already acquired by another process).
1088 : : *
1089 : : * However, sometimes multiple processes can safely re-use the same origin slot
1090 : : * (for example, multiple parallel apply processes can safely use the same
1091 : : * origin, provided they maintain commit order by allowing only one process to
1092 : : * commit at a time). For this case the first process must pass acquired_by =
1093 : : * 0, and then the other processes sharing that same origin can pass
1094 : : * acquired_by = PID of the first process.
1095 : : */
1096 : : void
461 akapila@postgresql.o 1097 :CBC 451 : replorigin_session_setup(RepOriginId node, int acquired_by)
1098 : : {
1099 : : static bool registered_cleanup;
1100 : : int i;
3249 bruce@momjian.us 1101 : 451 : int free_slot = -1;
1102 : :
3273 andres@anarazel.de 1103 [ + + ]: 451 : if (!registered_cleanup)
1104 : : {
1105 : 448 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1106 : 448 : registered_cleanup = true;
1107 : : }
1108 : :
1109 [ - + ]: 451 : Assert(max_replication_slots > 0);
1110 : :
1111 [ + + ]: 451 : if (session_replication_state != NULL)
1112 [ + - ]: 1 : ereport(ERROR,
1113 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1114 : : errmsg("cannot setup replication origin when one is already setup")));
1115 : :
1116 : : /* Lock exclusively, as we may have to create a new table entry. */
1117 : 450 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1118 : :
1119 : : /*
1120 : : * Search for either an existing slot for the origin, or a free one we can
1121 : : * use.
1122 : : */
1123 [ + + ]: 1705 : for (i = 0; i < max_replication_slots; i++)
1124 : : {
1125 : 1606 : ReplicationState *curstate = &replication_states[i];
1126 : :
1127 : : /* remember where to insert if necessary */
1128 [ + + + + ]: 1606 : if (curstate->roident == InvalidRepOriginId &&
1129 : : free_slot == -1)
1130 : : {
1131 : 99 : free_slot = i;
1132 : 99 : continue;
1133 : : }
1134 : :
1135 : : /* not our slot */
1136 [ + + ]: 1507 : if (curstate->roident != node)
1137 : 1156 : continue;
1138 : :
461 akapila@postgresql.o 1139 [ + + - + ]: 351 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1140 : : {
3273 andres@anarazel.de 1141 [ # # ]:UBC 0 : ereport(ERROR,
1142 : : (errcode(ERRCODE_OBJECT_IN_USE),
1143 : : errmsg("replication origin with ID %d is already active for PID %d",
1144 : : curstate->roident, curstate->acquired_by)));
1145 : : }
1146 : :
1147 : : /* ok, found slot */
3273 andres@anarazel.de 1148 :CBC 351 : session_replication_state = curstate;
144 akapila@postgresql.o 1149 :GNC 351 : break;
1150 : : }
1151 : :
1152 : :
3273 andres@anarazel.de 1153 [ + + - + ]:CBC 450 : if (session_replication_state == NULL && free_slot == -1)
3273 andres@anarazel.de 1154 [ # # ]:UBC 0 : ereport(ERROR,
1155 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1156 : : errmsg("could not find free replication state slot for replication origin with ID %d",
1157 : : node),
1158 : : errhint("Increase max_replication_slots and try again.")));
3273 andres@anarazel.de 1159 [ + + ]:CBC 450 : else if (session_replication_state == NULL)
1160 : : {
1161 : : /* initialize new slot */
1162 : 99 : session_replication_state = &replication_states[free_slot];
1163 [ - + ]: 99 : Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1164 [ - + ]: 99 : Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1165 : 99 : session_replication_state->roident = node;
1166 : : }
1167 : :
1168 : :
1169 [ - + ]: 450 : Assert(session_replication_state->roident != InvalidRepOriginId);
1170 : :
461 akapila@postgresql.o 1171 [ + + ]: 450 : if (acquired_by == 0)
1172 : 439 : session_replication_state->acquired_by = MyProcPid;
1173 [ - + ]: 11 : else if (session_replication_state->acquired_by != acquired_by)
461 akapila@postgresql.o 1174 [ # # ]:UBC 0 : elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1175 : : node, acquired_by);
1176 : :
3273 andres@anarazel.de 1177 :CBC 450 : LWLockRelease(ReplicationOriginLock);
1178 : :
1179 : : /* probably this one is pointless */
2441 alvherre@alvh.no-ip. 1180 : 450 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
3273 andres@anarazel.de 1181 : 450 : }
1182 : :
1183 : : /*
1184 : : * Reset replay state previously setup in this session.
1185 : : *
1186 : : * This function may only be called if an origin was setup with
1187 : : * replorigin_session_setup().
1188 : : */
1189 : : void
1190 : 165 : replorigin_session_reset(void)
1191 : : {
1192 : : ConditionVariable *cv;
1193 : :
1194 [ - + ]: 165 : Assert(max_replication_slots != 0);
1195 : :
1196 [ + + ]: 165 : if (session_replication_state == NULL)
1197 [ + - ]: 1 : ereport(ERROR,
1198 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1199 : : errmsg("no replication origin is configured")));
1200 : :
1201 : 164 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1202 : :
1203 : 164 : session_replication_state->acquired_by = 0;
2441 alvherre@alvh.no-ip. 1204 : 164 : cv = &session_replication_state->origin_cv;
3273 andres@anarazel.de 1205 : 164 : session_replication_state = NULL;
1206 : :
1207 : 164 : LWLockRelease(ReplicationOriginLock);
1208 : :
2441 alvherre@alvh.no-ip. 1209 : 164 : ConditionVariableBroadcast(cv);
3273 andres@anarazel.de 1210 : 164 : }
1211 : :
1212 : : /*
1213 : : * Do the same work replorigin_advance() does, just on the session's
1214 : : * configured origin.
1215 : : *
1216 : : * This is noticeably cheaper than using replorigin_advance().
1217 : : */
1218 : : void
1219 : 998 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1220 : : {
1221 [ - + ]: 998 : Assert(session_replication_state != NULL);
1222 [ - + ]: 998 : Assert(session_replication_state->roident != InvalidRepOriginId);
1223 : :
1224 : 998 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1225 [ + - ]: 998 : if (session_replication_state->local_lsn < local_commit)
1226 : 998 : session_replication_state->local_lsn = local_commit;
1227 [ + + ]: 998 : if (session_replication_state->remote_lsn < remote_commit)
1228 : 487 : session_replication_state->remote_lsn = remote_commit;
1229 : 998 : LWLockRelease(&session_replication_state->lock);
1230 : 998 : }
1231 : :
1232 : : /*
1233 : : * Ask the machinery about the point up to which we successfully replayed
1234 : : * changes from an already setup replication origin.
1235 : : */
1236 : : XLogRecPtr
1237 : 268 : replorigin_session_get_progress(bool flush)
1238 : : {
1239 : : XLogRecPtr remote_lsn;
1240 : : XLogRecPtr local_lsn;
1241 : :
1242 [ - + ]: 268 : Assert(session_replication_state != NULL);
1243 : :
1244 : 268 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1245 : 268 : remote_lsn = session_replication_state->remote_lsn;
1246 : 268 : local_lsn = session_replication_state->local_lsn;
1247 : 268 : LWLockRelease(&session_replication_state->lock);
1248 : :
1249 [ + + + - ]: 268 : if (flush && local_lsn != InvalidXLogRecPtr)
1250 : 1 : XLogFlush(local_lsn);
1251 : :
1252 : 268 : return remote_lsn;
1253 : : }
1254 : :
1255 : :
1256 : :
1257 : : /* ---------------------------------------------------------------------------
1258 : : * SQL functions for working with replication origin.
1259 : : *
1260 : : * These mostly should be fairly short wrappers around more generic functions.
1261 : : * ---------------------------------------------------------------------------
1262 : : */
1263 : :
1264 : : /*
1265 : : * Create replication origin for the passed in name, and return the assigned
1266 : : * oid.
1267 : : */
1268 : : Datum
1269 : 9 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1270 : : {
1271 : : char *name;
1272 : : RepOriginId roident;
1273 : :
1274 : 9 : replorigin_check_prerequisites(false, false);
1275 : :
1276 : 9 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1277 : :
1278 : : /*
1279 : : * Replication origins "any and "none" are reserved for system options.
1280 : : * The origins "pg_xxx" are reserved for internal use.
1281 : : */
633 akapila@postgresql.o 1282 [ + + + + ]: 9 : if (IsReservedName(name) || IsReservedOriginName(name))
1751 tgl@sss.pgh.pa.us 1283 [ + - ]: 3 : ereport(ERROR,
1284 : : (errcode(ERRCODE_RESERVED_NAME),
1285 : : errmsg("replication origin name \"%s\" is reserved",
1286 : : name),
1287 : : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1288 : : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1289 : :
1290 : : /*
1291 : : * If built with appropriate switch, whine when regression-testing
1292 : : * conventions for replication origin names are violated.
1293 : : */
1294 : : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1295 : : if (strncmp(name, "regress_", 8) != 0)
1296 : : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1297 : : #endif
1298 : :
3273 andres@anarazel.de 1299 : 6 : roident = replorigin_create(name);
1300 : :
1301 : 5 : pfree(name);
1302 : :
1303 : 5 : PG_RETURN_OID(roident);
1304 : : }
1305 : :
1306 : : /*
1307 : : * Drop replication origin.
1308 : : */
1309 : : Datum
1310 : 7 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1311 : : {
1312 : : char *name;
1313 : :
1314 : 7 : replorigin_check_prerequisites(false, false);
1315 : :
1316 : 7 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1317 : :
1159 akapila@postgresql.o 1318 : 7 : replorigin_drop_by_name(name, false, true);
1319 : :
3273 andres@anarazel.de 1320 : 6 : pfree(name);
1321 : :
1322 : 6 : PG_RETURN_VOID();
1323 : : }
1324 : :
1325 : : /*
1326 : : * Return oid of a replication origin.
1327 : : */
1328 : : Datum
3273 andres@anarazel.de 1329 :UBC 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1330 : : {
1331 : : char *name;
1332 : : RepOriginId roident;
1333 : :
1334 : 0 : replorigin_check_prerequisites(false, false);
1335 : :
1336 : 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1337 : 0 : roident = replorigin_by_name(name, true);
1338 : :
1339 : 0 : pfree(name);
1340 : :
1341 [ # # ]: 0 : if (OidIsValid(roident))
1342 : 0 : PG_RETURN_OID(roident);
1343 : 0 : PG_RETURN_NULL();
1344 : : }
1345 : :
1346 : : /*
1347 : : * Setup a replication origin for this session.
1348 : : */
1349 : : Datum
3273 andres@anarazel.de 1350 :CBC 6 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1351 : : {
1352 : : char *name;
1353 : : RepOriginId origin;
1354 : :
1355 : 6 : replorigin_check_prerequisites(true, false);
1356 : :
1357 : 6 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1358 : 6 : origin = replorigin_by_name(name, false);
461 akapila@postgresql.o 1359 : 5 : replorigin_session_setup(origin, 0);
1360 : :
3121 alvherre@alvh.no-ip. 1361 : 4 : replorigin_session_origin = origin;
1362 : :
3273 andres@anarazel.de 1363 : 4 : pfree(name);
1364 : :
1365 : 4 : PG_RETURN_VOID();
1366 : : }
1367 : :
1368 : : /*
1369 : : * Reset previously setup origin in this session
1370 : : */
1371 : : Datum
1372 : 5 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1373 : : {
1374 : 5 : replorigin_check_prerequisites(true, false);
1375 : :
1376 : 5 : replorigin_session_reset();
1377 : :
3121 alvherre@alvh.no-ip. 1378 : 4 : replorigin_session_origin = InvalidRepOriginId;
1379 : 4 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1380 : 4 : replorigin_session_origin_timestamp = 0;
1381 : :
3273 andres@anarazel.de 1382 : 4 : PG_RETURN_VOID();
1383 : : }
1384 : :
1385 : : /*
1386 : : * Has a replication origin been setup for this session.
1387 : : */
1388 : : Datum
3273 andres@anarazel.de 1389 :UBC 0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1390 : : {
1391 : 0 : replorigin_check_prerequisites(false, false);
1392 : :
3121 alvherre@alvh.no-ip. 1393 : 0 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1394 : : }
1395 : :
1396 : :
1397 : : /*
1398 : : * Return the replication progress for origin setup in the current session.
1399 : : *
1400 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1401 : : * to a local transaction that has been flushed. This is useful if asynchronous
1402 : : * commits are used when replaying replicated transactions.
1403 : : */
1404 : : Datum
3273 andres@anarazel.de 1405 :CBC 2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1406 : : {
1407 : 2 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1408 : 2 : bool flush = PG_GETARG_BOOL(0);
1409 : :
1410 : 2 : replorigin_check_prerequisites(true, false);
1411 : :
1412 [ - + ]: 2 : if (session_replication_state == NULL)
3273 andres@anarazel.de 1413 [ # # ]:UBC 0 : ereport(ERROR,
1414 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1415 : : errmsg("no replication origin is configured")));
1416 : :
3273 andres@anarazel.de 1417 :CBC 2 : remote_lsn = replorigin_session_get_progress(flush);
1418 : :
1419 [ - + ]: 2 : if (remote_lsn == InvalidXLogRecPtr)
3273 andres@anarazel.de 1420 :UBC 0 : PG_RETURN_NULL();
1421 : :
3273 andres@anarazel.de 1422 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1423 : : }
1424 : :
1425 : : Datum
1426 : 1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1427 : : {
1428 : 1 : XLogRecPtr location = PG_GETARG_LSN(0);
1429 : :
1430 : 1 : replorigin_check_prerequisites(true, false);
1431 : :
1432 [ - + ]: 1 : if (session_replication_state == NULL)
3273 andres@anarazel.de 1433 [ # # ]:UBC 0 : ereport(ERROR,
1434 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1435 : : errmsg("no replication origin is configured")));
1436 : :
3121 alvherre@alvh.no-ip. 1437 :CBC 1 : replorigin_session_origin_lsn = location;
1438 : 1 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1439 : :
3273 andres@anarazel.de 1440 : 1 : PG_RETURN_VOID();
1441 : : }
1442 : :
1443 : : Datum
3273 andres@anarazel.de 1444 :UBC 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1445 : : {
1446 : 0 : replorigin_check_prerequisites(true, false);
1447 : :
3121 alvherre@alvh.no-ip. 1448 : 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1449 : 0 : replorigin_session_origin_timestamp = 0;
1450 : :
3273 andres@anarazel.de 1451 : 0 : PG_RETURN_VOID();
1452 : : }
1453 : :
1454 : :
1455 : : Datum
3273 andres@anarazel.de 1456 :CBC 3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1457 : : {
2590 noah@leadboat.com 1458 : 3 : text *name = PG_GETARG_TEXT_PP(0);
3249 bruce@momjian.us 1459 : 3 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1460 : : RepOriginId node;
1461 : :
3273 andres@anarazel.de 1462 : 3 : replorigin_check_prerequisites(true, false);
1463 : :
1464 : : /* lock to prevent the replication origin from vanishing */
1465 : 3 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1466 : :
1467 : 3 : node = replorigin_by_name(text_to_cstring(name), false);
1468 : :
1469 : : /*
1470 : : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1471 : : * xact hasn't committed yet. This is why this function should be used to
1472 : : * set up the initial replication state, but not for replay.
1473 : : */
3273 andres@anarazel.de 1474 :GBC 2 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1475 : : true /* go backward */ , true /* WAL log */ );
1476 : :
1477 : 2 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1478 : :
1479 : 2 : PG_RETURN_VOID();
1480 : : }
1481 : :
1482 : :
1483 : : /*
1484 : : * Return the replication progress for an individual replication origin.
1485 : : *
1486 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1487 : : * to a local transaction that has been flushed. This is useful if asynchronous
1488 : : * commits are used when replaying replicated transactions.
1489 : : */
1490 : : Datum
3273 andres@anarazel.de 1491 :CBC 3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1492 : : {
1493 : : char *name;
1494 : : bool flush;
1495 : : RepOriginId roident;
1496 : 3 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1497 : :
1498 : 3 : replorigin_check_prerequisites(true, true);
1499 : :
1500 : 3 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1501 : 3 : flush = PG_GETARG_BOOL(1);
1502 : :
1503 : 3 : roident = replorigin_by_name(name, false);
1504 [ - + ]: 2 : Assert(OidIsValid(roident));
1505 : :
1506 : 2 : remote_lsn = replorigin_get_progress(roident, flush);
1507 : :
1508 [ - + ]: 2 : if (remote_lsn == InvalidXLogRecPtr)
3273 andres@anarazel.de 1509 :UBC 0 : PG_RETURN_NULL();
1510 : :
3273 andres@anarazel.de 1511 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1512 : : }
1513 : :
1514 : :
1515 : : Datum
1516 : 6 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1517 : : {
1518 : 6 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1519 : : int i;
1520 : : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1521 : :
1522 : : /* we want to return 0 rows if slot is set to zero */
1523 : 6 : replorigin_check_prerequisites(false, true);
1524 : :
544 michael@paquier.xyz 1525 : 6 : InitMaterializedSRF(fcinfo, 0);
1526 : :
1527 : : /* prevent slots from being concurrently dropped */
3273 andres@anarazel.de 1528 : 6 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1529 : :
1530 : : /*
1531 : : * Iterate through all possible replication_states, display if they are
1532 : : * filled. Note that we do not take any locks, so slightly corrupted/out
1533 : : * of date values are a possibility.
1534 : : */
1535 [ + + ]: 54 : for (i = 0; i < max_replication_slots; i++)
1536 : : {
1537 : : ReplicationState *state;
1538 : : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1539 : : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1540 : : char *roname;
1541 : :
1542 : 48 : state = &replication_states[i];
1543 : :
1544 : : /* unused slot, nothing to display */
1545 [ + + ]: 48 : if (state->roident == InvalidRepOriginId)
1546 : 43 : continue;
1547 : :
1548 : 5 : memset(values, 0, sizeof(values));
1549 : 5 : memset(nulls, 1, sizeof(nulls));
1550 : :
1551 : 5 : values[0] = ObjectIdGetDatum(state->roident);
1552 : 5 : nulls[0] = false;
1553 : :
1554 : : /*
1555 : : * We're not preventing the origin to be dropped concurrently, so
1556 : : * silently accept that it might be gone.
1557 : : */
1558 [ + - ]: 5 : if (replorigin_by_oid(state->roident, true,
1559 : : &roname))
1560 : : {
1561 : 5 : values[1] = CStringGetTextDatum(roname);
1562 : 5 : nulls[1] = false;
1563 : : }
1564 : :
1565 : 5 : LWLockAcquire(&state->lock, LW_SHARED);
1566 : :
3249 bruce@momjian.us 1567 : 5 : values[2] = LSNGetDatum(state->remote_lsn);
3273 andres@anarazel.de 1568 : 5 : nulls[2] = false;
1569 : :
1570 : 5 : values[3] = LSNGetDatum(state->local_lsn);
1571 : 5 : nulls[3] = false;
1572 : :
1573 : 5 : LWLockRelease(&state->lock);
1574 : :
769 michael@paquier.xyz 1575 : 5 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1576 : : values, nulls);
1577 : : }
1578 : :
3273 andres@anarazel.de 1579 : 6 : LWLockRelease(ReplicationOriginLock);
1580 : :
1581 : : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1582 : :
1583 : 6 : return (Datum) 0;
1584 : : }
|