LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions)
Current: Differential Code Coverage HEAD vs 15
Current Date: 2023-04-08 17:13:01
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit, not hit

Columns: Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Lines: 85.2 % 474 404 2 23 44 1 19 254 23 108 49 258 1 15
Functions: 90.3 % 31 28 3 25 3 3 28

Line coverage date bins:
(60,120] days: 90.5 % 21 19 2 19
(120,180] days: 100.0 % 1 1 1
(240..) days: 85.0 % 452 384 23 44 1 19 253 4 108 49 243
Function coverage date bins:
(60,120] days: 100.0 % 2 2 2
(240..) days: 43.3 % 60 26 3 25 1 3 28

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * origin.c
                                  4                 :  *    Logical replication progress tracking support.
                                  5                 :  *
                                  6                 :  * Copyright (c) 2013-2023, PostgreSQL Global Development Group
                                  7                 :  *
                                  8                 :  * IDENTIFICATION
                                  9                 :  *    src/backend/replication/logical/origin.c
                                 10                 :  *
                                 11                 :  * NOTES
                                 12                 :  *
                                 13                 :  * This file provides the following:
                                 14                 :  * * An infrastructure to name nodes in a replication setup
                                 15                 :  * * A facility to store and persist replication progress in an
                                 16                 :  *   efficient and durable manner.
                                 17                 :  *
                                 18                 :  * A replication origin consists of a descriptive, user-defined, external
                                 19                 :  * name and a short, thus space-efficient, internal 2 byte one. This split
                                 20                 :  * exists because replication origins have to be stored in WAL and shared
                                 21                 :  * memory and long descriptors would be inefficient.  For now only 2 bytes
                                 22                 :  * are used for the internal id of a replication origin as it seems unlikely
                                 23                 :  * that there soon will be more than 65k nodes in one replication setup; and
                                 24                 :  * using only two bytes allows us to be more space efficient.
                                 25                 :  *
                                 26                 :  * Replication progress is tracked in a shared memory table
                                 27                 :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
                                 28                 :  * ('slots') in this table are identified by the internal id. That's the case
                                 29                 :  * because it allows replication progress to be advanced during crash
                                 30                 :  * recovery. To allow doing so we store the original LSN (from the originating
                                 31                 :  * system) of a transaction in the commit record. That allows us to recover
                                 32                 :  * the precise replayed state after crash recovery, without requiring
                                 33                 :  * synchronous commits. Allowing logical replication to use asynchronous
                                 34                 :  * commit is generally good for performance, but especially important as it
                                 35                 :  * allows a single threaded replay process to keep up with a source that has
                                 36                 :  * multiple backends generating changes concurrently.  For efficiency and
                                 37                 :  * simplicity a backend can set up one replication origin that is from then
                                 38                 :  * on used as the source of changes produced by the backend, until reset.
                                 39                 :  *
                                 40                 :  * This infrastructure is intended to be used in cooperation with logical
                                 41                 :  * decoding. When replaying from a remote system the configured origin is
                                 42                 :  * provided to output plugins, allowing prevention of replication loops and
                                 43                 :  * other filtering.
                                 44                 :  *
                                 45                 :  * There are several levels of locking at work:
                                 46                 :  *
                                 47                 :  * * To create and drop replication origins an exclusive lock on
                                 48                 :  *   pg_replication_origin is required for the duration. That allows us to
                                 49                 :  *   assign new origins safely and without conflicts using a dirty snapshot.
                                 50                 :  *
                                 51                 :  * * When creating an in-memory replication progress slot the ReplicationOrigin
                                 52                 :  *   LWLock has to be held exclusively; when iterating over the replication
                                 53                 :  *   progress a shared lock has to be held, and the same applies when advancing
                                 54                 :  *   the replication progress of an origin that has not been set up as the
                                 55                 :  *   session's replication origin.
                                 56                 :  *
                                 57                 :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
                                 58                 :  *   replication progress slot that slot's lwlock has to be held. That's
                                 59                 :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
                                 60                 :  *   all our platforms, but it also simplifies memory ordering concerns
                                 61                 :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
                                 62                 :  *   so it's less harmful to hold the lock over a WAL write
                                 63                 :  *   (cf. AdvanceReplicationProgress).
                                 64                 :  *
                                 65                 :  * ---------------------------------------------------------------------------
                                 66                 :  */
                                 67                 : 
                                 68                 : #include "postgres.h"
                                 69                 : 
                                 70                 : #include <unistd.h>
                                 71                 : #include <sys/stat.h>
                                 72                 : 
                                 73                 : #include "access/genam.h"
                                 74                 : #include "access/htup_details.h"
                                 75                 : #include "access/table.h"
                                 76                 : #include "access/xact.h"
                                 77                 : #include "access/xloginsert.h"
                                 78                 : #include "catalog/catalog.h"
                                 79                 : #include "catalog/indexing.h"
                                 80                 : #include "catalog/pg_subscription.h"
                                 81                 : #include "funcapi.h"
                                 82                 : #include "miscadmin.h"
                                 83                 : #include "nodes/execnodes.h"
                                 84                 : #include "pgstat.h"
                                 85                 : #include "replication/logical.h"
                                 86                 : #include "replication/origin.h"
                                 87                 : #include "storage/condition_variable.h"
                                 88                 : #include "storage/copydir.h"
                                 89                 : #include "storage/fd.h"
                                 90                 : #include "storage/ipc.h"
                                 91                 : #include "storage/lmgr.h"
                                 92                 : #include "utils/builtins.h"
                                 93                 : #include "utils/fmgroids.h"
                                 94                 : #include "utils/pg_lsn.h"
                                 95                 : #include "utils/rel.h"
                                 96                 : #include "utils/snapmgr.h"
                                 97                 : #include "utils/syscache.h"
                                 98                 : 
                                 99                 : /*
                                100                 :  * Replay progress of a single remote node.
                                101                 :  */
                                102                 : typedef struct ReplicationState
                                103                 : {
                                104                 :     /*
                                105                 :      * Local identifier for the remote node.
                                106                 :      */
                                107                 :     RepOriginId roident;
                                108                 : 
                                109                 :     /*
                                110                 :      * Location of the latest commit from the remote side.
                                111                 :      */
                                112                 :     XLogRecPtr  remote_lsn;
                                113                 : 
                                114                 :     /*
                                115                 :      * Remember the local lsn of the commit record so we can XLogFlush() to it
                                116                 :      * during a checkpoint so we know the commit record actually is safe on
                                117                 :      * disk.
                                118                 :      */
                                119                 :     XLogRecPtr  local_lsn;
                                120                 : 
                                121                 :     /*
                                122                 :      * PID of backend that's acquired slot, or 0 if none.
                                123                 :      */
                                124                 :     int         acquired_by;
                                125                 : 
                                126                 :     /*
                                127                 :      * Condition variable that's signaled when acquired_by changes.
                                128                 :      */
                                129                 :     ConditionVariable origin_cv;
                                130                 : 
                                131                 :     /*
                                132                 :      * Lock protecting remote_lsn and local_lsn.
                                133                 :      */
                                134                 :     LWLock      lock;
                                135                 : } ReplicationState;
                                136                 : 
                                137                 : /*
                                138                 :  * On disk version of ReplicationState.
                                139                 :  */
                                140                 : typedef struct ReplicationStateOnDisk
                                141                 : {
                                142                 :     RepOriginId roident;
                                143                 :     XLogRecPtr  remote_lsn;
                                144                 : } ReplicationStateOnDisk;
                                145                 : 
                                146                 : 
                                147                 : typedef struct ReplicationStateCtl
                                148                 : {
                                149                 :     /* Tranche to use for per-origin LWLocks */
                                150                 :     int         tranche_id;
                                151                 :     /* Array of length max_replication_slots */
                                152                 :     ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
                                153                 : } ReplicationStateCtl;
                                154                 : 
                                155                 : /* external variables */
                                156                 : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
                                157                 : XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
                                158                 : TimestampTz replorigin_session_origin_timestamp = 0;
                                159                 : 
                                160                 : /*
                                161                 :  * Base address into a shared memory array of replication states of size
                                162                 :  * max_replication_slots.
                                163                 :  *
                                164                 :  * XXX: Should we use a separate variable to size this rather than
                                165                 :  * max_replication_slots?
                                166                 :  */
                                167                 : static ReplicationState *replication_states;
                                168                 : 
                                169                 : /*
                                170                 :  * Actual shared memory block (replication_states[] is now part of this).
                                171                 :  */
                                172                 : static ReplicationStateCtl *replication_states_ctl;
                                173                 : 
                                174                 : /*
                                175                 :  * Backend-local, cached element from ReplicationState for use in a backend
                                176                 :  * replaying remote commits, so we don't have to search ReplicationState for
                                177                 :  * the backend's current RepOriginId.
                                178                 :  */
                                179                 : static ReplicationState *session_replication_state = NULL;
                                180                 : 
                                181                 : /* Magic for on disk files. */
                                182                 : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
                                183                 : 
                                184                 : static void
 2902 andres                    185 GIC          31 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
 2902 andres                    186 ECB             : {
 2902 andres                    187 GIC          31 :     if (check_slots && max_replication_slots == 0)
 2902 andres                    188 LBC           0 :         ereport(ERROR,
 2902 andres                    189 EUB             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                190                 :                  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
                                191                 : 
 2902 andres                    192 GIC          31 :     if (!recoveryOK && RecoveryInProgress())
 2902 andres                    193 LBC           0 :         ereport(ERROR,
 2902 andres                    194 EUB             :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
                                195                 :                  errmsg("cannot manipulate replication origins during recovery")));
 2902 andres                    196 GIC          31 : }
 2902 andres                    197 ECB             : 
                                198                 : 
                                199                 : /*
                                200                 :  * IsReservedOriginName
                                201                 :  *      True iff name is either "none" or "any".
                                202                 :  */
                                203                 : static bool
  262 akapila                   204 GNC           7 : IsReservedOriginName(const char *name)
                                205                 : {
                                206              13 :     return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
                                207               6 :             (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
                                208                 : }
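
The reserved-name check above can be tried outside the server with a small standalone sketch. This is not the PostgreSQL implementation: POSIX strcasecmp stands in for pg_strcasecmp, and the SKETCH_* macros and the function name are hypothetical, with values taken from the comment above ("none" and "any").

```c
#include <stdbool.h>
#include <strings.h>            /* POSIX strcasecmp, standing in for pg_strcasecmp */

/* Assumed stand-ins for LOGICALREP_ORIGIN_NONE / LOGICALREP_ORIGIN_ANY. */
#define SKETCH_ORIGIN_NONE "none"
#define SKETCH_ORIGIN_ANY  "any"

/*
 * Hypothetical helper: true iff name matches "none" or "any",
 * compared case-insensitively as in the listing above.
 */
bool
sketch_is_reserved_origin_name(const char *name)
{
    return strcasecmp(name, SKETCH_ORIGIN_NONE) == 0 ||
           strcasecmp(name, SKETCH_ORIGIN_ANY) == 0;
}
```

Because the comparison ignores case, "NONE" and "Any" are treated as reserved just like their lowercase forms, while an ordinary name such as "node_a" is not.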
                                209                 : 
                                210                 : /* ---------------------------------------------------------------------------
                                211                 :  * Functions for working with replication origins themselves.
                                212                 :  * ---------------------------------------------------------------------------
                                213                 :  */
                                214                 : 
                                215                 : /*
 2902 andres                    216 ECB             :  * Check for a persistent replication origin identified by name.
                                217                 :  *
                                218                 :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
                                219                 :  */
                                220                 : RepOriginId
  668 peter                     221 GIC         694 : replorigin_by_name(const char *roname, bool missing_ok)
                                222                 : {
                                223                 :     Form_pg_replication_origin ident;
 2878 bruce                     224             694 :     Oid         roident = InvalidOid;
                                225                 :     HeapTuple   tuple;
                                226                 :     Datum       roname_d;
                                227                 : 
 2902 andres                    228             694 :     roname_d = CStringGetTextDatum(roname);
                                229                 : 
                                230             694 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
                                231             694 :     if (HeapTupleIsValid(tuple))
                                232                 :     {
 2902 andres                    233 CBC         390 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
 2902 andres                    234 GIC         390 :         roident = ident->roident;
                                235             390 :         ReleaseSysCache(tuple);
 2902 andres                    236 ECB             :     }
 2902 andres                    237 GIC         304 :     else if (!missing_ok)
 2012 rhaas                     238               4 :         ereport(ERROR,
                                239                 :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
 2012 rhaas                     240 ECB             :                  errmsg("replication origin \"%s\" does not exist",
                                241                 :                         roname)));
 2902 andres                    242                 : 
 2902 andres                    243 CBC         690 :     return roident;
                                244                 : }
 2902 andres                    245 ECB             : 
                                246                 : /*
                                247                 :  * Create a replication origin.
                                248                 :  *
                                249                 :  * Needs to be called in a transaction.
                                250                 :  */
                                251                 : RepOriginId
  668 peter                     252 GIC         271 : replorigin_create(const char *roname)
                                253                 : {
                                254                 :     Oid         roident;
 2878 bruce                     255 CBC         271 :     HeapTuple   tuple = NULL;
                                256                 :     Relation    rel;
                                257                 :     Datum       roname_d;
                                258                 :     SnapshotData SnapshotDirty;
                                259                 :     SysScanDesc scan;
                                260                 :     ScanKeyData key;
                                261                 : 
 2902 andres                    262 GIC         271 :     roname_d = CStringGetTextDatum(roname);
                                263                 : 
 2902 andres                    264 CBC         271 :     Assert(IsTransactionState());
                                265                 : 
                                266                 :     /*
 2902 andres                    267 ECB             :      * We need the numeric replication origin to be 16bit wide, so we cannot
                                268                 :      * rely on the normal oid allocation. Instead we simply scan
                                269                 :      * pg_replication_origin for the first unused id. That's not particularly
                                270                 :      * efficient, but this should be a fairly infrequent operation - we can
                                271                 :      * easily spend a bit more code on this when it turns out it needs to be
                                272                 :      * faster.
                                273                 :      *
                                274                 :      * We handle concurrency by taking an exclusive lock (allowing reads!)
                                275                 :      * over the table for the duration of the search. Because we use a "dirty
                                276                 :      * snapshot" we can read rows that other in-progress sessions have
                                277                 :      * written, even though they would be invisible with normal snapshots. Due
                                278                 :      * to the exclusive lock there's no danger that new rows can appear while
                                279                 :      * we're checking.
                                280                 :      */
 2902 andres                    281 GIC         271 :     InitDirtySnapshot(SnapshotDirty);
                                282                 : 
 1539                           283             271 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
                                284                 : 
 2901                           285             492 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
                                286                 :     {
                                287                 :         bool        nulls[Natts_pg_replication_origin];
                                288                 :         Datum       values[Natts_pg_replication_origin];
                                289                 :         bool        collides;
                                290                 : 
 2902                           291             492 :         CHECK_FOR_INTERRUPTS();
                                292                 : 
 2902 andres                    293 CBC         492 :         ScanKeyInit(&key,
                                294                 :                     Anum_pg_replication_origin_roident,
 2902 andres                    295 ECB             :                     BTEqualStrategyNumber, F_OIDEQ,
                                296                 :                     ObjectIdGetDatum(roident));
                                297                 : 
 2902 andres                    298 GIC         492 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
                                299                 :                                   true /* indexOK */ ,
                                300                 :                                   &SnapshotDirty,
                                301                 :                                   1, &key);
                                302                 : 
 2902 andres                    303 CBC         492 :         collides = HeapTupleIsValid(systable_getnext(scan));
                                304                 : 
                                305             492 :         systable_endscan(scan);
                                306                 : 
 2902 andres                    307 GIC         492 :         if (!collides)
                                308                 :         {
                                309                 :             /*
 2902 andres                    310 ECB             :              * Ok, found an unused roident, insert the new row and do a CCI,
                                311                 :              * so our callers can look it up if they want to.
                                312                 :              */
 2902 andres                    313 GIC         271 :             memset(&nulls, 0, sizeof(nulls));
                                314                 : 
 2878 bruce                     315 CBC         271 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
 2902 andres                    316 GIC         271 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
 2902 andres                    317 ECB             : 
 2902 andres                    318 GIC         271 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 2259 alvherre                  319 CBC         271 :             CatalogTupleInsert(rel, tuple);
 2902 andres                    320 GIC         270 :             CommandCounterIncrement();
                                321             270 :             break;
                                322                 :         }
                                323                 :     }
                                324                 : 
  2878 bruce                     325 ECB             :     /* now release lock again */
 1539 andres                    326 GIC         270 :     table_close(rel, ExclusiveLock);
 2902 andres                    327 ECB             : 
 2902 andres                    328 CBC         270 :     if (tuple == NULL)
 2902 andres                    329 UIC           0 :         ereport(ERROR,
 2902 andres                    330 ECB             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
  234 john.naylor               331                 :                  errmsg("could not find free replication origin ID")));
 2902 andres                    332                 : 
 2902 andres                    333 CBC         270 :     heap_freetuple(tuple);
 2902 andres                    334 GIC         270 :     return roident;
                                335                 : }
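
The id search in replorigin_create reduces to "take the lowest unused 16-bit id, starting at 1". The sketch below isolates only that loop, under loud assumptions: a plain boolean array replaces the dirty-snapshot index scan on pg_replication_origin, no locking is modeled, and every name prefixed with sketch_ or SKETCH_ is hypothetical.

```c
#include <stdbool.h>
#include <stdint.h>

/* Assumed stand-in for InvalidOid / InvalidRepOriginId. */
#define SKETCH_INVALID_ID 0

/*
 * Return the lowest id in [1, UINT16_MAX) that is not marked in 'used',
 * or SKETCH_INVALID_ID if every candidate is taken.
 *
 * 'used' must have UINT16_MAX entries; used[i] is true when id i is taken.
 * The loop bounds mirror the listing: start at invalid + 1, stop before
 * UINT16_MAX.
 */
uint16_t
sketch_first_free_origin_id(const bool *used)
{
    for (uint32_t id = SKETCH_INVALID_ID + 1; id < UINT16_MAX; id++)
    {
        if (!used[id])
            return (uint16_t) id;
    }
    return SKETCH_INVALID_ID;
}
```

A caller would treat SKETCH_INVALID_ID as the "could not find free replication origin ID" case. In the real function the collision test is a catalog scan per candidate id, which is why the source comment calls the approach not particularly efficient but acceptable for an infrequent operation.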
                                336                 : 
                                337                 : /*
  788 akapila                   338 ECB             :  * Helper function to drop a replication origin.
                                339                 :  */
                                340                 : static void
   65 akapila                   341 GNC         221 : replorigin_state_clear(RepOriginId roident, bool nowait)
                                342                 : {
                                343                 :     int         i;
 2902 andres                    344 ECB             : 
 1916 tgl                       345                 :     /*
                                346                 :      * Clean up the slot state info, if there is any matching slot.
                                347                 :      */
 2070 alvherre                  348 GIC         221 : restart:
 2902 andres                    349             221 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
                                350                 : 
 2902 andres                    351 CBC         752 :     for (i = 0; i < max_replication_slots; i++)
                                352                 :     {
 2902 andres                    353 GIC         717 :         ReplicationState *state = &replication_states[i];
                                354                 : 
                                355             717 :         if (state->roident == roident)
                                356                 :         {
                                357                 :             /* found our slot, is it busy? */
 2902 andres                    358 CBC         186 :             if (state->acquired_by != 0)
 2902 andres                    359 ECB             :             {
                                360                 :                 ConditionVariable *cv;
 2070 alvherre                  361                 : 
 2070 alvherre                  362 UIC           0 :                 if (nowait)
 2070 alvherre                  363 LBC           0 :                     ereport(ERROR,
                                364                 :                             (errcode(ERRCODE_OBJECT_IN_USE),
  229 john.naylor               365 ECB             :                              errmsg("could not drop replication origin with ID %d, in use by PID %d",
                                366                 :                                     state->roident,
                                367                 :                                     state->acquired_by)));
 1916 tgl                       368                 : 
                                369                 :                 /*
                                370                 :                  * We must wait and then retry.  Since we don't know which CV
                                371                 :                  * to wait on until here, we can't readily use
 1916 tgl                       372 EUB             :                  * ConditionVariablePrepareToSleep (calling it here would be
                                373                 :                  * wrong, since we could miss the signal if we did so); just
                                374                 :                  * use ConditionVariableSleep directly.
                                375                 :                  */
 2070 alvherre                  376 UIC           0 :                 cv = &state->origin_cv;
                                377                 : 
                                378               0 :                 LWLockRelease(ReplicationOriginLock);
                                379                 : 
                                380               0 :                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
                                381               0 :                 goto restart;
                                382                 :             }
                                383                 : 
                                384                 :             /* first make a WAL log entry */
                                385                 :             {
 2902 andres                    386 EUB             :                 xl_replorigin_drop xlrec;
                                387                 : 
 2902 andres                    388 GBC         186 :                 xlrec.node_id = roident;
 2902 andres                    389 GIC         186 :                 XLogBeginInsert();
 2902 andres                    390 GBC         186 :                 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
                                391             186 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
                                392                 :             }
                                393                 : 
                                394                 :             /* then clear the in-memory slot */
 2902 andres                    395 GIC         186 :             state->roident = InvalidRepOriginId;
                                396             186 :             state->remote_lsn = InvalidXLogRecPtr;
                                397             186 :             state->local_lsn = InvalidXLogRecPtr;
 2902 andres                    398 CBC         186 :             break;
 2902 andres                    399 ECB             :         }
                                400                 :     }
 2902 andres                    401 CBC         221 :     LWLockRelease(ReplicationOriginLock);
 1916 tgl                       402 GIC         221 :     ConditionVariableCancelSleep();
 2902 andres                    403             221 : }
                                404                 : 
                                405                 : /*
                                406                 :  * Drop replication origin (by name).
                                407                 :  *
  788 akapila                   408 ECB             :  * Needs to be called in a transaction.
                                409                 :  */
                                410                 : void
  668 peter                     411 GIC         367 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
                                412                 : {
                                413                 :     RepOriginId roident;
  788 akapila                   414 ECB             :     Relation    rel;
                                415                 :     HeapTuple   tuple;
                                416                 : 
  788 akapila                   417 CBC         367 :     Assert(IsTransactionState());
                                418                 : 
   65 akapila                   419 GNC         367 :     rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
  788 akapila                   420 EUB             : 
  788 akapila                   421 GIC         367 :     roident = replorigin_by_name(name, missing_ok);
                                422                 : 
                                423                 :     /* Lock the origin to prevent concurrent drops. */
   65 akapila                   424 GNC         366 :     LockSharedObject(ReplicationOriginRelationId, roident, 0,
                                425                 :                      AccessExclusiveLock);
                                426                 : 
                                427             366 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
                                428             366 :     if (!HeapTupleIsValid(tuple))
                                429                 :     {
                                430             145 :         if (!missing_ok)
   65 akapila                   431 UNC           0 :             elog(ERROR, "cache lookup failed for replication origin with ID %d",
                                432                 :                  roident);
                                433                 : 
                                434                 :         /*
                                435                 :          * We don't need to retain the locks if the origin is already dropped.
                                436                 :          */
   65 akapila                   437 GNC         145 :         UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
                                438                 :                            AccessExclusiveLock);
                                439             145 :         table_close(rel, RowExclusiveLock);
                                440             145 :         return;
                                441                 :     }
                                442                 : 
                                443             221 :     replorigin_state_clear(roident, nowait);
                                444                 : 
                                445                 :     /*
                                446                 :      * Now, we can delete the catalog entry.
                                447                 :      */
                                448             221 :     CatalogTupleDelete(rel, &tuple->t_self);
                                449             221 :     ReleaseSysCache(tuple);
                                450                 : 
                                451             221 :     CommandCounterIncrement();
                                452                 : 
  788 akapila                   453 ECB             :     /* We keep the lock on pg_replication_origin until commit */
  788 akapila                   454 GIC         221 :     table_close(rel, NoLock);
  788 akapila                   455 ECB             : }
 2902 andres                    456                 : 
                                457                 : /*
                                458                 :  * Look up a replication origin by its OID and return its name.
                                459                 :  *
                                460                 :  * The external name is palloc'd in the calling context.
                                461                 :  *
                                462                 :  * Returns true if the origin is known, false otherwise.
                                463                 :  */
                                464                 : bool
 2902 andres                    465 CBC           9 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
                                466                 : {
 2878 bruce                     467 ECB             :     HeapTuple   tuple;
                                468                 :     Form_pg_replication_origin ric;
                                469                 : 
 2902 andres                    470 CBC           9 :     Assert(OidIsValid((Oid) roident));
 2902 andres                    471 GIC           9 :     Assert(roident != InvalidRepOriginId);
                                472               9 :     Assert(roident != DoNotReplicateId);
                                473                 : 
                                474               9 :     tuple = SearchSysCache1(REPLORIGIDENT,
                                475                 :                             ObjectIdGetDatum((Oid) roident));
                                476                 : 
                                477               9 :     if (HeapTupleIsValid(tuple))
                                478                 :     {
                                479               9 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
                                480               9 :         *roname = text_to_cstring(&ric->roname);
 2902 andres                    481 CBC           9 :         ReleaseSysCache(tuple);
                                482                 : 
 2902 andres                    483 GIC           9 :         return true;
                                484                 :     }
                                485                 :     else
 2902 andres                    486 ECB             :     {
 2902 andres                    487 LBC           0 :         *roname = NULL;
 2902 andres                    488 ECB             : 
 2902 andres                    489 UIC           0 :         if (!missing_ok)
 2012 rhaas                     490 LBC           0 :             ereport(ERROR,
                                491                 :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
                                492                 :                      errmsg("replication origin with ID %d does not exist",
 2012 rhaas                     493 ECB             :                             roident)));
                                494                 : 
 2902 andres                    495 LBC           0 :         return false;
 2902 andres                    496 ECB             :     }
                                497                 : }
                                498                 : 
                                499                 : 
                                500                 : /* ---------------------------------------------------------------------------
                                501                 :  * Functions for handling replication progress.
                                502                 :  * ---------------------------------------------------------------------------
 2902 andres                    503 EUB             :  */
                                504                 : 
                                505                 : Size
 2902 andres                    506 GBC        6390 : ReplicationOriginShmemSize(void)
                                507                 : {
 2902 andres                    508 GIC        6390 :     Size        size = 0;
                                509                 : 
                                510                 :     /*
 2802 andres                    511 EUB             :      * XXX: max_replication_slots is arguably the wrong thing to use, as here
                                512                 :      * we keep the replay state of *remote* transactions. But for now it seems
                                513                 :      * sufficient to reuse it, rather than introduce a separate GUC.
                                514                 :      */
 2902 andres                    515 GIC        6390 :     if (max_replication_slots == 0)
 2902 andres                    516 UIC           0 :         return size;
                                517                 : 
 2902 andres                    518 GIC        6390 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
                                519                 : 
                                520            6390 :     size = add_size(size,
                                521                 :                     mul_size(max_replication_slots, sizeof(ReplicationState)));
 2902 andres                    522 CBC        6390 :     return size;
                                523                 : }
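
The sizing logic in ReplicationOriginShmemSize (header up to the flexible array, plus one element per slot, with overflow-checked arithmetic) can be sketched outside the server as follows. This is a minimal illustration, not the PostgreSQL implementation: SketchState, SketchCtl and sketch_shmem_size are hypothetical names, the struct fields are placeholders, and overflow is reported through a boolean return value rather than through the error reporting that add_size and mul_size perform.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for ReplicationState; fields are illustrative only. */
typedef struct SketchState
{
    uint16_t    roident;
    uint64_t    remote_lsn;
    uint64_t    local_lsn;
} SketchState;

/* Hypothetical stand-in for ReplicationStateCtl: header + flexible array. */
typedef struct SketchCtl
{
    int         tranche_id;
    SketchState states[];
} SketchCtl;

/*
 * Compute the bytes needed for a SketchCtl holding nslots entries.
 * Returns false if the computation would overflow size_t; otherwise
 * stores the result in *size and returns true.  Zero slots need no
 * memory, mirroring the early return in the listing.
 */
bool
sketch_shmem_size(size_t nslots, size_t *size)
{
    size_t      base = offsetof(SketchCtl, states);
    size_t      array_bytes;

    *size = 0;
    if (nslots == 0)
        return true;

    /* overflow-checked multiplication */
    if (nslots > SIZE_MAX / sizeof(SketchState))
        return false;
    array_bytes = nslots * sizeof(SketchState);

    /* overflow-checked addition */
    if (array_bytes > SIZE_MAX - base)
        return false;

    *size = base + array_bytes;
    return true;
}
```

Using offsetof for the header, rather than sizeof on the whole struct, avoids counting any trailing padding that the compiler may place after the flexible array member.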
 2902 andres                    524 ECB             : 
                                525                 : void
 2902 andres                    526 GIC        1826 : ReplicationOriginShmemInit(void)
                                527                 : {
                                528                 :     bool        found;
                                529                 : 
                                530            1826 :     if (max_replication_slots == 0)
 2902 andres                    531 LBC           0 :         return;
 2902 andres                    532 EUB             : 
 2902 andres                    533 GIC        1826 :     replication_states_ctl = (ReplicationStateCtl *)
 2902 andres                    534 CBC        1826 :         ShmemInitStruct("ReplicationOriginState",
                                535                 :                         ReplicationOriginShmemSize(),
 2902 andres                    536 ECB             :                         &found);
 2878 bruce                     537 GIC        1826 :     replication_states = replication_states_ctl->states;
 2902 andres                    538 ECB             : 
 2902 andres                    539 GIC        1826 :     if (!found)
                                540                 :     {
                                541                 :         int         i;
 2902 andres                    542 ECB             : 
 1059 tgl                       543 GIC      130639 :         MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
                                544                 : 
                                545            1826 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
 2902 andres                    546 ECB             : 
 2902 andres                    547 GBC       19967 :         for (i = 0; i < max_replication_slots; i++)
                                548                 :         {
 2902 andres                    549 CBC       18141 :             LWLockInitialize(&replication_states[i].lock,
                                550           18141 :                              replication_states_ctl->tranche_id);
 2070 alvherre                  551 GIC       18141 :             ConditionVariableInit(&replication_states[i].origin_cv);
                                552                 :         }
 2902 andres                    553 ECB             :     }
                                554                 : }
                                555                 : 
                                556                 : /* ---------------------------------------------------------------------------
                                557                 :  * Perform a checkpoint of each replication origin's progress with respect to
                                558                 :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
                                559                 :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
                                560                 :  * if the transactions were originally committed asynchronously.
                                561                 :  *
                                562                 :  * We store checkpoints in the following format:
                                563                 :  * +-------+------------------------+------------------+-----+--------+
                                564                 :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
                                565                 :  * +-------+------------------------+------------------+-----+--------+
                                566                 :  *
                                567                 :  * So it's just the magic, followed by the statically sized
                                568                 :  * ReplicationStateOnDisk structs. Note that the maximum number of
                                569                 :  * ReplicationState is determined by max_replication_slots.
                                570                 :  * ---------------------------------------------------------------------------
                                571                 :  */
                                572                 : void
 2902 andres                    573 GIC        2363 : CheckPointReplicationOrigin(void)
                                574                 : {
                                575            2363 :     const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
                                576            2363 :     const char *path = "pg_logical/replorigin_checkpoint";
                                577                 :     int         tmpfd;
                                578                 :     int         i;
                                579            2363 :     uint32      magic = REPLICATION_STATE_MAGIC;
                                580                 :     pg_crc32c   crc;
                                581                 : 
                                582            2363 :     if (max_replication_slots == 0)
 2902 andres                    583 UIC           0 :         return;
                                584                 : 
 2902 andres                    585 GIC        2363 :     INIT_CRC32C(crc);
                                586                 : 
                                587                 :     /* make sure no old temp file is remaining */
                                588            2363 :     if (unlink(tmppath) < 0 && errno != ENOENT)
 2902 andres                    589 LBC           0 :         ereport(PANIC,
                                590                 :                 (errcode_for_file_access(),
 2902 andres                    591 ECB             :                  errmsg("could not remove file \"%s\": %m",
 2802                           592                 :                         tmppath)));
                                593                 : 
                                594                 :     /*
  697 tgl                       595                 :      * no other backend can perform this at the same time; only one checkpoint
                                596                 :      * can happen at a time.
                                597                 :      */
 2024 peter_e                   598 CBC        2363 :     tmpfd = OpenTransientFile(tmppath,
 2024 peter_e                   599 EUB             :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
 2902 andres                    600 GIC        2363 :     if (tmpfd < 0)
 2902 andres                    601 LBC           0 :         ereport(PANIC,
                                602                 :                 (errcode_for_file_access(),
                                603                 :                  errmsg("could not create file \"%s\": %m",
 2902 andres                    604 ECB             :                         tmppath)));
 2902 andres                    605 EUB             : 
                                606                 :     /* write magic */
 1708 michael                   607 GIC        2363 :     errno = 0;
 2902 andres                    608            2363 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
                                609                 :     {
                                610                 :         /* if write didn't set errno, assume problem is no disk space */
 1453 michael                   611 UIC           0 :         if (errno == 0)
                                612               0 :             errno = ENOSPC;
 2902 andres                    613               0 :         ereport(PANIC,
 2902 andres                    614 ECB             :                 (errcode_for_file_access(),
                                615                 :                  errmsg("could not write to file \"%s\": %m",
                                616                 :                         tmppath)));
 2902 andres                    617 EUB             :     }
 2902 andres                    618 GIC        2363 :     COMP_CRC32C(crc, &magic, sizeof(magic));
                                619                 : 
                                620                 :     /* prevent concurrent creations/drops */
                                621            2363 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
                                622                 : 
 2902 andres                    623 ECB             :     /* write actual data */
 2902 andres                    624 CBC       25839 :     for (i = 0; i < max_replication_slots; i++)
                                625                 :     {
                                626                 :         ReplicationStateOnDisk disk_state;
 2902 andres                    627 GBC       23476 :         ReplicationState *curstate = &replication_states[i];
 2878 bruce                     628 EUB             :         XLogRecPtr  local_lsn;
 2902 andres                    629                 : 
 2902 andres                    630 GIC       23476 :         if (curstate->roident == InvalidRepOriginId)
                                631           23449 :             continue;
                                632                 : 
                                633                 :         /* zero, to avoid uninitialized padding bytes */
 2177 andres                    634 CBC          27 :         memset(&disk_state, 0, sizeof(disk_state));
                                635                 : 
 2902 andres                    636 GIC          27 :         LWLockAcquire(&curstate->lock, LW_SHARED);
 2902 andres                    637 ECB             : 
 2902 andres                    638 GIC          27 :         disk_state.roident = curstate->roident;
                                639                 : 
 2902 andres                    640 CBC          27 :         disk_state.remote_lsn = curstate->remote_lsn;
 2902 andres                    641 GIC          27 :         local_lsn = curstate->local_lsn;
                                642                 : 
 2902 andres                    643 CBC          27 :         LWLockRelease(&curstate->lock);
                                644                 : 
                                645                 :         /* make sure we only write out a commit that's persistent */
                                646              27 :         XLogFlush(local_lsn);
 2902 andres                    647 ECB             : 
 1708 michael                   648 GIC          27 :         errno = 0;
 2902 andres                    649              27 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
 2902 andres                    650 ECB             :             sizeof(disk_state))
                                651                 :         {
 1749 michael                   652                 :             /* if write didn't set errno, assume problem is no disk space */
 1453 michael                   653 UIC           0 :             if (errno == 0)
 1453 michael                   654 LBC           0 :                 errno = ENOSPC;
 2902 andres                    655 UIC           0 :             ereport(PANIC,
 2902 andres                    656 ECB             :                     (errcode_for_file_access(),
                                657                 :                      errmsg("could not write to file \"%s\": %m",
                                658                 :                             tmppath)));
                                659                 :         }
                                660                 : 
 2902 andres                    661 GIC          27 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
 2902 andres                    662 ECB             :     }
                                663                 : 
 2902 andres                    664 CBC        2363 :     LWLockRelease(ReplicationOriginLock);
 2902 andres                    665 ECB             : 
                                666                 :     /* write out the CRC */
 2902 andres                    667 GIC        2363 :     FIN_CRC32C(crc);
 1708 michael                   668            2363 :     errno = 0;
 2902 andres                    669 GBC        2363 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
 2902 andres                    670 EUB             :     {
 1749 michael                   671                 :         /* if write didn't set errno, assume problem is no disk space */
 1453 michael                   672 UIC           0 :         if (errno == 0)
                                673               0 :             errno = ENOSPC;
 2902 andres                    674               0 :         ereport(PANIC,
                                675                 :                 (errcode_for_file_access(),
                                676                 :                  errmsg("could not write to file \"%s\": %m",
 2902 andres                    677 ECB             :                         tmppath)));
                                678                 :     }
                                679                 : 
 1373 peter                     680 CBC        2363 :     if (CloseTransientFile(tmpfd) != 0)
 1492 michael                   681 UIC           0 :         ereport(PANIC,
                                682                 :                 (errcode_for_file_access(),
 1492 michael                   683 ECB             :                  errmsg("could not close file \"%s\": %m",
                                684                 :                         tmppath)));
 2902 andres                    685                 : 
                                686                 :     /* fsync, rename to permanent file, fsync file and directory */
 2587 andres                    687 GIC        2363 :     durable_rename(tmppath, path, PANIC);
 2902 andres                    688 EUB             : }
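
The write path of CheckPointReplicationOrigin (magic, then one fixed-size record per active origin, then a CRC32C over everything written so far, then a rename into place) can be approximated by the standalone sketch below. It rests on several assumptions: SKETCH_MAGIC is a placeholder value not taken from the listing, SketchStateOnDisk is a hypothetical stand-in for ReplicationStateOnDisk, the CRC is a plain bitwise CRC-32C rather than the server's optimized macros, and stdio is used in place of OpenTransientFile. The sketch also omits the locking, the XLogFlush call, and the fsync steps that durable_rename performs.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define SKETCH_MAGIC 0x12345678u    /* placeholder, not the server's magic */

/* Hypothetical stand-in for ReplicationStateOnDisk. */
typedef struct SketchStateOnDisk
{
    uint16_t    roident;        /* 0 means "slot unused" in this sketch */
    uint64_t    remote_lsn;
} SketchStateOnDisk;

/* Bitwise CRC-32C (Castagnoli, reflected polynomial 0x82F63B78). */
uint32_t
sketch_crc32c(uint32_t crc, const void *data, size_t len)
{
    const unsigned char *p = data;

    while (len--)
    {
        crc ^= *p++;
        for (int k = 0; k < 8; k++)
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
    }
    return crc;
}

/* Write magic, active records and CRC to tmppath, then rename to path. */
int
sketch_write_checkpoint(const char *tmppath, const char *path,
                        const SketchStateOnDisk *states, size_t nstates)
{
    uint32_t    magic = SKETCH_MAGIC;
    uint32_t    crc = 0xFFFFFFFFu;  /* CRC-32C initial value */
    FILE       *fp;

    assert(tmppath != NULL && path != NULL);

    fp = fopen(tmppath, "wb");
    if (fp == NULL)
        return -1;

    if (fwrite(&magic, sizeof(magic), 1, fp) != 1)
        goto fail;
    crc = sketch_crc32c(crc, &magic, sizeof(magic));

    for (size_t i = 0; i < nstates; i++)
    {
        SketchStateOnDisk rec;

        if (states[i].roident == 0)
            continue;           /* skip unused slots */

        /* zero first so padding bytes are not left uninitialized */
        memset(&rec, 0, sizeof(rec));
        rec.roident = states[i].roident;
        rec.remote_lsn = states[i].remote_lsn;

        if (fwrite(&rec, sizeof(rec), 1, fp) != 1)
            goto fail;
        crc = sketch_crc32c(crc, &rec, sizeof(rec));
    }

    crc ^= 0xFFFFFFFFu;         /* finalize the CRC */
    if (fwrite(&crc, sizeof(crc), 1, fp) != 1)
        goto fail;

    if (fclose(fp) != 0)
        return -1;
    return rename(tmppath, path);

fail:
    fclose(fp);
    remove(tmppath);
    return -1;
}
```

Writing to a temporary name and renaming afterwards means a reader sees either the previous complete file or the new complete file, never a partially written one. The real function additionally treats every failure as PANIC instead of returning an error code.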
                                689                 : 
                                690                 : /*
                                691                 :  * Recover replication replay status from checkpoint data saved earlier by
                                692                 :  * CheckPointReplicationOrigin.
                                693                 :  *
                                694                 :  * This only needs to be called at startup and *not* during every checkpoint
                                695                 :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
 2902 andres                    696 ECB             :  * state thereafter can be recovered by looking at commit records.
 2902 andres                    697 EUB             :  */
                                698                 : void
 2902 andres                    699 GIC        1176 : StartupReplicationOrigin(void)
                                700                 : {
                                701            1176 :     const char *path = "pg_logical/replorigin_checkpoint";
                                702                 :     int         fd;
 2878 bruce                     703 ECB             :     int         readBytes;
 2878 bruce                     704 GIC        1176 :     uint32      magic = REPLICATION_STATE_MAGIC;
                                705            1176 :     int         last_state = 0;
                                706                 :     pg_crc32c   file_crc;
                                707                 :     pg_crc32c   crc;
                                708                 : 
                                709                 :     /* don't want to overwrite already existing state */
                                710                 : #ifdef USE_ASSERT_CHECKING
                                711                 :     static bool already_started = false;
                                712                 : 
 2902 andres                    713            1176 :     Assert(!already_started);
                                714            1176 :     already_started = true;
 2902 andres                    715 ECB             : #endif
                                716                 : 
 2902 andres                    717 CBC        1176 :     if (max_replication_slots == 0)
 2902 andres                    718 GIC         305 :         return;
                                719                 : 
 2902 andres                    720 CBC        1176 :     INIT_CRC32C(crc);
 2902 andres                    721 ECB             : 
 2902 andres                    722 GIC        1176 :     elog(DEBUG2, "starting up replication origin progress state");
                                723                 : 
 2024 peter_e                   724            1176 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
                                725                 : 
                                726                 :     /*
                                727                 :      * might have had max_replication_slots == 0 last run, or we just brought
                                728                 :      * up a standby.
 2902 andres                    729 ECB             :      */
 2902 andres                    730 CBC        1176 :     if (fd < 0 && errno == ENOENT)
 2902 andres                    731 GIC         305 :         return;
                                732             871 :     else if (fd < 0)
 2902 andres                    733 LBC           0 :         ereport(PANIC,
 2902 andres                    734 ECB             :                 (errcode_for_file_access(),
                                735                 :                  errmsg("could not open file \"%s\": %m",
                                736                 :                         path)));
                                737                 : 
 2624 magnus                    738                 :     /* verify magic, that is written even if nothing was active */
 2902 andres                    739 GIC         871 :     readBytes = read(fd, &magic, sizeof(magic));
 2902 andres                    740 CBC         871 :     if (readBytes != sizeof(magic))
                                741                 :     {
 1726 michael                   742 UIC           0 :         if (readBytes < 0)
                                743               0 :             ereport(PANIC,
                                744                 :                     (errcode_for_file_access(),
                                745                 :                      errmsg("could not read file \"%s\": %m",
 1726 michael                   746 ECB             :                             path)));
                                747                 :         else
 1726 michael                   748 LBC           0 :             ereport(PANIC,
 1721 michael                   749 EUB             :                     (errcode(ERRCODE_DATA_CORRUPTED),
                                750                 :                      errmsg("could not read file \"%s\": read %d of %zu",
                                751                 :                             path, readBytes, sizeof(magic))));
                                752                 :     }
 2902 andres                    753 GIC         871 :     COMP_CRC32C(crc, &magic, sizeof(magic));
                                754                 : 
 2902 andres                    755 CBC         871 :     if (magic != REPLICATION_STATE_MAGIC)
 2902 andres                    756 LBC           0 :         ereport(PANIC,
                                757                 :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
 2118 tgl                       758 EUB             :                         magic, REPLICATION_STATE_MAGIC)));
 2902 andres                    759                 : 
                                760                 :     /* we can skip locking here; no other access is possible */
                                761                 : 
                                762                 :     /* recover individual states, until there are no more to be found */
                                763                 :     while (true)
 2902 andres                    764 GBC           3 :     {
                                765                 :         ReplicationStateOnDisk disk_state;
                                766                 : 
 2902 andres                    767 GIC         874 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
                                768                 : 
 2902 andres                    769 ECB             :         /* no further data */
 2902 andres                    770 GIC         874 :         if (readBytes == sizeof(crc))
 2902 andres                    771 ECB             :         {
 2902 andres                    772 EUB             :             /* not pretty, but simple ... */
 2878 bruce                     773 GIC         871 :             file_crc = *(pg_crc32c *) &disk_state;
 2902 andres                    774             871 :             break;
                                775                 :         }
                                776                 : 
                                777               3 :         if (readBytes < 0)
                                778                 :         {
 2902 andres                    779 UIC           0 :             ereport(PANIC,
 2902 andres                    780 ECB             :                     (errcode_for_file_access(),
                                781                 :                      errmsg("could not read file \"%s\": %m",
                                782                 :                             path)));
                                783                 :         }
                                784                 : 
 2902 andres                    785 GIC           3 :         if (readBytes != sizeof(disk_state))
 2902 andres                    786 ECB             :         {
 2902 andres                    787 UIC           0 :             ereport(PANIC,
                                788                 :                     (errcode_for_file_access(),
 2902 andres                    789 ECB             :                      errmsg("could not read file \"%s\": read %d of %zu",
                                790                 :                             path, readBytes, sizeof(disk_state))));
                                791                 :         }
                                792                 : 
 2902 andres                    793 CBC           3 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
                                794                 : 
 2902 andres                    795 GBC           3 :         if (last_state == max_replication_slots)
 2902 andres                    796 UIC           0 :             ereport(PANIC,
                                797                 :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                798                 :                      errmsg("could not find free replication state, increase max_replication_slots")));
                                799                 : 
                                800                 :         /* copy data to shared memory */
 2902 andres                    801 CBC           3 :         replication_states[last_state].roident = disk_state.roident;
 2902 andres                    802 GIC           3 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
 2902 andres                    803 GBC           3 :         last_state++;
                                804                 : 
  856 peter                     805 GIC           3 :         ereport(LOG,
                                806                 :                 (errmsg("recovered replication state of node %d to %X/%X",
                                807                 :                         disk_state.roident,
                                808                 :                         LSN_FORMAT_ARGS(disk_state.remote_lsn))));
 2902 andres                    809 ECB             :     }
                                810                 : 
                                811                 :     /* now check checksum */
 2902 andres                    812 GBC         871 :     FIN_CRC32C(crc);
 2902 andres                    813 GIC         871 :     if (file_crc != crc)
 2902 andres                    814 UIC           0 :         ereport(PANIC,
                                815                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
                                816                 :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
 2902 andres                    817 ECB             :                         crc, file_crc)));
                                818                 : 
 1373 peter                     819 CBC         871 :     if (CloseTransientFile(fd) != 0)
 1492 michael                   820 UIC           0 :         ereport(PANIC,
 1492 michael                   821 ECB             :                 (errcode_for_file_access(),
                                822                 :                  errmsg("could not close file \"%s\": %m",
                                823                 :                         path)));
                                824                 : }
                                825                 : 
                                826                 : void
 2902 andres                    827 GIC           6 : replorigin_redo(XLogReaderState *record)
 2902 andres                    828 ECB             : {
 2902 andres                    829 CBC           6 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 2902 andres                    830 EUB             : 
 2902 andres                    831 GIC           6 :     switch (info)
                                832                 :     {
                                833               3 :         case XLOG_REPLORIGIN_SET:
                                834                 :             {
 2902 andres                    835 CBC           3 :                 xl_replorigin_set *xlrec =
 2878 bruce                     836 GBC           3 :                 (xl_replorigin_set *) XLogRecGetData(record);
                                837                 : 
 2902 andres                    838 GIC           3 :                 replorigin_advance(xlrec->node_id,
                                839                 :                                    xlrec->remote_lsn, record->EndRecPtr,
 2878 bruce                     840               3 :                                    xlrec->force /* backward */ ,
                                841                 :                                    false /* WAL log */ );
 2902 andres                    842               3 :                 break;
 2902 andres                    843 ECB             :             }
 2902 andres                    844 GIC           3 :         case XLOG_REPLORIGIN_DROP:
 2902 andres                    845 ECB             :             {
                                846                 :                 xl_replorigin_drop *xlrec;
 2878 bruce                     847                 :                 int         i;
                                848                 : 
 2902 andres                    849 CBC           3 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
                                850                 : 
                                851               4 :                 for (i = 0; i < max_replication_slots; i++)
 2902 andres                    852 ECB             :                 {
 2902 andres                    853 GIC           4 :                     ReplicationState *state = &replication_states[i];
 2902 andres                    854 ECB             : 
                                855                 :                     /* found our slot */
 2902 andres                    856 CBC           4 :                     if (state->roident == xlrec->node_id)
                                857                 :                     {
 2902 andres                    858 ECB             :                         /* reset entry */
 2902 andres                    859 GIC           3 :                         state->roident = InvalidRepOriginId;
 2902 andres                    860 CBC           3 :                         state->remote_lsn = InvalidXLogRecPtr;
 2902 andres                    861 GIC           3 :                         state->local_lsn = InvalidXLogRecPtr;
                                862               3 :                         break;
                                863                 :                     }
                                864                 :                 }
 2902 andres                    865 CBC           3 :                 break;
                                866                 :             }
 2902 andres                    867 LBC           0 :         default:
 2902 andres                    868 UIC           0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
 2902 andres                    869 ECB             :     }
 2902 andres                    870 GIC           6 : }
                                871                 : 
 2902 andres                    872 ECB             : 
                                873                 : /*
                                874                 :  * Tell the replication origin progress machinery that a commit from 'node'
                                875                 :  * that originated at the LSN remote_commit on the remote node was replayed
                                876                 :  * successfully and that we don't need to do so again. In combination with
 2750 alvherre                  877                 :  * setting up replorigin_session_origin_lsn and replorigin_session_origin
  824 akapila                   878                 :  * that ensures we won't lose knowledge about that after a crash if the
                                879                 :  * transaction had a persistent effect (think of asynchronous commits).
                                880                 :  *
 2902 andres                    881                 :  * local_commit needs to be a local LSN of the commit so that we can make sure
                                882                 :  * upon a checkpoint that enough WAL has been persisted to disk.
 2902 andres                    883 EUB             :  *
                                884                 :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
                                885                 :  * unless running in recovery.
 2902 andres                    886 ECB             :  */
                                887                 : void
 2902 andres                    888 GIC         200 : replorigin_advance(RepOriginId node,
                                889                 :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
                                890                 :                    bool go_backward, bool wal_log)
                                891                 : {
                                892                 :     int         i;
                                893             200 :     ReplicationState *replication_state = NULL;
                                894             200 :     ReplicationState *free_state = NULL;
                                895                 : 
                                896             200 :     Assert(node != InvalidRepOriginId);
                                897                 : 
                                898                 :     /* we don't track DoNotReplicateId */
                                899             200 :     if (node == DoNotReplicateId)
 2902 andres                    900 UIC           0 :         return;
                                901                 : 
                                902                 :     /*
                                903                 :      * XXX: For the case where this is called by WAL replay, it'd be more
 2902 andres                    904 ECB             :      * efficient to restore into a backend local hashtable and only dump into
                                905                 :      * shmem after recovery is finished. Let's hold off on implementing that
                                906                 :      * until it's shown to be a measurable expense.
                                907                 :      */
                                908                 : 
                                909                 :     /* Lock exclusively, as we may have to create a new table entry. */
 2902 andres                    910 CBC         200 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
                                911                 : 
 2902 andres                    912 ECB             :     /*
                                913                 :      * Search for either an existing slot for the origin, or a free one we can
                                914                 :      * use.
                                915                 :      */
 2902 andres                    916 GBC        1749 :     for (i = 0; i < max_replication_slots; i++)
                                917                 :     {
 2902 andres                    918 GIC        1595 :         ReplicationState *curstate = &replication_states[i];
                                919                 : 
                                920                 :         /* remember where to insert if necessary */
                                921            1595 :         if (curstate->roident == InvalidRepOriginId &&
                                922                 :             free_state == NULL)
                                923                 :         {
                                924             156 :             free_state = curstate;
                                925             156 :             continue;
 2902 andres                    926 ECB             :         }
                                927                 : 
                                928                 :         /* not our slot */
 2902 andres                    929 GIC        1439 :         if (curstate->roident != node)
                                930                 :         {
                                931            1393 :             continue;
 2902 andres                    932 ECB             :         }
                                933                 : 
                                934                 :         /* ok, found slot */
 2902 andres                    935 GIC          46 :         replication_state = curstate;
                                936                 : 
 2902 andres                    937 CBC          46 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
                                938                 : 
                                939                 :         /* Make sure it's not used by somebody else */
                                940              46 :         if (replication_state->acquired_by != 0)
 2902 andres                    941 ECB             :         {
 2902 andres                    942 UIC           0 :             ereport(ERROR,
                                943                 :                     (errcode(ERRCODE_OBJECT_IN_USE),
                                944                 :                      errmsg("replication origin with ID %d is already active for PID %d",
 2902 andres                    945 ECB             :                             replication_state->roident,
                                946                 :                             replication_state->acquired_by)));
                                947                 :         }
                                948                 : 
 2902 andres                    949 GIC          46 :         break;
                                950                 :     }
 2902 andres                    951 ECB             : 
 2902 andres                    952 GIC         200 :     if (replication_state == NULL && free_state == NULL)
 2902 andres                    953 LBC           0 :         ereport(ERROR,
                                954                 :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                955                 :                  errmsg("could not find free replication state slot for replication origin with ID %d",
 2902 andres                    956 ECB             :                         node),
                                957                 :                  errhint("Increase max_replication_slots and try again.")));
 2902 andres                    958 EUB             : 
 2902 andres                    959 GIC         200 :     if (replication_state == NULL)
                                960                 :     {
                                961                 :         /* initialize new slot */
                                962             154 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
                                963             154 :         replication_state = free_state;
                                964             154 :         Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
 2902 andres                    965 CBC         154 :         Assert(replication_state->local_lsn == InvalidXLogRecPtr);
 2902 andres                    966 GIC         154 :         replication_state->roident = node;
                                967                 :     }
 2902 andres                    968 ECB             : 
 2902 andres                    969 GBC         200 :     Assert(replication_state->roident != InvalidRepOriginId);
                                970                 : 
                                971                 :     /*
                                972                 :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
                                973                 :      * and the standby gets the message. Primarily this will be called during
                                974                 :      * WAL replay (of commit records) where no WAL logging is necessary.
 2902 andres                    975 ECB             :      */
 2902 andres                    976 GIC         200 :     if (wal_log)
                                977                 :     {
 2902 andres                    978 ECB             :         xl_replorigin_set xlrec;
 2878 bruce                     979                 : 
 2902 andres                    980 CBC         155 :         xlrec.remote_lsn = remote_commit;
                                981             155 :         xlrec.node_id = node;
                                982             155 :         xlrec.force = go_backward;
                                983                 : 
 2902 andres                    984 GIC         155 :         XLogBeginInsert();
 2902 andres                    985 CBC         155 :         XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
                                986                 : 
 2902 andres                    987 GIC         155 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
                                988                 :     }
                                989                 : 
                                990                 :     /*
                                991                 :      * Due to - harmless - race conditions during a checkpoint we could see
  634 akapila                   992 ECB             :      * values here that are older than the ones we already have in memory. We
                                993                 :      * could also see older values for prepared transactions when the prepare
                                994                 :      * is sent at a later point of time along with commit prepared and there
                                995                 :      * are other transactions' commits between prepare and commit prepared. See
                                996                 :      * ReorderBufferFinishPrepared. Don't overwrite those.
 2902 andres                    997                 :      */
 2902 andres                    998 CBC         200 :     if (go_backward || replication_state->remote_lsn < remote_commit)
 2902 andres                    999 GIC         189 :         replication_state->remote_lsn = remote_commit;
 2902 andres                   1000 CBC         200 :     if (local_commit != InvalidXLogRecPtr &&
                               1001              42 :         (go_backward || replication_state->local_lsn < local_commit))
 2902 andres                   1002 GIC          45 :         replication_state->local_lsn = local_commit;
 2902 andres                   1003 CBC         200 :     LWLockRelease(&replication_state->lock);
                               1004                 : 
                               1005                 :     /*
                               1006                 :      * Release *after* changing the LSNs; the slot isn't acquired and thus
                               1007                 :      * could otherwise be dropped at any time.
                               1008                 :      */
 2902 andres                   1009 GIC         200 :     LWLockRelease(ReplicationOriginLock);
                               1010                 : }
                               1011                 : 
                               1012                 : 
                               1013                 : XLogRecPtr
 2902 andres                   1014 CBC           8 : replorigin_get_progress(RepOriginId node, bool flush)
 2902 andres                   1015 ECB             : {
                               1016                 :     int         i;
 2902 andres                   1017 CBC           8 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
                               1018               8 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
 2902 andres                   1019 ECB             : 
                               1020                 :     /* prevent slots from being concurrently dropped */
 2902 andres                   1021 GIC           8 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
                               1022                 : 
                               1023              38 :     for (i = 0; i < max_replication_slots; i++)
                               1024                 :     {
 2902 andres                   1025 ECB             :         ReplicationState *state;
                               1026                 : 
 2902 andres                   1027 GIC          35 :         state = &replication_states[i];
                               1028                 : 
                               1029              35 :         if (state->roident == node)
 2902 andres                   1030 ECB             :         {
 2902 andres                   1031 GIC           5 :             LWLockAcquire(&state->lock, LW_SHARED);
                               1032                 : 
 2902 andres                   1033 CBC           5 :             remote_lsn = state->remote_lsn;
                               1034               5 :             local_lsn = state->local_lsn;
                               1035                 : 
 2902 andres                   1036 GIC           5 :             LWLockRelease(&state->lock);
 2902 andres                   1037 ECB             : 
 2902 andres                   1038 GIC           5 :             break;
 2902 andres                   1039 ECB             :         }
                               1040                 :     }
                               1041                 : 
 2902 andres                   1042 GIC           8 :     LWLockRelease(ReplicationOriginLock);
 2902 andres                   1043 ECB             : 
 2902 andres                   1044 GIC           8 :     if (flush && local_lsn != InvalidXLogRecPtr)
 2902 andres                   1045 CBC           1 :         XLogFlush(local_lsn);
                               1046                 : 
                               1047               8 :     return remote_lsn;
                               1048                 : }
 2902 andres                   1049 ECB             : 
                               1050                 : /*
                               1051                 :  * Tear down a (possibly) configured session replication origin during process
                               1052                 :  * exit.
                               1053                 :  */
                               1054                 : static void
 2902 andres                   1055 GIC         314 : ReplicationOriginExitCleanup(int code, Datum arg)
                               1056                 : {
 2064 tgl                      1057             314 :     ConditionVariable *cv = NULL;
 2070 alvherre                 1058 ECB             : 
 2902 andres                   1059 GIC         314 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 2902 andres                   1060 ECB             : 
 2902 andres                   1061 CBC         314 :     if (session_replication_state != NULL &&
 2902 andres                   1062 GIC         164 :         session_replication_state->acquired_by == MyProcPid)
 2902 andres                   1063 ECB             :     {
 2070 alvherre                 1064 GIC         154 :         cv = &session_replication_state->origin_cv;
                               1065                 : 
 2902 andres                   1066             154 :         session_replication_state->acquired_by = 0;
                               1067             154 :         session_replication_state = NULL;
                               1068                 :     }
                               1069                 : 
                               1070             314 :     LWLockRelease(ReplicationOriginLock);
 2070 alvherre                 1071 ECB             : 
 2070 alvherre                 1072 GIC         314 :     if (cv)
 2070 alvherre                 1073 CBC         154 :         ConditionVariableBroadcast(cv);
 2902 andres                   1074 GIC         314 : }
 2902 andres                   1075 ECB             : 
                               1076                 : /*
                               1077                 :  * Set up a replication origin in the shared memory struct if it doesn't
  338 michael                  1078                 :  * already exist and cache access to the specific ReplicationState so the
                               1079                 :  * array doesn't have to be searched when calling
 2902 andres                   1080                 :  * replorigin_session_advance().
                               1081                 :  *
                               1082                 :  * Normally only one such cached origin can exist per process so the cached
                               1083                 :  * value can only be set again after the previous value is torn down with
                               1084                 :  * replorigin_session_reset(). For this normal case pass acquired_by = 0
                               1085                 :  * (meaning the slot is not allowed to be already acquired by another process).
                               1086                 :  *
                               1087                 :  * However, sometimes multiple processes can safely re-use the same origin slot
                               1088                 :  * (for example, multiple parallel apply processes can safely use the same
                               1089                 :  * origin, provided they maintain commit order by allowing only one process to
                               1090                 :  * commit at a time). For this case the first process must pass acquired_by =
                               1091                 :  * 0, and then the other processes sharing that same origin can pass
                               1092                 :  * acquired_by = PID of the first process.
                               1093                 :  */
                               1094                 : void
   90 akapila                  1095 GNC         316 : replorigin_session_setup(RepOriginId node, int acquired_by)
 2902 andres                   1096 ECB             : {
                               1097                 :     static bool registered_cleanup;
 2878 bruce                    1098                 :     int         i;
 2878 bruce                    1099 GIC         316 :     int         free_slot = -1;
                               1100                 : 
 2902 andres                   1101             316 :     if (!registered_cleanup)
                               1102                 :     {
                               1103             314 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
                               1104             314 :         registered_cleanup = true;
                               1105                 :     }
                               1106                 : 
                               1107             316 :     Assert(max_replication_slots > 0);
                               1108                 : 
                               1109             316 :     if (session_replication_state != NULL)
                               1110               1 :         ereport(ERROR,
                               1111                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1112                 :                  errmsg("cannot setup replication origin when one is already setup")));
                               1113                 : 
                               1114                 :     /* Lock exclusively, as we may have to create a new table entry. */
                               1115             315 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
                               1116                 : 
                               1117                 :     /*
                               1118                 :      * Search for either an existing slot for the origin, or a free one we can
 2902 andres                   1119 ECB             :      * use.
                               1120                 :      */
 2902 andres                   1121 GIC        3453 :     for (i = 0; i < max_replication_slots; i++)
                               1122                 :     {
 2902 andres                   1123 CBC        3138 :         ReplicationState *curstate = &replication_states[i];
                               1124                 : 
 2902 andres                   1125 ECB             :         /* remember where to insert if necessary */
 2902 andres                   1126 GIC        3138 :         if (curstate->roident == InvalidRepOriginId &&
 2902 andres                   1127 ECB             :             free_slot == -1)
                               1128                 :         {
 2902 andres                   1129 GIC         315 :             free_slot = i;
                               1130             315 :             continue;
 2902 andres                   1131 ECB             :         }
                               1132                 : 
                               1133                 :         /* not our slot */
 2902 andres                   1134 CBC        2823 :         if (curstate->roident != node)
 2902 andres                   1135 GIC        2579 :             continue;
                               1136                 : 
   90 akapila                  1137 GNC         244 :         else if (curstate->acquired_by != 0 && acquired_by == 0)
                               1138                 :         {
 2902 andres                   1139 LBC           0 :             ereport(ERROR,
                               1140                 :                     (errcode(ERRCODE_OBJECT_IN_USE),
                               1141                 :                      errmsg("replication origin with ID %d is already active for PID %d",
                               1142                 :                             curstate->roident, curstate->acquired_by)));
                               1143                 :         }
                               1144                 : 
 2902 andres                   1145 ECB             :         /* ok, found slot */
 2902 andres                   1146 GIC         244 :         session_replication_state = curstate;
 2902 andres                   1147 ECB             :     }
                               1148                 : 
                               1149                 : 
 2902 andres                   1150 CBC         315 :     if (session_replication_state == NULL && free_slot == -1)
 2902 andres                   1151 UIC           0 :         ereport(ERROR,
                               1152                 :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
  229 john.naylor              1153 ECB             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
 2902 andres                   1154                 :                         node),
                               1155                 :                  errhint("Increase max_replication_slots and try again.")));
 2902 andres                   1156 GIC         315 :     else if (session_replication_state == NULL)
                               1157                 :     {
 2902 andres                   1158 ECB             :         /* initialize new slot */
 2902 andres                   1159 CBC          71 :         session_replication_state = &replication_states[free_slot];
 2902 andres                   1160 GIC          71 :         Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
 2902 andres                   1161 CBC          71 :         Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
 2902 andres                   1162 GIC          71 :         session_replication_state->roident = node;
 2902 andres                   1163 EUB             :     }
                               1164                 : 
                               1165                 : 
 2902 andres                   1166 GIC         315 :     Assert(session_replication_state->roident != InvalidRepOriginId);
                               1167                 : 
   90 akapila                  1168 GNC         315 :     if (acquired_by == 0)
                               1169             305 :         session_replication_state->acquired_by = MyProcPid;
                               1170              10 :     else if (session_replication_state->acquired_by != acquired_by)
   90 akapila                  1171 UNC           0 :         elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
                               1172                 :              node, acquired_by);
                               1173                 : 
 2902 andres                   1174 CBC         315 :     LWLockRelease(ReplicationOriginLock);
                               1175                 : 
                               1176                 :     /* probably this one is pointless */
 2070 alvherre                 1177 GIC         315 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
 2902 andres                   1178 CBC         315 : }
 2902 andres                   1179 EUB             : 
                               1180                 : /*
                               1181                 :  * Reset replay state previously set up in this session.
                               1182                 :  *
                               1183                 :  * This function may only be called if an origin was set up with
 2902 andres                   1184 ECB             :  * replorigin_session_setup().
                               1185                 :  */
                               1186                 : void
 2902 andres                   1187 CBC         152 : replorigin_session_reset(void)
 2902 andres                   1188 ECB             : {
 2064 tgl                      1189                 :     ConditionVariable *cv;
 2070 alvherre                 1190                 : 
 2902 andres                   1191 GIC         152 :     Assert(max_replication_slots != 0);
                               1192                 : 
                               1193             152 :     if (session_replication_state == NULL)
 2902 andres                   1194 CBC           1 :         ereport(ERROR,
                               1195                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 2902 andres                   1196 ECB             :                  errmsg("no replication origin is configured")));
                               1197                 : 
 2902 andres                   1198 CBC         151 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 2902 andres                   1199 EUB             : 
 2902 andres                   1200 GIC         151 :     session_replication_state->acquired_by = 0;
 2070 alvherre                 1201             151 :     cv = &session_replication_state->origin_cv;
 2902 andres                   1202 CBC         151 :     session_replication_state = NULL;
                               1203                 : 
 2902 andres                   1204 GIC         151 :     LWLockRelease(ReplicationOriginLock);
 2070 alvherre                 1205 ECB             : 
 2070 alvherre                 1206 CBC         151 :     ConditionVariableBroadcast(cv);
 2902 andres                   1207 GIC         151 : }
                               1208                 : 
                               1209                 : /*
                               1210                 :  * Do the same work replorigin_advance() does, just on the session's
                               1211                 :  * configured origin.
                               1212                 :  *
                               1213                 :  * This is noticeably cheaper than using replorigin_advance().
                               1214                 :  */
 2902 andres                   1215 ECB             : void
 2902 andres                   1216 GIC         912 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
                               1217                 : {
                               1218             912 :     Assert(session_replication_state != NULL);
 2902 andres                   1219 CBC         912 :     Assert(session_replication_state->roident != InvalidRepOriginId);
                               1220                 : 
                               1221             912 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
                               1222             912 :     if (session_replication_state->local_lsn < local_commit)
 2902 andres                   1223 GIC         912 :         session_replication_state->local_lsn = local_commit;
                               1224             912 :     if (session_replication_state->remote_lsn < remote_commit)
                               1225             440 :         session_replication_state->remote_lsn = remote_commit;
 2902 andres                   1226 CBC         912 :     LWLockRelease(&session_replication_state->lock);
 2902 andres                   1227 GIC         912 : }
 2902 andres                   1228 ECB             : 
                               1229                 : /*
                               1230                 :  * Ask the machinery about the point up to which we successfully replayed
                               1231                 :  * changes from a replication origin that has already been set up.
                               1232                 :  */
                               1233                 : XLogRecPtr
 2902 andres                   1234 CBC         149 : replorigin_session_get_progress(bool flush)
 2902 andres                   1235 ECB             : {
                               1236                 :     XLogRecPtr  remote_lsn;
                               1237                 :     XLogRecPtr  local_lsn;
                               1238                 : 
 2902 andres                   1239 GIC         149 :     Assert(session_replication_state != NULL);
                               1240                 : 
                               1241             149 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
                               1242             149 :     remote_lsn = session_replication_state->remote_lsn;
                               1243             149 :     local_lsn = session_replication_state->local_lsn;
 2902 andres                   1244 CBC         149 :     LWLockRelease(&session_replication_state->lock);
                               1245                 : 
                               1246             149 :     if (flush && local_lsn != InvalidXLogRecPtr)
                               1247               1 :         XLogFlush(local_lsn);
                               1248                 : 
                               1249             149 :     return remote_lsn;
 2902 andres                   1250 ECB             : }
                               1251                 : 
                               1252                 : 
                               1253                 : 
                               1254                 : /* ---------------------------------------------------------------------------
                               1255                 :  * SQL functions for working with replication origin.
                               1256                 :  *
                               1257                 :  * These mostly should be fairly short wrappers around more generic functions.
                               1258                 :  * ---------------------------------------------------------------------------
                               1259                 :  */
                               1260                 : 
                               1261                 : /*
                               1262                 :  * Create replication origin for the passed in name, and return the assigned
                               1263                 :  * oid.
                               1264                 :  */
                               1265                 : Datum
 2902 andres                   1266 GIC           8 : pg_replication_origin_create(PG_FUNCTION_ARGS)
 2902 andres                   1267 ECB             : {
                               1268                 :     char       *name;
                               1269                 :     RepOriginId roident;
                               1270                 : 
 2902 andres                   1271 CBC           8 :     replorigin_check_prerequisites(false, false);
 2902 andres                   1272 ECB             : 
 2902 andres                   1273 GIC           8 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 1380 tgl                      1274 ECB             : 
                               1275                 :     /*
                               1276                 :      * Replication origins "any" and "none" are reserved for system options.
                               1277                 :      * The origins "pg_xxx" are reserved for internal use.
                               1278                 :      */
  262 akapila                  1279 GNC           8 :     if (IsReservedName(name) || IsReservedOriginName(name))
 1380 tgl                      1280 CBC           3 :         ereport(ERROR,
                               1281                 :                 (errcode(ERRCODE_RESERVED_NAME),
                               1282                 :                  errmsg("replication origin name \"%s\" is reserved",
                               1283                 :                         name),
                               1284                 :                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
                               1285                 :                            LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
                               1286                 : 
                               1287                 :     /*
                               1288                 :      * If built with appropriate switch, whine when regression-testing
                               1289                 :      * conventions for replication origin names are violated.
                               1290                 :      */
                               1291                 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
                               1292                 :     if (strncmp(name, "regress_", 8) != 0)
                               1293                 :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
                               1294                 : #endif
                               1295                 : 
 2902 andres                   1296 GIC           5 :     roident = replorigin_create(name);
                               1297                 : 
 2902 andres                   1298 CBC           4 :     pfree(name);
                               1299                 : 
 2902 andres                   1300 GIC           4 :     PG_RETURN_OID(roident);
                               1301                 : }
                               1302                 : 
 2902 andres                   1303 ECB             : /*
                               1304                 :  * Drop replication origin.
                               1305                 :  */
                               1306                 : Datum
 2902 andres                   1307 GIC           5 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
                               1308                 : {
                               1309                 :     char       *name;
                               1310                 : 
 2902 andres                   1311 CBC           5 :     replorigin_check_prerequisites(false, false);
 2902 andres                   1312 ECB             : 
 2902 andres                   1313 GIC           5 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
                               1314                 : 
  788 akapila                  1315               5 :     replorigin_drop_by_name(name, false, true);
                               1316                 : 
 2902 andres                   1317               4 :     pfree(name);
                               1318                 : 
                               1319               4 :     PG_RETURN_VOID();
                               1320                 : }
                               1321                 : 
                               1322                 : /*
                               1323                 :  * Return oid of a replication origin.
                               1324                 :  */
                               1325                 : Datum
 2902 andres                   1326 UIC           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
                               1327                 : {
 2878 bruce                    1328 ECB             :     char       *name;
                               1329                 :     RepOriginId roident;
 2902 andres                   1330                 : 
 2902 andres                   1331 UIC           0 :     replorigin_check_prerequisites(false, false);
 2902 andres                   1332 ECB             : 
 2902 andres                   1333 UIC           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
                               1334               0 :     roident = replorigin_by_name(name, true);
                               1335                 : 
                               1336               0 :     pfree(name);
                               1337                 : 
                               1338               0 :     if (OidIsValid(roident))
 2902 andres                   1339 LBC           0 :         PG_RETURN_OID(roident);
 2902 andres                   1340 UIC           0 :     PG_RETURN_NULL();
                               1341                 : }
                               1342                 : 
 2902 andres                   1343 ECB             : /*
                               1344                 :  * Set up a replication origin for this session.
                               1345                 :  */
                               1346                 : Datum
 2902 andres                   1347 CBC           5 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
                               1348                 : {
 2878 bruce                    1349 ECB             :     char       *name;
                               1350                 :     RepOriginId origin;
 2902 andres                   1351                 : 
 2902 andres                   1352 GIC           5 :     replorigin_check_prerequisites(true, false);
                               1353                 : 
                               1354               5 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
                               1355               5 :     origin = replorigin_by_name(name, false);
   90 akapila                  1356 GNC           4 :     replorigin_session_setup(origin, 0);
                               1357                 : 
 2750 alvherre                 1358 GBC           3 :     replorigin_session_origin = origin;
                               1359                 : 
 2902 andres                   1360 GIC           3 :     pfree(name);
                               1361                 : 
                               1362               3 :     PG_RETURN_VOID();
 2902 andres                   1363 EUB             : }
                               1364                 : 
                               1365                 : /*
                               1366                 :  * Reset the origin previously set up in this session.
                               1367                 :  */
                               1368                 : Datum
 2902 andres                   1369 GIC           4 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
 2902 andres                   1370 EUB             : {
 2902 andres                   1371 GBC           4 :     replorigin_check_prerequisites(true, false);
 2902 andres                   1372 EUB             : 
 2902 andres                   1373 GIC           4 :     replorigin_session_reset();
                               1374                 : 
 2750 alvherre                 1375               3 :     replorigin_session_origin = InvalidRepOriginId;
                               1376               3 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
                               1377               3 :     replorigin_session_origin_timestamp = 0;
                               1378                 : 
 2902 andres                   1379 CBC           3 :     PG_RETURN_VOID();
                               1380                 : }
                               1381                 : 
                               1382                 : /*
                               1383                 :  * Has a replication origin been set up for this session?
 2902 andres                   1384 ECB             :  */
                               1385                 : Datum
 2902 andres                   1386 LBC           0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
 2902 andres                   1387 ECB             : {
 2902 andres                   1388 LBC           0 :     replorigin_check_prerequisites(false, false);
                               1389                 : 
 2750 alvherre                 1390               0 :     PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
                               1391                 : }
 2902 andres                   1392 ECB             : 
                               1393                 : 
                               1394                 : /*
                               1395                 :  * Return the replication progress for the origin set up in the current session.
                               1396                 :  *
                               1397                 :  * If 'flush' is set to true it is ensured that the returned value corresponds
                               1398                 :  * to a local transaction that has been flushed. This is useful if asynchronous
                               1399                 :  * commits are used when replaying replicated transactions.
                               1400                 :  */
                               1401                 : Datum
 2902 andres                   1402 GIC           2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
 2902 andres                   1403 ECB             : {
 2902 andres                   1404 GIC           2 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
 2902 andres                   1405 CBC           2 :     bool        flush = PG_GETARG_BOOL(0);
                               1406                 : 
                               1407               2 :     replorigin_check_prerequisites(true, false);
 2902 andres                   1408 ECB             : 
 2902 andres                   1409 CBC           2 :     if (session_replication_state == NULL)
 2902 andres                   1410 UIC           0 :         ereport(ERROR,
 2902 andres                   1411 ECB             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1412                 :                  errmsg("no replication origin is configured")));
                               1413                 : 
 2902 andres                   1414 GIC           2 :     remote_lsn = replorigin_session_get_progress(flush);
                               1415                 : 
                               1416               2 :     if (remote_lsn == InvalidXLogRecPtr)
 2902 andres                   1417 UIC           0 :         PG_RETURN_NULL();
 2902 andres                   1418 EUB             : 
 2902 andres                   1419 GIC           2 :     PG_RETURN_LSN(remote_lsn);
 2902 andres                   1420 EUB             : }
                               1421                 : 
                               1422                 : Datum
 2902 andres                   1423 GIC           1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
                               1424                 : {
                               1425               1 :     XLogRecPtr  location = PG_GETARG_LSN(0);
                               1426                 : 
                               1427               1 :     replorigin_check_prerequisites(true, false);
                               1428                 : 
                               1429               1 :     if (session_replication_state == NULL)
 2902 andres                   1430 UIC           0 :         ereport(ERROR,
                               1431                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1432                 :                  errmsg("no replication origin is configured")));
                               1433                 : 
 2750 alvherre                 1434 CBC           1 :     replorigin_session_origin_lsn = location;
 2750 alvherre                 1435 GIC           1 :     replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
 2902 andres                   1436 ECB             : 
 2902 andres                   1437 CBC           1 :     PG_RETURN_VOID();
                               1438                 : }
 2902 andres                   1439 ECB             : 
                               1440                 : Datum
 2902 andres                   1441 LBC           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
 2902 andres                   1442 EUB             : {
 2902 andres                   1443 UIC           0 :     replorigin_check_prerequisites(true, false);
                               1444                 : 
 2750 alvherre                 1445               0 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
 2750 alvherre                 1446 LBC           0 :     replorigin_session_origin_timestamp = 0;
                               1447                 : 
 2902 andres                   1448               0 :     PG_RETURN_VOID();
 2902 andres                   1449 EUB             : }
                               1450                 : 
 2902 andres                   1451 ECB             : 
                               1452                 : Datum
 2902 andres                   1453 GIC           1 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
                               1454                 : {
 2219 noah                     1455 CBC           1 :     text       *name = PG_GETARG_TEXT_PP(0);
 2878 bruce                    1456 GIC           1 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
 2878 bruce                    1457 ECB             :     RepOriginId node;
                               1458                 : 
 2902 andres                   1459 CBC           1 :     replorigin_check_prerequisites(true, false);
                               1460                 : 
 2902 andres                   1461 ECB             :     /* lock to prevent the replication origin from vanishing */
 2902 andres                   1462 GBC           1 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
                               1463                 : 
 2902 andres                   1464 GIC           1 :     node = replorigin_by_name(text_to_cstring(name), false);
                               1465                 : 
 2902 andres                   1466 ECB             :     /*
                               1467                 :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
                               1468                 :      * xact hasn't committed yet. This is why this function should be used to
 2881 heikki.linnakangas       1469                 :      * set up the initial replication state, but not for replay.
                               1470                 :      */
 2902 andres                   1471 UIC           0 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
                               1472                 :                        true /* go backward */ , true /* WAL log */ );
 2902 andres                   1473 EUB             : 
 2902 andres                   1474 UIC           0 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 2902 andres                   1475 EUB             : 
 2902 andres                   1476 UIC           0 :     PG_RETURN_VOID();
 2902 andres                   1477 EUB             : }
                               1478                 : 
                               1479                 : 
                               1480                 : /*
                               1481                 :  * Return the replication progress for an individual replication origin.
                               1482                 :  *
                               1483                 :  * If 'flush' is set to true it is ensured that the returned value corresponds
                               1484                 :  * to a local transaction that has been flushed. This is useful if asynchronous
 2902 andres                   1485 ECB             :  * commits are used when replaying replicated transactions.
                               1486                 :  */
                               1487                 : Datum
 2902 andres                   1488 CBC           3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
                               1489                 : {
                               1490                 :     char       *name;
 2902 andres                   1491 ECB             :     bool        flush;
                               1492                 :     RepOriginId roident;
 2902 andres                   1493 GIC           3 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
 2902 andres                   1494 ECB             : 
 2902 andres                   1495 GIC           3 :     replorigin_check_prerequisites(true, true);
 2902 andres                   1496 ECB             : 
 2902 andres                   1497 GIC           3 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
                               1498               3 :     flush = PG_GETARG_BOOL(1);
                               1499                 : 
                               1500               3 :     roident = replorigin_by_name(name, false);
                               1501               2 :     Assert(OidIsValid(roident));
                               1502                 : 
 2902 andres                   1503 GBC           2 :     remote_lsn = replorigin_get_progress(roident, flush);
                               1504                 : 
 2902 andres                   1505 GIC           2 :     if (remote_lsn == InvalidXLogRecPtr)
 2902 andres                   1506 UBC           0 :         PG_RETURN_NULL();
                               1507                 : 
 2902 andres                   1508 GBC           2 :     PG_RETURN_LSN(remote_lsn);
                               1509                 : }
                               1510                 : 
                               1511                 : 
                               1512                 : Datum
 2902 andres                   1513 GIC           2 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
                               1514                 : {
                               1515               2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                               1516                 :     int         i;
                               1517                 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
                               1518                 : 
                               1519                 :     /* we want to return 0 rows if max_replication_slots is set to zero */
 2902 andres                   1520 CBC           2 :     replorigin_check_prerequisites(false, true);
                               1521                 : 
  173 michael                  1522 GIC           2 :     InitMaterializedSRF(fcinfo, 0);
                               1523                 : 
                               1524                 :     /* prevent slots from being concurrently dropped */
 2902 andres                   1525 CBC           2 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
                               1526                 : 
 2902 andres                   1527 ECB             :     /*
                               1528                 :      * Iterate through all possible replication_states, display if they are
                               1529                 :      * filled. Note that we do not take any locks, so slightly corrupted/out
                               1530                 :      * of date values are a possibility.
                               1531                 :      */
 2902 andres                   1532 CBC          10 :     for (i = 0; i < max_replication_slots; i++)
 2902 andres                   1533 ECB             :     {
                               1534                 :         ReplicationState *state;
                               1535                 :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
                               1536                 :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
                               1537                 :         char       *roname;
 2902 andres                   1538 EUB             : 
 2902 andres                   1539 GIC           8 :         state = &replication_states[i];
 2902 andres                   1540 ECB             : 
                               1541                 :         /* unused slot, nothing to display */
 2902 andres                   1542 GIC           8 :         if (state->roident == InvalidRepOriginId)
                               1543               6 :             continue;
                               1544                 : 
 2902 andres                   1545 CBC           2 :         memset(values, 0, sizeof(values));
 2902 andres                   1546 GIC           2 :         memset(nulls, 1, sizeof(nulls));
 2902 andres                   1547 ECB             : 
 2902 andres                   1548 GIC           2 :         values[0] = ObjectIdGetDatum(state->roident);
                               1549               2 :         nulls[0] = false;
                               1550                 : 
                               1551                 :         /*
 2902 andres                   1552 ECB             :          * We don't prevent the origin from being dropped concurrently, so
                               1553                 :          * silently accept that it might be gone.
                               1554                 :          */
 2902 andres                   1555 GIC           2 :         if (replorigin_by_oid(state->roident, true,
                               1556                 :                               &roname))
 2902 andres                   1557 ECB             :         {
 2902 andres                   1558 GIC           2 :             values[1] = CStringGetTextDatum(roname);
                               1559               2 :             nulls[1] = false;
                               1560                 :         }
                               1561                 : 
                               1562               2 :         LWLockAcquire(&state->lock, LW_SHARED);
                               1563                 : 
 2878 bruce                    1564 CBC           2 :         values[2] = LSNGetDatum(state->remote_lsn);
 2902 andres                   1565 GIC           2 :         nulls[2] = false;
                               1566                 : 
                               1567               2 :         values[3] = LSNGetDatum(state->local_lsn);
                               1568               2 :         nulls[3] = false;
                               1569                 : 
                               1570               2 :         LWLockRelease(&state->lock);
 2902 andres                   1571 ECB             : 
  398 michael                  1572 GIC           2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                               1573                 :                              values, nulls);
 2902 andres                   1574 ECB             :     }
                               1575                 : 
 2902 andres                   1576 GIC           2 :     LWLockRelease(ReplicationOriginLock);
 2902 andres                   1577 ECB             : 
                               1578                 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
                               1579                 : 
 2902 andres                   1580 CBC           2 :     return (Datum) 0;
 2902 andres                   1581 ECB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a