LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 85.2 % 474 404 2 23 44 1 19 254 23 108 49 258 1 15
Current Date: 2023-04-08 15:15:32 Functions: 90.3 % 31 28 3 25 3 3 28
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * origin.c
       4                 :  *    Logical replication progress tracking support.
       5                 :  *
       6                 :  * Copyright (c) 2013-2023, PostgreSQL Global Development Group
       7                 :  *
       8                 :  * IDENTIFICATION
       9                 :  *    src/backend/replication/logical/origin.c
      10                 :  *
      11                 :  * NOTES
      12                 :  *
      13                 :  * This file provides the following:
      14                 :  * * An infrastructure to name nodes in a replication setup
      15                 :  * * A facility to store and persist replication progress in an efficient
      16                 :  *   and durable manner.
      17                 :  *
      18                 :  * A replication origin consists of a descriptive, user defined, external
      19                 :  * name and a short, thus space efficient, internal 2 byte one. This split
      20                 :  * exists because replication origins have to be stored in WAL and shared
      21                 :  * memory and long descriptors would be inefficient.  For now only use 2 bytes
      22                 :  * for the internal id of a replication origin as it seems unlikely that there
      23                 :  * soon will be more than 65k nodes in one replication setup; and using only
      24                 :  * two bytes allows us to be more space efficient.
      25                 :  *
      26                 :  * Replication progress is tracked in a shared memory table
      27                 :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28                 :  * ('slots') in this table are identified by the internal id. That's the case
      29                 :  * because it allows to increase replication progress during crash
      30                 :  * recovery. To allow doing so we store the original LSN (from the originating
      31                 :  * system) of a transaction in the commit record. That allows to recover the
      32                 :  * precise replayed state after crash recovery; without requiring synchronous
      33                 :  * commits. Allowing logical replication to use asynchronous commit is
      34                 :  * generally good for performance, but especially important as it allows a
      35                 :  * single threaded replay process to keep up with a source that has multiple
      36                 :  * backends generating changes concurrently.  For efficiency and simplicity
      37                 :  * reasons a backend can set up one replication origin that's from then on
      38                 :  * used as the source of changes produced by the backend, until reset again.
      39                 :  *
      40                 :  * This infrastructure is intended to be used in cooperation with logical
      41                 :  * decoding. When replaying from a remote system the configured origin is
      42                 :  * provided to output plugins, allowing prevention of replication loops and
      43                 :  * other filtering.
      44                 :  *
      45                 :  * There are several levels of locking at work:
      46                 :  *
      47                 :  * * To create and drop replication origins an exclusive lock on
      48                 :  *   pg_replication_origin is required for the duration. That allows us to
      49                 :  *   safely and conflict free assign new origins using a dirty snapshot.
      50                 :  *
      51                 :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52                 :  *   LWLock has to be held exclusively; when iterating over the replication
      53                 :  *   progress a shared lock has to be held, the same when advancing the
      54                 :  *   replication progress of an individual backend that has not been set up
      55                 :  *   as the session's replication origin.
      56                 :  *
      57                 :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58                 :  *   replication progress slot that slot's lwlock has to be held. That's
      59                 :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60                 :  *   all our platforms, but it also simplifies memory ordering concerns
      61                 :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62                 :  *   so it's less harmful to hold the lock over a WAL write
      63                 :  *   (cf. AdvanceReplicationProgress).
      64                 :  *
      65                 :  * ---------------------------------------------------------------------------
      66                 :  */
      67                 : 
      68                 : #include "postgres.h"
      69                 : 
      70                 : #include <unistd.h>
      71                 : #include <sys/stat.h>
      72                 : 
      73                 : #include "access/genam.h"
      74                 : #include "access/htup_details.h"
      75                 : #include "access/table.h"
      76                 : #include "access/xact.h"
      77                 : #include "access/xloginsert.h"
      78                 : #include "catalog/catalog.h"
      79                 : #include "catalog/indexing.h"
      80                 : #include "catalog/pg_subscription.h"
      81                 : #include "funcapi.h"
      82                 : #include "miscadmin.h"
      83                 : #include "nodes/execnodes.h"
      84                 : #include "pgstat.h"
      85                 : #include "replication/logical.h"
      86                 : #include "replication/origin.h"
      87                 : #include "storage/condition_variable.h"
      88                 : #include "storage/copydir.h"
      89                 : #include "storage/fd.h"
      90                 : #include "storage/ipc.h"
      91                 : #include "storage/lmgr.h"
      92                 : #include "utils/builtins.h"
      93                 : #include "utils/fmgroids.h"
      94                 : #include "utils/pg_lsn.h"
      95                 : #include "utils/rel.h"
      96                 : #include "utils/snapmgr.h"
      97                 : #include "utils/syscache.h"
      98                 : 
      99                 : /*
     100                 :  * Replay progress of a single remote node.
     101                 :  */
     102                 : typedef struct ReplicationState
     103                 : {
     104                 :     /*
     105                 :      * Local identifier for the remote node.
     106                 :      */
     107                 :     RepOriginId roident;
     108                 : 
     109                 :     /*
     110                 :      * Location of the latest commit from the remote side.
     111                 :      */
     112                 :     XLogRecPtr  remote_lsn;
     113                 : 
     114                 :     /*
     115                 :      * Remember the local lsn of the commit record so we can XLogFlush() to it
     116                 :      * during a checkpoint so we know the commit record actually is safe on
     117                 :      * disk.
     118                 :      */
     119                 :     XLogRecPtr  local_lsn;
     120                 : 
     121                 :     /*
     122                 :      * PID of backend that's acquired slot, or 0 if none.
     123                 :      */
     124                 :     int         acquired_by;
     125                 : 
     126                 :     /*
     127                 :      * Condition variable that's signaled when acquired_by changes.
     128                 :      */
     129                 :     ConditionVariable origin_cv;
     130                 : 
     131                 :     /*
     132                 :      * Lock protecting remote_lsn and local_lsn.
     133                 :      */
     134                 :     LWLock      lock;
     135                 : } ReplicationState;
     136                 : 
     137                 : /*
     138                 :  * On disk version of ReplicationState.
     139                 :  */
     140                 : typedef struct ReplicationStateOnDisk
     141                 : {
     142                 :     RepOriginId roident;
     143                 :     XLogRecPtr  remote_lsn;
     144                 : } ReplicationStateOnDisk;
     145                 : 
     146                 : 
     147                 : typedef struct ReplicationStateCtl
     148                 : {
     149                 :     /* Tranche to use for per-origin LWLocks */
     150                 :     int         tranche_id;
     151                 :     /* Array of length max_replication_slots */
     152                 :     ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
     153                 : } ReplicationStateCtl;
     154                 : 
     155                 : /* external variables */
     156                 : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
     157                 : XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
     158                 : TimestampTz replorigin_session_origin_timestamp = 0;
     159                 : 
     160                 : /*
     161                 :  * Base address into a shared memory array of replication states of size
     162                 :  * max_replication_slots.
     163                 :  *
     164                 :  * XXX: Should we use a separate variable to size this rather than
     165                 :  * max_replication_slots?
     166                 :  */
     167                 : static ReplicationState *replication_states;
     168                 : 
     169                 : /*
     170                 :  * Actual shared memory block (replication_states[] is now part of this).
     171                 :  */
     172                 : static ReplicationStateCtl *replication_states_ctl;
     173                 : 
     174                 : /*
     175                 :  * Backend-local, cached element from ReplicationState for use in a backend
     176                 :  * replaying remote commits, so we don't have to search ReplicationState for
     177                 :  * the backends current RepOriginId.
     178                 :  */
     179                 : static ReplicationState *session_replication_state = NULL;
     180                 : 
     181                 : /* Magic for on disk files. */
     182                 : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     183                 : 
     184                 : static void
     185 GIC          31 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
     186 ECB             : {
     187 GIC          31 :     if (check_slots && max_replication_slots == 0)
     188 LBC           0 :         ereport(ERROR,
     189 EUB             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     190                 :                  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
     191                 : 
     192 GIC          31 :     if (!recoveryOK && RecoveryInProgress())
     193 LBC           0 :         ereport(ERROR,
     194 EUB             :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     195                 :                  errmsg("cannot manipulate replication origins during recovery")));
     196 GIC          31 : }
     197 ECB             : 
     198                 : 
     199                 : /*
     200                 :  * IsReservedOriginName
     201                 :  *      True iff name is either "none" or "any".
     202                 :  */
     203                 : static bool
     204 GNC           7 : IsReservedOriginName(const char *name)
     205                 : {
     206              13 :     return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
     207               6 :             (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
     208                 : }
     209                 : 
     210                 : /* ---------------------------------------------------------------------------
     211                 :  * Functions for working with replication origins themselves.
     212                 :  * ---------------------------------------------------------------------------
     213                 :  */
     214                 : 
     215                 : /*
     216 ECB             :  * Check for a persistent replication origin identified by name.
     217                 :  *
     218                 :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
     219                 :  */
     220                 : RepOriginId
     221 GIC         694 : replorigin_by_name(const char *roname, bool missing_ok)
     222                 : {
     223                 :     Form_pg_replication_origin ident;
     224             694 :     Oid         roident = InvalidOid;
     225                 :     HeapTuple   tuple;
     226                 :     Datum       roname_d;
     227                 : 
     228             694 :     roname_d = CStringGetTextDatum(roname);
     229                 : 
     230             694 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     231             694 :     if (HeapTupleIsValid(tuple))
     232                 :     {
     233 CBC         390 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     234 GIC         390 :         roident = ident->roident;
     235             390 :         ReleaseSysCache(tuple);
     236 ECB             :     }
     237 GIC         304 :     else if (!missing_ok)
     238               4 :         ereport(ERROR,
     239                 :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     240 ECB             :                  errmsg("replication origin \"%s\" does not exist",
     241                 :                         roname)));
     242                 : 
     243 CBC         690 :     return roident;
     244                 : }
     245 ECB             : 
     246                 : /*
     247                 :  * Create a replication origin.
     248                 :  *
     249                 :  * Needs to be called in a transaction.
     250                 :  */
     251                 : RepOriginId
     252 GIC         271 : replorigin_create(const char *roname)
     253                 : {
     254                 :     Oid         roident;
     255 CBC         271 :     HeapTuple   tuple = NULL;
     256                 :     Relation    rel;
     257                 :     Datum       roname_d;
     258                 :     SnapshotData SnapshotDirty;
     259                 :     SysScanDesc scan;
     260                 :     ScanKeyData key;
     261                 : 
     262 GIC         271 :     roname_d = CStringGetTextDatum(roname);
     263                 : 
     264 CBC         271 :     Assert(IsTransactionState());
     265                 : 
     266                 :     /*
     267 ECB             :      * We need the numeric replication origin to be 16bit wide, so we cannot
     268                 :      * rely on the normal oid allocation. Instead we simply scan
     269                 :      * pg_replication_origin for the first unused id. That's not particularly
     270                 :      * efficient, but this should be a fairly infrequent operation - we can
     271                 :      * easily spend a bit more code on this when it turns out it needs to be
     272                 :      * faster.
     273                 :      *
     274                 :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     275                 :      * over the table for the duration of the search. Because we use a "dirty
     276                 :      * snapshot" we can read rows that other in-progress sessions have
     277                 :      * written, even though they would be invisible with normal snapshots. Due
     278                 :      * to the exclusive lock there's no danger that new rows can appear while
     279                 :      * we're checking.
     280                 :      */
     281 GIC         271 :     InitDirtySnapshot(SnapshotDirty);
     282                 : 
     283             271 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     284                 : 
     285             492 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     286                 :     {
     287                 :         bool        nulls[Natts_pg_replication_origin];
     288                 :         Datum       values[Natts_pg_replication_origin];
     289                 :         bool        collides;
     290                 : 
     291             492 :         CHECK_FOR_INTERRUPTS();
     292                 : 
     293 CBC         492 :         ScanKeyInit(&key,
     294                 :                     Anum_pg_replication_origin_roident,
     295 ECB             :                     BTEqualStrategyNumber, F_OIDEQ,
     296                 :                     ObjectIdGetDatum(roident));
     297                 : 
     298 GIC         492 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     299                 :                                   true /* indexOK */ ,
     300                 :                                   &SnapshotDirty,
     301                 :                                   1, &key);
     302                 : 
     303 CBC         492 :         collides = HeapTupleIsValid(systable_getnext(scan));
     304                 : 
     305             492 :         systable_endscan(scan);
     306                 : 
     307 GIC         492 :         if (!collides)
     308                 :         {
     309                 :             /*
     310 ECB             :              * Ok, found an unused roident, insert the new row and do a CCI,
     311                 :              * so our callers can look it up if they want to.
     312                 :              */
     313 GIC         271 :             memset(&nulls, 0, sizeof(nulls));
     314                 : 
     315 CBC         271 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     316 GIC         271 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     317 ECB             : 
     318 GIC         271 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     319 CBC         271 :             CatalogTupleInsert(rel, tuple);
     320 GIC         270 :             CommandCounterIncrement();
     321             270 :             break;
     322                 :         }
     323                 :     }
     324                 : 
     325 ECB             :     /* now release lock again */
     326 GIC         270 :     table_close(rel, ExclusiveLock);
     327 ECB             : 
     328 CBC         270 :     if (tuple == NULL)
     329 UIC           0 :         ereport(ERROR,
     330 ECB             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     331                 :                  errmsg("could not find free replication origin ID")));
     332                 : 
     333 CBC         270 :     heap_freetuple(tuple);
     334 GIC         270 :     return roident;
     335                 : }
     336                 : 
     337                 : /*
     338 ECB             :  * Helper function to drop a replication origin.
     339                 :  */
     340                 : static void
     341 GNC         221 : replorigin_state_clear(RepOriginId roident, bool nowait)
     342                 : {
     343                 :     int         i;
     344 ECB             : 
     345                 :     /*
     346                 :      * Clean up the slot state info, if there is any matching slot.
     347                 :      */
     348 GIC         221 : restart:
     349             221 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     350                 : 
     351 CBC         752 :     for (i = 0; i < max_replication_slots; i++)
     352                 :     {
     353 GIC         717 :         ReplicationState *state = &replication_states[i];
     354                 : 
     355             717 :         if (state->roident == roident)
     356                 :         {
     357                 :             /* found our slot, is it busy? */
     358 CBC         186 :             if (state->acquired_by != 0)
     359 ECB             :             {
     360                 :                 ConditionVariable *cv;
     361                 : 
     362 UIC           0 :                 if (nowait)
     363 LBC           0 :                     ereport(ERROR,
     364                 :                             (errcode(ERRCODE_OBJECT_IN_USE),
     365 ECB             :                              errmsg("could not drop replication origin with ID %d, in use by PID %d",
     366                 :                                     state->roident,
     367                 :                                     state->acquired_by)));
     368                 : 
     369                 :                 /*
     370                 :                  * We must wait and then retry.  Since we don't know which CV
     371                 :                  * to wait on until here, we can't readily use
     372 EUB             :                  * ConditionVariablePrepareToSleep (calling it here would be
     373                 :                  * wrong, since we could miss the signal if we did so); just
     374                 :                  * use ConditionVariableSleep directly.
     375                 :                  */
     376 UIC           0 :                 cv = &state->origin_cv;
     377                 : 
     378               0 :                 LWLockRelease(ReplicationOriginLock);
     379                 : 
     380               0 :                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
     381               0 :                 goto restart;
     382                 :             }
     383                 : 
     384                 :             /* first make a WAL log entry */
     385                 :             {
     386 EUB             :                 xl_replorigin_drop xlrec;
     387                 : 
     388 GBC         186 :                 xlrec.node_id = roident;
     389 GIC         186 :                 XLogBeginInsert();
     390 GBC         186 :                 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     391             186 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
     392                 :             }
     393                 : 
     394                 :             /* then clear the in-memory slot */
     395 GIC         186 :             state->roident = InvalidRepOriginId;
     396             186 :             state->remote_lsn = InvalidXLogRecPtr;
     397             186 :             state->local_lsn = InvalidXLogRecPtr;
     398 CBC         186 :             break;
     399 ECB             :         }
     400                 :     }
     401 CBC         221 :     LWLockRelease(ReplicationOriginLock);
     402 GIC         221 :     ConditionVariableCancelSleep();
     403             221 : }
     404                 : 
     405                 : /*
     406                 :  * Drop replication origin (by name).
     407                 :  *
     408 ECB             :  * Needs to be called in a transaction.
     409                 :  */
     410                 : void
     411 GIC         367 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
     412                 : {
     413                 :     RepOriginId roident;
     414 ECB             :     Relation    rel;
     415                 :     HeapTuple   tuple;
     416                 : 
     417 CBC         367 :     Assert(IsTransactionState());
     418                 : 
     419 GNC         367 :     rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
     420 EUB             : 
     421 GIC         367 :     roident = replorigin_by_name(name, missing_ok);
     422                 : 
     423                 :     /* Lock the origin to prevent concurrent drops. */
     424 GNC         366 :     LockSharedObject(ReplicationOriginRelationId, roident, 0,
     425                 :                      AccessExclusiveLock);
     426                 : 
     427             366 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     428             366 :     if (!HeapTupleIsValid(tuple))
     429                 :     {
     430             145 :         if (!missing_ok)
     431 UNC           0 :             elog(ERROR, "cache lookup failed for replication origin with ID %d",
     432                 :                  roident);
     433                 : 
     434                 :         /*
     435                 :          * We don't need to retain the locks if the origin is already dropped.
     436                 :          */
     437 GNC         145 :         UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
     438                 :                            AccessExclusiveLock);
     439             145 :         table_close(rel, RowExclusiveLock);
     440             145 :         return;
     441                 :     }
     442                 : 
     443             221 :     replorigin_state_clear(roident, nowait);
     444                 : 
     445                 :     /*
     446                 :      * Now, we can delete the catalog entry.
     447                 :      */
     448             221 :     CatalogTupleDelete(rel, &tuple->t_self);
     449             221 :     ReleaseSysCache(tuple);
     450                 : 
     451             221 :     CommandCounterIncrement();
     452                 : 
     453 ECB             :     /* We keep the lock on pg_replication_origin until commit */
     454 GIC         221 :     table_close(rel, NoLock);
     455 ECB             : }
     456                 : 
     457                 : /*
     458                 :  * Look up replication origin via its oid and return the name.
     459                 :  *
     460                 :  * The external name is palloc'd in the calling context.
     461                 :  *
     462                 :  * Returns true if the origin is known, false otherwise.
     463                 :  */
     464                 : bool
     465 CBC           9 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
     466                 : {
     467 ECB             :     HeapTuple   tuple;
     468                 :     Form_pg_replication_origin ric;
     469                 : 
     470 CBC           9 :     Assert(OidIsValid((Oid) roident));
     471 GIC           9 :     Assert(roident != InvalidRepOriginId);
     472               9 :     Assert(roident != DoNotReplicateId);
     473                 : 
     474               9 :     tuple = SearchSysCache1(REPLORIGIDENT,
     475                 :                             ObjectIdGetDatum((Oid) roident));
     476                 : 
     477               9 :     if (HeapTupleIsValid(tuple))
     478                 :     {
     479               9 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     480               9 :         *roname = text_to_cstring(&ric->roname);
     481 CBC           9 :         ReleaseSysCache(tuple);
     482                 : 
     483 GIC           9 :         return true;
     484                 :     }
     485                 :     else
     486 ECB             :     {
     487 LBC           0 :         *roname = NULL;
     488 ECB             : 
     489 UIC           0 :         if (!missing_ok)
     490 LBC           0 :             ereport(ERROR,
     491                 :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     492                 :                      errmsg("replication origin with ID %d does not exist",
     493 ECB             :                             roident)));
     494                 : 
     495 LBC           0 :         return false;
     496 ECB             :     }
     497                 : }
     498                 : 
     499                 : 
     500                 : /* ---------------------------------------------------------------------------
     501                 :  * Functions for handling replication progress.
     502                 :  * ---------------------------------------------------------------------------
     503 EUB             :  */
     504                 : 
     505                 : Size
     506 GBC        6390 : ReplicationOriginShmemSize(void)
     507                 : {
     508 GIC        6390 :     Size        size = 0;
     509                 : 
     510                 :     /*
     511 EUB             :      * XXX: max_replication_slots is arguably the wrong thing to use, as here
     512                 :      * we keep the replay state of *remote* transactions. But for now it seems
     513                 :      * sufficient to reuse it, rather than introduce a separate GUC.
     514                 :      */
     515 GIC        6390 :     if (max_replication_slots == 0)
     516 UIC           0 :         return size;
     517                 : 
     518 GIC        6390 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     519                 : 
     520            6390 :     size = add_size(size,
     521                 :                     mul_size(max_replication_slots, sizeof(ReplicationState)));
     522 CBC        6390 :     return size;
     523                 : }
     524 ECB             : 
     525                 : void
     526 GIC        1826 : ReplicationOriginShmemInit(void)
     527                 : {
     528                 :     bool        found;
     529                 : 
     530            1826 :     if (max_replication_slots == 0)
     531 LBC           0 :         return;
     532 EUB             : 
     533 GIC        1826 :     replication_states_ctl = (ReplicationStateCtl *)
     534 CBC        1826 :         ShmemInitStruct("ReplicationOriginState",
     535                 :                         ReplicationOriginShmemSize(),
     536 ECB             :                         &found);
     537 GIC        1826 :     replication_states = replication_states_ctl->states;
     538 ECB             : 
     539 GIC        1826 :     if (!found)
     540                 :     {
     541                 :         int         i;
     542 ECB             : 
     543 GIC      130639 :         MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
     544                 : 
     545            1826 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     546 ECB             : 
     547 GBC       19967 :         for (i = 0; i < max_replication_slots; i++)
     548                 :         {
     549 CBC       18141 :             LWLockInitialize(&replication_states[i].lock,
     550           18141 :                              replication_states_ctl->tranche_id);
     551 GIC       18141 :             ConditionVariableInit(&replication_states[i].origin_cv);
     552                 :         }
     553 ECB             :     }
     554                 : }
     555                 : 
     556                 : /* ---------------------------------------------------------------------------
     557                 :  * Perform a checkpoint of each replication origin's progress with respect to
     558                 :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     559                 :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     560                 :  * if the transactions were originally committed asynchronously.
     561                 :  *
     562                 :  * We store checkpoints in the following format:
     563                 :  * +-------+------------------------+------------------+-----+--------+
     564                 :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     565                 :  * +-------+------------------------+------------------+-----+--------+
     566                 :  *
     567                 :  * So it's just the magic, followed by the statically sized
     568                 :  * ReplicationStateOnDisk structs. Note that the maximum number of
     569                 :  * ReplicationState is determined by max_replication_slots.
     570                 :  * ---------------------------------------------------------------------------
     571                 :  */
     572                 : void
     573 GIC        2363 : CheckPointReplicationOrigin(void)
     574                 : {
     575            2363 :     const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
     576            2363 :     const char *path = "pg_logical/replorigin_checkpoint";
     577                 :     int         tmpfd;
     578                 :     int         i;
     579            2363 :     uint32      magic = REPLICATION_STATE_MAGIC;
     580                 :     pg_crc32c   crc;
     581                 : 
     582            2363 :     if (max_replication_slots == 0)
     583 UIC           0 :         return;
     584                 : 
     585 GIC        2363 :     INIT_CRC32C(crc);
     586                 : 
     587                 :     /* make sure no old temp file is remaining */
     588            2363 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     589 LBC           0 :         ereport(PANIC,
     590                 :                 (errcode_for_file_access(),
     591 ECB             :                  errmsg("could not remove file \"%s\": %m",
     592                 :                         tmppath)));
     593                 : 
     594                 :     /*
     595                 :      * no other backend can perform this at the same time; only one checkpoint
     596                 :      * can happen at a time.
     597                 :      */
     598 CBC        2363 :     tmpfd = OpenTransientFile(tmppath,
     599 EUB             :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     600 GIC        2363 :     if (tmpfd < 0)
     601 LBC           0 :         ereport(PANIC,
     602                 :                 (errcode_for_file_access(),
     603                 :                  errmsg("could not create file \"%s\": %m",
     604 ECB             :                         tmppath)));
     605 EUB             : 
     606                 :     /* write magic */
     607 GIC        2363 :     errno = 0;
     608            2363 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     609                 :     {
     610                 :         /* if write didn't set errno, assume problem is no disk space */
     611 UIC           0 :         if (errno == 0)
     612               0 :             errno = ENOSPC;
     613               0 :         ereport(PANIC,
     614 ECB             :                 (errcode_for_file_access(),
     615                 :                  errmsg("could not write to file \"%s\": %m",
     616                 :                         tmppath)));
     617 EUB             :     }
     618 GIC        2363 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     619                 : 
     620                 :     /* prevent concurrent creations/drops */
     621            2363 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     622                 : 
     623 ECB             :     /* write actual data */
     624 CBC       25839 :     for (i = 0; i < max_replication_slots; i++)
     625                 :     {
     626                 :         ReplicationStateOnDisk disk_state;
     627 GBC       23476 :         ReplicationState *curstate = &replication_states[i];
     628 EUB             :         XLogRecPtr  local_lsn;
     629                 : 
     630 GIC       23476 :         if (curstate->roident == InvalidRepOriginId)
     631           23449 :             continue;
     632                 : 
     633                 :         /* zero, to avoid uninitialized padding bytes */
     634 CBC          27 :         memset(&disk_state, 0, sizeof(disk_state));
     635                 : 
     636 GIC          27 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     637 ECB             : 
     638 GIC          27 :         disk_state.roident = curstate->roident;
     639                 : 
     640 CBC          27 :         disk_state.remote_lsn = curstate->remote_lsn;
     641 GIC          27 :         local_lsn = curstate->local_lsn;
     642                 : 
     643 CBC          27 :         LWLockRelease(&curstate->lock);
     644                 : 
     645                 :         /* make sure we only write out a commit that's persistent */
     646              27 :         XLogFlush(local_lsn);
     647 ECB             : 
     648 GIC          27 :         errno = 0;
     649              27 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     650 ECB             :             sizeof(disk_state))
     651                 :         {
     652                 :             /* if write didn't set errno, assume problem is no disk space */
     653 UIC           0 :             if (errno == 0)
     654 LBC           0 :                 errno = ENOSPC;
     655 UIC           0 :             ereport(PANIC,
     656 ECB             :                     (errcode_for_file_access(),
     657                 :                      errmsg("could not write to file \"%s\": %m",
     658                 :                             tmppath)));
     659                 :         }
     660                 : 
     661 GIC          27 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     662 ECB             :     }
     663                 : 
     664 CBC        2363 :     LWLockRelease(ReplicationOriginLock);
     665 ECB             : 
     666                 :     /* write out the CRC */
     667 GIC        2363 :     FIN_CRC32C(crc);
     668            2363 :     errno = 0;
     669 GBC        2363 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     670 EUB             :     {
     671                 :         /* if write didn't set errno, assume problem is no disk space */
     672 UIC           0 :         if (errno == 0)
     673               0 :             errno = ENOSPC;
     674               0 :         ereport(PANIC,
     675                 :                 (errcode_for_file_access(),
     676                 :                  errmsg("could not write to file \"%s\": %m",
     677 ECB             :                         tmppath)));
     678                 :     }
     679                 : 
     680 CBC        2363 :     if (CloseTransientFile(tmpfd) != 0)
     681 UIC           0 :         ereport(PANIC,
     682                 :                 (errcode_for_file_access(),
     683 ECB             :                  errmsg("could not close file \"%s\": %m",
     684                 :                         tmppath)));
     685                 : 
     686                 :     /* fsync, rename to permanent file, fsync file and directory */
     687 GIC        2363 :     durable_rename(tmppath, path, PANIC);
     688 EUB             : }
     689                 : 
     690                 : /*
     691                 :  * Recover replication replay status from checkpoint data saved earlier by
     692                 :  * CheckPointReplicationOrigin.
     693                 :  *
     694                 :  * This only needs to be called at startup and *not* during every checkpoint
     695                 :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     696 ECB             :  * state thereafter can be recovered by looking at commit records.
     697 EUB             :  */
     698                 : void
     699 GIC        1176 : StartupReplicationOrigin(void)
     700                 : {
     701            1176 :     const char *path = "pg_logical/replorigin_checkpoint";
     702                 :     int         fd;
     703 ECB             :     int         readBytes;
     704 GIC        1176 :     uint32      magic = REPLICATION_STATE_MAGIC;
     705            1176 :     int         last_state = 0;
     706                 :     pg_crc32c   file_crc;
     707                 :     pg_crc32c   crc;
     708                 : 
     709                 :     /* don't want to overwrite already existing state */
     710                 : #ifdef USE_ASSERT_CHECKING
     711                 :     static bool already_started = false;
     712                 : 
     713            1176 :     Assert(!already_started);
     714            1176 :     already_started = true;
     715 ECB             : #endif
     716                 : 
     717 CBC        1176 :     if (max_replication_slots == 0)
     718 GIC         305 :         return;
     719                 : 
     720 CBC        1176 :     INIT_CRC32C(crc);
     721 ECB             : 
     722 GIC        1176 :     elog(DEBUG2, "starting up replication origin progress state");
     723                 : 
     724            1176 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     725                 : 
     726                 :     /*
     727                 :      * might have had max_replication_slots == 0 last run, or we just brought
     728                 :      * up a standby.
     729 ECB             :      */
     730 CBC        1176 :     if (fd < 0 && errno == ENOENT)
     731 GIC         305 :         return;
     732             871 :     else if (fd < 0)
     733 LBC           0 :         ereport(PANIC,
     734 ECB             :                 (errcode_for_file_access(),
     735                 :                  errmsg("could not open file \"%s\": %m",
     736                 :                         path)));
     737                 : 
     738                 :     /* verify magic, that is written even if nothing was active */
     739 GIC         871 :     readBytes = read(fd, &magic, sizeof(magic));
     740 CBC         871 :     if (readBytes != sizeof(magic))
     741                 :     {
     742 UIC           0 :         if (readBytes < 0)
     743               0 :             ereport(PANIC,
     744                 :                     (errcode_for_file_access(),
     745                 :                      errmsg("could not read file \"%s\": %m",
     746 ECB             :                             path)));
     747                 :         else
     748 LBC           0 :             ereport(PANIC,
     749 EUB             :                     (errcode(ERRCODE_DATA_CORRUPTED),
     750                 :                      errmsg("could not read file \"%s\": read %d of %zu",
     751                 :                             path, readBytes, sizeof(magic))));
     752                 :     }
     753 GIC         871 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     754                 : 
     755 CBC         871 :     if (magic != REPLICATION_STATE_MAGIC)
     756 LBC           0 :         ereport(PANIC,
     757                 :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     758 EUB             :                         magic, REPLICATION_STATE_MAGIC)));
     759                 : 
     760                 :     /* we can skip locking here, no other access is possible */
     761                 : 
     762                 :     /* recover individual states, until there are no more to be found */
     763                 :     while (true)
     764 GBC           3 :     {
     765                 :         ReplicationStateOnDisk disk_state;
     766                 : 
     767 GIC         874 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     768                 : 
     769 ECB             :         /* no further data */
     770 GIC         874 :         if (readBytes == sizeof(crc))
     771 ECB             :         {
     772 EUB             :             /* not pretty, but simple ... */
     773 GIC         871 :             file_crc = *(pg_crc32c *) &disk_state;
     774             871 :             break;
     775                 :         }
     776                 : 
     777               3 :         if (readBytes < 0)
     778                 :         {
     779 UIC           0 :             ereport(PANIC,
     780 ECB             :                     (errcode_for_file_access(),
     781                 :                      errmsg("could not read file \"%s\": %m",
     782                 :                             path)));
     783                 :         }
     784                 : 
     785 GIC           3 :         if (readBytes != sizeof(disk_state))
     786 ECB             :         {
     787 UIC           0 :             ereport(PANIC,
     788                 :                     (errcode_for_file_access(),
     789 ECB             :                      errmsg("could not read file \"%s\": read %d of %zu",
     790                 :                             path, readBytes, sizeof(disk_state))));
     791                 :         }
     792                 : 
     793 CBC           3 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     794                 : 
     795 GBC           3 :         if (last_state == max_replication_slots)
     796 UIC           0 :             ereport(PANIC,
     797                 :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     798                 :                      errmsg("could not find free replication state, increase max_replication_slots")));
     799                 : 
     800                 :         /* copy data to shared memory */
     801 CBC           3 :         replication_states[last_state].roident = disk_state.roident;
     802 GIC           3 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     803 GBC           3 :         last_state++;
     804                 : 
     805 GIC           3 :         ereport(LOG,
     806                 :                 (errmsg("recovered replication state of node %d to %X/%X",
     807                 :                         disk_state.roident,
     808                 :                         LSN_FORMAT_ARGS(disk_state.remote_lsn))));
     809 ECB             :     }
     810                 : 
     811                 :     /* now check checksum */
     812 GBC         871 :     FIN_CRC32C(crc);
     813 GIC         871 :     if (file_crc != crc)
     814 UIC           0 :         ereport(PANIC,
     815                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
     816                 :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     817 ECB             :                         crc, file_crc)));
     818                 : 
     819 CBC         871 :     if (CloseTransientFile(fd) != 0)
     820 UIC           0 :         ereport(PANIC,
     821 ECB             :                 (errcode_for_file_access(),
     822                 :                  errmsg("could not close file \"%s\": %m",
     823                 :                         path)));
     824                 : }
     825                 : 
     826                 : void
     827 GIC           6 : replorigin_redo(XLogReaderState *record)
     828 ECB             : {
     829 CBC           6 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     830 EUB             : 
     831 GIC           6 :     switch (info)
     832                 :     {
     833               3 :         case XLOG_REPLORIGIN_SET:
     834                 :             {
     835 CBC           3 :                 xl_replorigin_set *xlrec =
     836 GBC           3 :                 (xl_replorigin_set *) XLogRecGetData(record);
     837                 : 
     838 GIC           3 :                 replorigin_advance(xlrec->node_id,
     839                 :                                    xlrec->remote_lsn, record->EndRecPtr,
     840               3 :                                    xlrec->force /* backward */ ,
     841                 :                                    false /* WAL log */ );
     842               3 :                 break;
     843 ECB             :             }
     844 GIC           3 :         case XLOG_REPLORIGIN_DROP:
     845 ECB             :             {
     846                 :                 xl_replorigin_drop *xlrec;
     847                 :                 int         i;
     848                 : 
     849 CBC           3 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     850                 : 
     851               4 :                 for (i = 0; i < max_replication_slots; i++)
     852 ECB             :                 {
     853 GIC           4 :                     ReplicationState *state = &replication_states[i];
     854 ECB             : 
     855                 :                     /* found our slot */
     856 CBC           4 :                     if (state->roident == xlrec->node_id)
     857                 :                     {
     858 ECB             :                         /* reset entry */
     859 GIC           3 :                         state->roident = InvalidRepOriginId;
     860 CBC           3 :                         state->remote_lsn = InvalidXLogRecPtr;
     861 GIC           3 :                         state->local_lsn = InvalidXLogRecPtr;
     862               3 :                         break;
     863                 :                     }
     864                 :                 }
     865 CBC           3 :                 break;
     866                 :             }
     867 LBC           0 :         default:
     868 UIC           0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     869 ECB             :     }
     870 GIC           6 : }
     871                 : 
     872 ECB             : 
     873                 : /*
     874                 :  * Tell the replication origin progress machinery that a commit from 'node'
     875                 :  * that originated at the LSN remote_commit on the remote node was replayed
     876                 :  * successfully and that we don't need to do so again. In combination with
     877                 :  * setting up replorigin_session_origin_lsn and replorigin_session_origin
     878                 :  * that ensures we won't lose knowledge about that after a crash if the
     879                 :  * transaction had a persistent effect (think of asynchronous commits).
     880                 :  *
     881                 :  * local_commit needs to be a local LSN of the commit so that we can make sure
     882                 :  * upon a checkpoint that enough WAL has been persisted to disk.
     883 EUB             :  *
     884                 :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     885                 :  * unless running in recovery.
     886 ECB             :  */
     887                 : void
     888 GIC         200 : replorigin_advance(RepOriginId node,
     889                 :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     890                 :                    bool go_backward, bool wal_log)
     891                 : {
     892                 :     int         i;
     893             200 :     ReplicationState *replication_state = NULL;
     894             200 :     ReplicationState *free_state = NULL;
     895                 : 
     896             200 :     Assert(node != InvalidRepOriginId);
     897                 : 
     898                 :     /* we don't track DoNotReplicateId */
     899             200 :     if (node == DoNotReplicateId)
     900 UIC           0 :         return;
     901                 : 
     902                 :     /*
     903                 :      * XXX: For the case where this is called by WAL replay, it'd be more
     904 ECB             :      * efficient to restore into a backend local hashtable and only dump into
     905                 :      * shmem after recovery is finished. Let's hold off on implementing that
     906                 :      * until it's shown to be a measurable expense.
     907                 :      */
     908                 : 
     909                 :     /* Lock exclusively, as we may have to create a new table entry. */
     910 CBC         200 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     911                 : 
     912 ECB             :     /*
     913                 :      * Search for either an existing slot for the origin, or a free one we can
     914                 :      * use.
     915                 :      */
     916 GBC        1749 :     for (i = 0; i < max_replication_slots; i++)
     917                 :     {
     918 GIC        1595 :         ReplicationState *curstate = &replication_states[i];
     919                 : 
     920                 :         /* remember where to insert if necessary */
     921            1595 :         if (curstate->roident == InvalidRepOriginId &&
     922                 :             free_state == NULL)
     923                 :         {
     924             156 :             free_state = curstate;
     925             156 :             continue;
     926 ECB             :         }
     927                 : 
     928                 :         /* not our slot */
     929 GIC        1439 :         if (curstate->roident != node)
     930                 :         {
     931            1393 :             continue;
     932 ECB             :         }
     933                 : 
     934                 :         /* ok, found slot */
     935 GIC          46 :         replication_state = curstate;
     936                 : 
     937 CBC          46 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     938                 : 
     939                 :         /* Make sure it's not used by somebody else */
     940              46 :         if (replication_state->acquired_by != 0)
     941 ECB             :         {
     942 UIC           0 :             ereport(ERROR,
     943                 :                     (errcode(ERRCODE_OBJECT_IN_USE),
     944                 :                      errmsg("replication origin with ID %d is already active for PID %d",
     945 ECB             :                             replication_state->roident,
     946                 :                             replication_state->acquired_by)));
     947                 :         }
     948                 : 
     949 GIC          46 :         break;
     950                 :     }
     951 ECB             : 
     952 GIC         200 :     if (replication_state == NULL && free_state == NULL)
     953 LBC           0 :         ereport(ERROR,
     954                 :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     955                 :                  errmsg("could not find free replication state slot for replication origin with ID %d",
     956 ECB             :                         node),
     957                 :                  errhint("Increase max_replication_slots and try again.")));
     958 EUB             : 
     959 GIC         200 :     if (replication_state == NULL)
     960                 :     {
     961                 :         /* initialize new slot */
     962             154 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     963             154 :         replication_state = free_state;
     964             154 :         Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
     965 CBC         154 :         Assert(replication_state->local_lsn == InvalidXLogRecPtr);
     966 GIC         154 :         replication_state->roident = node;
     967                 :     }
     968 ECB             : 
     969 GBC         200 :     Assert(replication_state->roident != InvalidRepOriginId);
     970                 : 
     971                 :     /*
     972                 :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
     973                 :      * and the standby gets the message. Primarily this will be called during
     974                 :      * WAL replay (of commit records) where no WAL logging is necessary.
     975 ECB             :      */
     976 GIC         200 :     if (wal_log)
     977                 :     {
     978 ECB             :         xl_replorigin_set xlrec;
     979                 : 
     980 CBC         155 :         xlrec.remote_lsn = remote_commit;
     981             155 :         xlrec.node_id = node;
     982             155 :         xlrec.force = go_backward;
     983                 : 
     984 GIC         155 :         XLogBeginInsert();
     985 CBC         155 :         XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     986                 : 
     987 GIC         155 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
     988                 :     }
     989                 : 
     990                 :     /*
     991                 :      * Due to - harmless - race conditions during a checkpoint we could see
     992 ECB             :      * values here that are older than the ones we already have in memory. We
     993                 :      * could also see older values for prepared transactions when the prepare
     994                 :      * is sent at a later point of time along with commit prepared and there
     995                 :      * are other transaction commits between prepare and commit prepared. See
     996                 :      * ReorderBufferFinishPrepared. Don't overwrite those.
     997                 :      */
     998 CBC         200 :     if (go_backward || replication_state->remote_lsn < remote_commit)
     999 GIC         189 :         replication_state->remote_lsn = remote_commit;
    1000 CBC         200 :     if (local_commit != InvalidXLogRecPtr &&
    1001              42 :         (go_backward || replication_state->local_lsn < local_commit))
    1002 GIC          45 :         replication_state->local_lsn = local_commit;
    1003 CBC         200 :     LWLockRelease(&replication_state->lock);
    1004                 : 
    1005                 :     /*
    1006                 :      * Release *after* changing the LSNs, slot isn't acquired and thus could
    1007                 :      * otherwise be dropped anytime.
    1008                 :      */
    1009 GIC         200 :     LWLockRelease(ReplicationOriginLock);
    1010                 : }
    1011                 : 
    1012                 : 
    1013                 : XLogRecPtr
    1014 CBC           8 : replorigin_get_progress(RepOriginId node, bool flush)
    1015 ECB             : {
    1016                 :     int         i;
    1017 CBC           8 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    1018               8 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1019 ECB             : 
    1020                 :     /* prevent slots from being concurrently dropped */
    1021 GIC           8 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1022                 : 
    1023              38 :     for (i = 0; i < max_replication_slots; i++)
    1024                 :     {
    1025 ECB             :         ReplicationState *state;
    1026                 : 
    1027 GIC          35 :         state = &replication_states[i];
    1028                 : 
    1029              35 :         if (state->roident == node)
    1030 ECB             :         {
    1031 GIC           5 :             LWLockAcquire(&state->lock, LW_SHARED);
    1032                 : 
    1033 CBC           5 :             remote_lsn = state->remote_lsn;
    1034               5 :             local_lsn = state->local_lsn;
    1035                 : 
    1036 GIC           5 :             LWLockRelease(&state->lock);
    1037 ECB             : 
    1038 GIC           5 :             break;
    1039 ECB             :         }
    1040                 :     }
    1041                 : 
    1042 GIC           8 :     LWLockRelease(ReplicationOriginLock);
    1043 ECB             : 
    1044 GIC           8 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1045 CBC           1 :         XLogFlush(local_lsn);
    1046                 : 
    1047               8 :     return remote_lsn;
    1048                 : }
    1049 ECB             : 
    1050                 : /*
    1051                 :  * Tear down a (possibly) configured session replication origin during process
    1052                 :  * exit.
    1053                 :  */
    1054                 : static void
    1055 GIC         314 : ReplicationOriginExitCleanup(int code, Datum arg)
    1056                 : {
    1057             314 :     ConditionVariable *cv = NULL;
    1058 ECB             : 
    1059 GIC         314 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1060 ECB             : 
    1061 CBC         314 :     if (session_replication_state != NULL &&
    1062 GIC         164 :         session_replication_state->acquired_by == MyProcPid)
    1063 ECB             :     {
    1064 GIC         154 :         cv = &session_replication_state->origin_cv;
    1065                 : 
    1066             154 :         session_replication_state->acquired_by = 0;
    1067             154 :         session_replication_state = NULL;
    1068                 :     }
    1069                 : 
    1070             314 :     LWLockRelease(ReplicationOriginLock);
    1071 ECB             : 
    1072 GIC         314 :     if (cv)
    1073 CBC         154 :         ConditionVariableBroadcast(cv);
    1074 GIC         314 : }
    1075 ECB             : 
    1076                 : /*
    1077                 :  * Setup a replication origin in the shared memory struct if it doesn't
    1078                 :  * already exist and cache access to the specific ReplicationState so the
    1079                 :  * array doesn't have to be searched when calling
    1080                 :  * replorigin_session_advance().
    1081                 :  *
    1082                 :  * Normally only one such cached origin can exist per process so the cached
    1083                 :  * value can only be set again after the previous value is torn down with
    1084                 :  * replorigin_session_reset(). For this normal case pass acquired_by = 0
    1085                 :  * (meaning the slot is not allowed to be already acquired by another process).
    1086                 :  *
    1087                 :  * However, sometimes multiple processes can safely re-use the same origin slot
    1088                 :  * (for example, multiple parallel apply processes can safely use the same
    1089                 :  * origin, provided they maintain commit order by allowing only one process to
    1090                 :  * commit at a time). For this case the first process must pass acquired_by =
    1091                 :  * 0, and then the other processes sharing that same origin can pass
    1092                 :  * acquired_by = PID of the first process.
    1093                 :  */
    1094                 : void
    1095 GNC         316 : replorigin_session_setup(RepOriginId node, int acquired_by)
    1096 ECB             : {
    1097                 :     static bool registered_cleanup;
    1098                 :     int         i;
    1099 GIC         316 :     int         free_slot = -1;
    1100                 : 
    1101             316 :     if (!registered_cleanup)
    1102                 :     {
    1103             314 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1104             314 :         registered_cleanup = true;
    1105                 :     }
    1106                 : 
    1107             316 :     Assert(max_replication_slots > 0);
    1108                 : 
    1109             316 :     if (session_replication_state != NULL)
    1110               1 :         ereport(ERROR,
    1111                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1112                 :                  errmsg("cannot setup replication origin when one is already setup")));
    1113                 : 
    1114                 :     /* Lock exclusively, as we may have to create a new table entry. */
    1115             315 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1116                 : 
    1117                 :     /*
    1118                 :      * Search for either an existing slot for the origin, or a free one we can
    1119 ECB             :      * use.
    1120                 :      */
    1121 GIC        3453 :     for (i = 0; i < max_replication_slots; i++)
    1122                 :     {
    1123 CBC        3138 :         ReplicationState *curstate = &replication_states[i];
    1124                 : 
    1125 ECB             :         /* remember where to insert if necessary */
    1126 GIC        3138 :         if (curstate->roident == InvalidRepOriginId &&
    1127 ECB             :             free_slot == -1)
    1128                 :         {
    1129 GIC         315 :             free_slot = i;
    1130             315 :             continue;
    1131 ECB             :         }
    1132                 : 
    1133                 :         /* not our slot */
    1134 CBC        2823 :         if (curstate->roident != node)
    1135 GIC        2579 :             continue;
    1136                 : 
    1137 GNC         244 :         else if (curstate->acquired_by != 0 && acquired_by == 0)
    1138                 :         {
    1139 LBC           0 :             ereport(ERROR,
    1140                 :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1141                 :                      errmsg("replication origin with ID %d is already active for PID %d",
    1142                 :                             curstate->roident, curstate->acquired_by)));
    1143                 :         }
    1144                 : 
    1145 ECB             :         /* ok, found slot */
    1146 GIC         244 :         session_replication_state = curstate;
    1147 ECB             :     }
    1148                 : 
    1149                 : 
    1150 CBC         315 :     if (session_replication_state == NULL && free_slot == -1)
    1151 UIC           0 :         ereport(ERROR,
    1152                 :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1153 ECB             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
    1154                 :                         node),
    1155                 :                  errhint("Increase max_replication_slots and try again.")));
    1156 GIC         315 :     else if (session_replication_state == NULL)
    1157                 :     {
    1158 ECB             :         /* initialize new slot */
    1159 CBC          71 :         session_replication_state = &replication_states[free_slot];
    1160 GIC          71 :         Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
    1161 CBC          71 :         Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
    1162 GIC          71 :         session_replication_state->roident = node;
    1163 EUB             :     }
    1164                 : 
    1165                 : 
    1166 GIC         315 :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1167                 : 
    1168 GNC         315 :     if (acquired_by == 0)
    1169             305 :         session_replication_state->acquired_by = MyProcPid;
    1170              10 :     else if (session_replication_state->acquired_by != acquired_by)
    1171 UNC           0 :         elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
    1172                 :              node, acquired_by);
    1173                 : 
    1174 CBC         315 :     LWLockRelease(ReplicationOriginLock);
    1175                 : 
    1176                 :     /* probably this one is pointless */
    1177 GIC         315 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1178 CBC         315 : }
    1179 EUB             : 
    1180                 : /*
    1181                 :  * Reset replay state previously setup in this session.
    1182                 :  *
    1183                 :  * This function may only be called if an origin was setup with
    1184 ECB             :  * replorigin_session_setup().
    1185                 :  */
    1186                 : void
    1187 CBC         152 : replorigin_session_reset(void)
    1188 ECB             : {
    1189                 :     ConditionVariable *cv;
    1190                 : 
    1191 GIC         152 :     Assert(max_replication_slots != 0);
    1192                 : 
    1193             152 :     if (session_replication_state == NULL)
    1194 CBC           1 :         ereport(ERROR,
    1195                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1196 ECB             :                  errmsg("no replication origin is configured")));
    1197                 : 
    1198 CBC         151 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1199 EUB             : 
    1200 GIC         151 :     session_replication_state->acquired_by = 0;
    1201             151 :     cv = &session_replication_state->origin_cv;
    1202 CBC         151 :     session_replication_state = NULL;
    1203                 : 
    1204 GIC         151 :     LWLockRelease(ReplicationOriginLock);
    1205 ECB             : 
    1206 CBC         151 :     ConditionVariableBroadcast(cv);
    1207 GIC         151 : }
    1208                 : 
    1209                 : /*
    1210                 :  * Do the same work replorigin_advance() does, just on the session's
    1211                 :  * configured origin.
    1212                 :  *
    1213                 :  * This is noticeably cheaper than using replorigin_advance().
    1214                 :  */
    1215 ECB             : void
    1216 GIC         912 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1217                 : {
    1218             912 :     Assert(session_replication_state != NULL);
    1219 CBC         912 :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1220                 : 
    1221             912 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1222             912 :     if (session_replication_state->local_lsn < local_commit)
    1223 GIC         912 :         session_replication_state->local_lsn = local_commit;
    1224             912 :     if (session_replication_state->remote_lsn < remote_commit)
    1225             440 :         session_replication_state->remote_lsn = remote_commit;
    1226 CBC         912 :     LWLockRelease(&session_replication_state->lock);
    1227 GIC         912 : }
    1228 ECB             : 
    1229                 : /*
    1230                 :  * Ask the machinery about the point up to which we successfully replayed
    1231                 :  * changes from an already setup replication origin.
    1232                 :  */
    1233                 : XLogRecPtr
    1234 CBC         149 : replorigin_session_get_progress(bool flush)
    1235 ECB             : {
    1236                 :     XLogRecPtr  remote_lsn;
    1237                 :     XLogRecPtr  local_lsn;
    1238                 : 
    1239 GIC         149 :     Assert(session_replication_state != NULL);
    1240                 : 
    1241             149 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1242             149 :     remote_lsn = session_replication_state->remote_lsn;
    1243             149 :     local_lsn = session_replication_state->local_lsn;
    1244 CBC         149 :     LWLockRelease(&session_replication_state->lock);
    1245                 : 
    1246             149 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1247               1 :         XLogFlush(local_lsn);
    1248                 : 
    1249             149 :     return remote_lsn;
    1250 ECB             : }
    1251                 : 
    1252                 : 
    1253                 : 
    1254                 : /* ---------------------------------------------------------------------------
    1255                 :  * SQL functions for working with replication origin.
    1256                 :  *
    1257                 :  * These mostly should be fairly short wrappers around more generic functions.
    1258                 :  * ---------------------------------------------------------------------------
    1259                 :  */
    1260                 : 
    1261                 : /*
    1262                 :  * Create replication origin for the passed in name, and return the assigned
    1263                 :  * oid.
    1264                 :  */
    1265                 : Datum
    1266 GIC           8 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1267 ECB             : {
    1268                 :     char       *name;
    1269                 :     RepOriginId roident;
    1270                 : 
    1271 CBC           8 :     replorigin_check_prerequisites(false, false);
    1272 ECB             : 
    1273 GIC           8 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1274 ECB             : 
    1275                 :     /*
    1276                 :      * Replication origins "any" and "none" are reserved for system options.
    1277                 :      * The origins "pg_xxx" are reserved for internal use.
    1278                 :      */
    1279 GNC           8 :     if (IsReservedName(name) || IsReservedOriginName(name))
    1280 CBC           3 :         ereport(ERROR,
    1281                 :                 (errcode(ERRCODE_RESERVED_NAME),
    1282                 :                  errmsg("replication origin name \"%s\" is reserved",
    1283                 :                         name),
    1284                 :                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
    1285                 :                            LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
    1286                 : 
    1287                 :     /*
    1288                 :      * If built with appropriate switch, whine when regression-testing
    1289                 :      * conventions for replication origin names are violated.
    1290                 :      */
    1291                 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1292                 :     if (strncmp(name, "regress_", 8) != 0)
    1293                 :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1294                 : #endif
    1295                 : 
    1296 GIC           5 :     roident = replorigin_create(name);
    1297                 : 
    1298 CBC           4 :     pfree(name);
    1299                 : 
    1300 GIC           4 :     PG_RETURN_OID(roident);
    1301                 : }
    1302                 : 
    1303 ECB             : /*
    1304                 :  * Drop replication origin.
    1305                 :  */
    1306                 : Datum
    1307 GIC           5 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1308                 : {
    1309                 :     char       *name;
    1310                 : 
    1311 CBC           5 :     replorigin_check_prerequisites(false, false);
    1312 ECB             : 
    1313 GIC           5 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1314                 : 
    1315               5 :     replorigin_drop_by_name(name, false, true);
    1316                 : 
    1317               4 :     pfree(name);
    1318                 : 
    1319               4 :     PG_RETURN_VOID();
    1320                 : }
    1321                 : 
    1322                 : /*
    1323                 :  * Return oid of a replication origin.
    1324                 :  */
    1325                 : Datum
    1326 UIC           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1327                 : {
    1328 ECB             :     char       *name;
    1329                 :     RepOriginId roident;
    1330                 : 
    1331 UIC           0 :     replorigin_check_prerequisites(false, false);
    1332 ECB             : 
    1333 UIC           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1334               0 :     roident = replorigin_by_name(name, true);
    1335                 : 
    1336               0 :     pfree(name);
    1337                 : 
    1338               0 :     if (OidIsValid(roident))
    1339 LBC           0 :         PG_RETURN_OID(roident);
    1340 UIC           0 :     PG_RETURN_NULL();
    1341                 : }
    1342                 : 
    1343 ECB             : /*
    1344                 :  * Setup a replication origin for this session.
    1345                 :  */
    1346                 : Datum
    1347 CBC           5 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1348                 : {
    1349 ECB             :     char       *name;
    1350                 :     RepOriginId origin;
    1351                 : 
    1352 GIC           5 :     replorigin_check_prerequisites(true, false);
    1353                 : 
    1354               5 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1355               5 :     origin = replorigin_by_name(name, false);
    1356 GNC           4 :     replorigin_session_setup(origin, 0);
    1357                 : 
    1358 GBC           3 :     replorigin_session_origin = origin;
    1359                 : 
    1360 GIC           3 :     pfree(name);
    1361                 : 
    1362               3 :     PG_RETURN_VOID();
    1363 EUB             : }
    1364                 : 
    1365                 : /*
    1366                 :  * Reset previously setup origin in this session
    1367                 :  * Reset the origin previously set up in this session.
    1368                 : Datum
    1369 GIC           4 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1370 EUB             : {
    1371 GBC           4 :     replorigin_check_prerequisites(true, false);
    1372 EUB             : 
    1373 GIC           4 :     replorigin_session_reset();
    1374                 : 
    1375               3 :     replorigin_session_origin = InvalidRepOriginId;
    1376               3 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1377               3 :     replorigin_session_origin_timestamp = 0;
    1378                 : 
    1379 CBC           3 :     PG_RETURN_VOID();
    1380                 : }
    1381                 : 
    1382                 : /*
    1383                 :  * Has a replication origin been setup for this session.
    1384 ECB             :  * Has a replication origin been set up for this session?
    1385                 : Datum
    1386 LBC           0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1387 ECB             : {
    1388 LBC           0 :     replorigin_check_prerequisites(false, false);
    1389                 : 
    1390               0 :     PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
    1391                 : }
    1392 ECB             : 
    1393                 : 
    1394                 : /*
    1395                 :  * Return the replication progress for origin setup in the current session.
    1396                 :  *
    1397                 :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1398                 :  * to a local transaction that has been flushed. This is useful if asynchronous
    1399                 :  * commits are used when replaying replicated transactions.
    1400                 :  */
    1401                 : Datum
    1402 GIC           2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1403 ECB             : {
    1404 GIC           2 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1405 CBC           2 :     bool        flush = PG_GETARG_BOOL(0);
    1406                 : 
    1407               2 :     replorigin_check_prerequisites(true, false);
    1408 ECB             : 
    1409 CBC           2 :     if (session_replication_state == NULL)
    1410 UIC           0 :         ereport(ERROR,
    1411 ECB             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1412                 :                  errmsg("no replication origin is configured")));
    1413                 : 
    1414 GIC           2 :     remote_lsn = replorigin_session_get_progress(flush);
    1415                 : 
    1416               2 :     if (remote_lsn == InvalidXLogRecPtr)
    1417 UIC           0 :         PG_RETURN_NULL();
    1418 EUB             : 
    1419 GIC           2 :     PG_RETURN_LSN(remote_lsn);
    1420 EUB             : }
    1421                 : 
    1422                 : Datum
    1423 GIC           1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1424                 : {
    1425               1 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1426                 : 
    1427               1 :     replorigin_check_prerequisites(true, false);
    1428                 : 
    1429               1 :     if (session_replication_state == NULL)
    1430 UIC           0 :         ereport(ERROR,
    1431                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1432                 :                  errmsg("no replication origin is configured")));
    1433                 : 
    1434 CBC           1 :     replorigin_session_origin_lsn = location;
    1435 GIC           1 :     replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1436 ECB             : 
    1437 CBC           1 :     PG_RETURN_VOID();
    1438                 : }
    1439 ECB             : 
    1440                 : Datum
    1441 LBC           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1442 EUB             : {
    1443 UIC           0 :     replorigin_check_prerequisites(true, false);
    1444                 : 
    1445               0 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1446 LBC           0 :     replorigin_session_origin_timestamp = 0;
    1447                 : 
    1448               0 :     PG_RETURN_VOID();
    1449 EUB             : }
    1450                 : 
    1451 ECB             : 
    1452                 : Datum
    1453 GIC           1 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1454                 : {
    1455 CBC           1 :     text       *name = PG_GETARG_TEXT_PP(0);
    1456 GIC           1 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1457 ECB             :     RepOriginId node;
    1458                 : 
    1459 CBC           1 :     replorigin_check_prerequisites(true, false);
    1460                 : 
    1461 ECB             :     /* lock to prevent the replication origin from vanishing */
    1462 GBC           1 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1463                 : 
    1464 GIC           1 :     node = replorigin_by_name(text_to_cstring(name), false);
    1465                 : 
    1466 ECB             :     /*
    1467                 :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1468                 :      * xact hasn't committed yet. This is why this function should be used to
    1469                 :      * set up the initial replication state, but not for replay.
    1470                 :      */
    1471 UIC           0 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1472                 :                        true /* go backward */ , true /* WAL log */ );
    1473 EUB             : 
    1474 UIC           0 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1475 EUB             : 
    1476 UIC           0 :     PG_RETURN_VOID();
    1477 EUB             : }
    1478                 : 
    1479                 : 
    1480                 : /*
    1481                 :  * Return the replication progress for an individual replication origin.
    1482                 :  *
    1483                 :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1484                 :  * to a local transaction that has been flushed. This is useful if asynchronous
    1485 ECB             :  * commits are used when replaying replicated transactions.
    1486                 :  */
    1487                 : Datum
    1488 CBC           3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1489                 : {
    1490                 :     char       *name;
    1491 ECB             :     bool        flush;
    1492                 :     RepOriginId roident;
    1493 GIC           3 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1494 ECB             : 
    1495 GIC           3 :     replorigin_check_prerequisites(true, true);
    1496 ECB             : 
    1497 GIC           3 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1498               3 :     flush = PG_GETARG_BOOL(1);
    1499                 : 
    1500               3 :     roident = replorigin_by_name(name, false);
    1501               2 :     Assert(OidIsValid(roident));
    1502                 : 
    1503 GBC           2 :     remote_lsn = replorigin_get_progress(roident, flush);
    1504                 : 
    1505 GIC           2 :     if (remote_lsn == InvalidXLogRecPtr)
    1506 UBC           0 :         PG_RETURN_NULL();
    1507                 : 
    1508 GBC           2 :     PG_RETURN_LSN(remote_lsn);
    1509                 : }
    1510                 : 
    1511                 : 
    1512                 : Datum
    1513 GIC           2 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1514                 : {
    1515               2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1516                 :     int         i;
    1517                 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1518                 : 
    1519                 :     /* we want to return 0 rows if slot is set to zero */
    1520 CBC           2 :     replorigin_check_prerequisites(false, true);
    1521                 : 
    1522 GIC           2 :     InitMaterializedSRF(fcinfo, 0);
    1523                 : 
    1524                 :     /* prevent slots from being concurrently dropped */
    1525 CBC           2 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1526                 : 
    1527 ECB             :     /*
    1528                 :      * Iterate through all possible replication_states, display if they are
    1529                 :      * filled. ReplicationOriginLock is held in shared mode, and each state's
    1530                 :      * LSNs are read under that state's own lock, also in shared mode.
    1531                 :      */
    1532 CBC          10 :     for (i = 0; i < max_replication_slots; i++)
    1533 ECB             :     {
    1534                 :         ReplicationState *state;
    1535                 :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1536                 :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1537                 :         char       *roname;
    1538 EUB             : 
    1539 GIC           8 :         state = &replication_states[i];
    1540 ECB             : 
    1541                 :         /* unused slot, nothing to display */
    1542 GIC           8 :         if (state->roident == InvalidRepOriginId)
    1543               6 :             continue;
    1544                 : 
    1545 CBC           2 :         memset(values, 0, sizeof(values));
    1546 GIC           2 :         memset(nulls, 1, sizeof(nulls));
    1547 ECB             : 
    1548 GIC           2 :         values[0] = ObjectIdGetDatum(state->roident);
    1549               2 :         nulls[0] = false;
    1550                 : 
    1551                 :         /*
    1552 ECB             :          * We're not preventing the origin from being dropped concurrently, so
    1553                 :          * silently accept that it might be gone.
    1554                 :          */
    1555 GIC           2 :         if (replorigin_by_oid(state->roident, true,
    1556                 :                               &roname))
    1557 ECB             :         {
    1558 GIC           2 :             values[1] = CStringGetTextDatum(roname);
    1559               2 :             nulls[1] = false;
    1560                 :         }
    1561                 : 
    1562               2 :         LWLockAcquire(&state->lock, LW_SHARED);
    1563                 : 
    1564 CBC           2 :         values[2] = LSNGetDatum(state->remote_lsn);
    1565 GIC           2 :         nulls[2] = false;
    1566                 : 
    1567               2 :         values[3] = LSNGetDatum(state->local_lsn);
    1568               2 :         nulls[3] = false;
    1569                 : 
    1570               2 :         LWLockRelease(&state->lock);
    1571 ECB             : 
    1572 GIC           2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1573                 :                              values, nulls);
    1574 ECB             :     }
    1575                 : 
    1576 GIC           2 :     LWLockRelease(ReplicationOriginLock);
    1577 ECB             : 
    1578                 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1579                 : 
    1580 CBC           2 :     return (Datum) 0;
    1581 ECB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a