LCOV - differential code coverage report
Current view: top level - src/backend/replication - slotfuncs.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 94.3 % 318 300 4 9 5 5 147 6 142 8 152 2
Current Date: 2023-04-08 17:13:01 Functions: 93.3 % 15 14 1 9 5 1 9
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 [..60] days: 100.0 % 6 6 6
Legend: Lines: hit not hit (120,180] days: 100.0 % 1 1 1
(240..) days: 94.2 % 311 293 4 9 5 5 146 142 8 149
Function coverage date bins:
(240..) days: 58.3 % 24 14 1 9 5 1 8

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * slotfuncs.c
                                  4                 :  *     Support functions for replication slots
                                  5                 :  *
                                  6                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
                                  7                 :  *
                                  8                 :  * IDENTIFICATION
                                  9                 :  *    src/backend/replication/slotfuncs.c
                                 10                 :  *
                                 11                 :  *-------------------------------------------------------------------------
                                 12                 :  */
                                 13                 : #include "postgres.h"
                                 14                 : 
                                 15                 : #include "access/htup_details.h"
                                 16                 : #include "access/xlog_internal.h"
                                 17                 : #include "access/xlogrecovery.h"
                                 18                 : #include "access/xlogutils.h"
                                 19                 : #include "funcapi.h"
                                 20                 : #include "miscadmin.h"
                                 21                 : #include "replication/decode.h"
                                 22                 : #include "replication/logical.h"
                                 23                 : #include "replication/slot.h"
                                 24                 : #include "utils/builtins.h"
                                 25                 : #include "utils/inval.h"
                                 26                 : #include "utils/pg_lsn.h"
                                 27                 : #include "utils/resowner.h"
                                 28                 : 
                                 29                 : /*
                                 30                 :  * Helper function for creating a new physical replication slot with
                                 31                 :  * given arguments. Note that this function doesn't release the created
                                 32                 :  * slot.
                                 33                 :  *
                                 34                 :  * If restart_lsn is a valid value, we use it as-is and skip the WAL
                                 35                 :  * reservation routine, so the caller must guarantee that WAL is available.
                                 36                 :  */
                                 37                 : static void
 1465 alvherre                   38 CBC          29 : create_physical_replication_slot(char *name, bool immediately_reserve,
                                 39                 :                                  bool temporary, XLogRecPtr restart_lsn)
                                 40                 : {
                                 41              29 :     Assert(!MyReplicationSlot);
                                 42                 : 
                                 43                 :     /* acquire replication slot, this will check for conflicting names */
                                 44              29 :     ReplicationSlotCreate(name, false,
                                 45                 :                           temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
                                 46                 : 
                                 47              29 :     if (immediately_reserve)
                                 48                 :     {
                                 49                 :         /* Reserve WAL as the user asked for it */
                                 50              15 :         if (XLogRecPtrIsInvalid(restart_lsn))
                                 51              11 :             ReplicationSlotReserveWal();
                                 52                 :         else
                                 53               4 :             MyReplicationSlot->data.restart_lsn = restart_lsn;
                                 54                 : 
                                 55                 :         /* Write this slot to disk */
                                 56              15 :         ReplicationSlotMarkDirty();
                                 57              15 :         ReplicationSlotSave();
                                 58                 :     }
                                 59              29 : }
                                 60                 : 
                                 61                 : /*
                                 62                 :  * SQL function for creating a new physical (streaming replication)
                                 63                 :  * replication slot.
                                 64                 :  */
                                 65                 : Datum
 3355 rhaas                      66              25 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
                                 67                 : {
                                 68              25 :     Name        name = PG_GETARG_NAME(0);
 2495                            69              25 :     bool        immediately_reserve = PG_GETARG_BOOL(1);
 2313 peter_e                    70              25 :     bool        temporary = PG_GETARG_BOOL(2);
                                 71                 :     Datum       values[2];
                                 72                 :     bool        nulls[2];
                                 73                 :     TupleDesc   tupdesc;
                                 74                 :     HeapTuple   tuple;
                                 75                 :     Datum       result;
                                 76                 : 
 3355 rhaas                      77              25 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
 3355 rhaas                      78 UBC           0 :         elog(ERROR, "return type must be a row type");
                                 79                 : 
  572 michael                    80 CBC          25 :     CheckSlotPermissions();
                                 81                 : 
 3223 andres                     82              25 :     CheckSlotRequirements();
                                 83                 : 
 1465 alvherre                   84              25 :     create_physical_replication_slot(NameStr(*name),
                                 85                 :                                      immediately_reserve,
                                 86                 :                                      temporary,
                                 87                 :                                      InvalidXLogRecPtr);
                                 88                 : 
 3324 rhaas                      89              25 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 3355                            90              25 :     nulls[0] = false;
                                 91                 : 
 2798 andres                     92              25 :     if (immediately_reserve)
                                 93                 :     {
                                 94              11 :         values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
                                 95              11 :         nulls[1] = false;
                                 96                 :     }
                                 97                 :     else
                                 98              14 :         nulls[1] = true;
                                 99                 : 
 3355 rhaas                     100              25 :     tuple = heap_form_tuple(tupdesc, values, nulls);
                                101              25 :     result = HeapTupleGetDatum(tuple);
                                102                 : 
                                103              25 :     ReplicationSlotRelease();
                                104                 : 
                                105              25 :     PG_RETURN_DATUM(result);
                                106                 : }
                                107                 : 
                                108                 : 
                                109                 : /*
                                110                 :  * Helper function for creating a new logical replication slot with
                                111                 :  * given arguments. Note that this function doesn't release the created
                                112                 :  * slot.
                                113                 :  *
                                114                 :  * When find_startpoint is false, the slot's confirmed_flush is not set; it's
                                115                 :  * the caller's responsibility to ensure it's set to something sensible.
                                116                 :  */
                                117                 : static void
 1465 alvherre                  118             101 : create_logical_replication_slot(char *name, char *plugin,
                                119                 :                                 bool temporary, bool two_phase,
                                120                 :                                 XLogRecPtr restart_lsn,
                                121                 :                                 bool find_startpoint)
                                122                 : {
 3324 rhaas                     123             101 :     LogicalDecodingContext *ctx = NULL;
                                124                 : 
 3223 andres                    125             101 :     Assert(!MyReplicationSlot);
                                126                 : 
                                127                 :     /*
                                128                 :      * Acquire a logical decoding slot, this will check for conflicting names.
                                129                 :      * Initially create persistent slot as ephemeral - that allows us to
                                130                 :      * nicely handle errors during initialization because it'll get dropped if
                                131                 :      * this transaction fails. We'll make it persistent at the end. Temporary
                                132                 :      * slots can be created as temporary from beginning as they get dropped on
                                133                 :      * error as well.
                                134                 :      */
 1465 alvherre                  135             101 :     ReplicationSlotCreate(name, true,
                                136                 :                           temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
                                137                 : 
                                138                 :     /*
                                139                 :      * Create logical decoding context to find start point or, if we don't
                                140                 :      * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
                                141                 :      *
                                142                 :      * Note: when !find_startpoint this is still important, because it's at
                                143                 :      * this point that the output plugin is validated.
                                144                 :      */
                                145              97 :     ctx = CreateInitDecodingContext(plugin, NIL,
                                146                 :                                     false,  /* just catalogs is OK */
                                147                 :                                     restart_lsn,
  699 tmunro                    148              97 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
                                149                 :                                                .segment_open = wal_segment_open,
                                150                 :                                                .segment_close = wal_segment_close),
                                151                 :                                     NULL, NULL, NULL);
                                152                 : 
                                153                 :     /*
                                154                 :      * If caller needs us to determine the decoding start point, do so now.
                                155                 :      * This might take a while.
                                156                 :      */
 1118 alvherre                  157              94 :     if (find_startpoint)
                                158              88 :         DecodingContextFindStartpoint(ctx);
                                159                 : 
                                160                 :     /* don't need the decoding context anymore */
 3324 rhaas                     161              92 :     FreeDecodingContext(ctx);
 1465 alvherre                  162              92 : }
                                163                 : 
                                164                 : /*
                                165                 :  * SQL function for creating a new logical replication slot.
                                166                 :  */
                                167                 : Datum
                                168              95 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
                                169                 : {
                                170              95 :     Name        name = PG_GETARG_NAME(0);
                                171              95 :     Name        plugin = PG_GETARG_NAME(1);
                                172              95 :     bool        temporary = PG_GETARG_BOOL(2);
  767 akapila                   173              95 :     bool        two_phase = PG_GETARG_BOOL(3);
                                174                 :     Datum       result;
                                175                 :     TupleDesc   tupdesc;
                                176                 :     HeapTuple   tuple;
                                177                 :     Datum       values[2];
                                178                 :     bool        nulls[2];
                                179                 : 
 1465 alvherre                  180              95 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
 1465 alvherre                  181 UBC           0 :         elog(ERROR, "return type must be a row type");
                                182                 : 
  572 michael                   183 CBC          95 :     CheckSlotPermissions();
                                184                 : 
 1465 alvherre                  185              94 :     CheckLogicalDecodingRequirements();
                                186                 : 
                                187              94 :     create_logical_replication_slot(NameStr(*name),
                                188              94 :                                     NameStr(*plugin),
                                189                 :                                     temporary,
                                190                 :                                     two_phase,
                                191                 :                                     InvalidXLogRecPtr,
                                192                 :                                     true);
                                193                 : 
                                194              86 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
                                195              86 :     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
                                196                 : 
 3324 rhaas                     197              86 :     memset(nulls, 0, sizeof(nulls));
                                198                 : 
                                199              86 :     tuple = heap_form_tuple(tupdesc, values, nulls);
                                200              86 :     result = HeapTupleGetDatum(tuple);
                                201                 : 
                                202                 :     /* ok, slot is now fully created, mark it as persistent if needed */
 2313 peter_e                   203              86 :     if (!temporary)
                                204              81 :         ReplicationSlotPersist();
 3324 rhaas                     205              86 :     ReplicationSlotRelease();
                                206                 : 
                                207              86 :     PG_RETURN_DATUM(result);
                                208                 : }
                                209                 : 
                                210                 : 
                                211                 : /*
                                212                 :  * SQL function for dropping a replication slot.
                                213                 :  */
                                214                 : Datum
 3355                           215             109 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
                                216                 : {
                                217             109 :     Name        name = PG_GETARG_NAME(0);
                                218                 : 
  572 michael                   219             109 :     CheckSlotPermissions();
                                220                 : 
 3355 rhaas                     221             107 :     CheckSlotRequirements();
                                222                 : 
 2046 alvherre                  223             107 :     ReplicationSlotDrop(NameStr(*name), true);
                                224                 : 
 3355 rhaas                     225             102 :     PG_RETURN_VOID();
                                226                 : }
                                227                 : 
                                228                 : /*
                                229                 :  * pg_get_replication_slots - SQL SRF showing all replication slots
                                230                 :  * that currently exist on the database cluster.
                                231                 :  */
                                232                 : Datum
 3355 rhaas                     233 GIC         171 : pg_get_replication_slots(PG_FUNCTION_ARGS)
 3355 rhaas                     234 ECB             : {
                                235                 : #define PG_GET_REPLICATION_SLOTS_COLS 15
 3355 rhaas                     236 GIC         171 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 1006 alvherre                  237 ECB             :     XLogRecPtr  currlsn;
                                238                 :     int         slotno;
                                239                 : 
                                240                 :     /*
                                241                 :      * We don't require any special permission to see this function's data
                                242                 :      * because nothing should be sensitive. The most critical item is the slot
                                243                 :      * name, which shouldn't contain anything particularly sensitive.
                                244                 :      */
                                245                 : 
  173 michael                   246 GIC         171 :     InitMaterializedSRF(fcinfo, 0);
 3355 rhaas                     247 ECB             : 
 1006 alvherre                  248 GIC         171 :     currlsn = GetXLogWriteRecPtr();
 1006 alvherre                  249 ECB             : 
 2084 alvherre                  250 GIC         171 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 3355 rhaas                     251 CBC        1113 :     for (slotno = 0; slotno < max_replication_slots; slotno++)
 3355 rhaas                     252 ECB             :     {
 3355 rhaas                     253 GIC         942 :         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
 1040 tgl                       254 ECB             :         ReplicationSlot slot_contents;
                                255                 :         Datum       values[PG_GET_REPLICATION_SLOTS_COLS];
                                256                 :         bool        nulls[PG_GET_REPLICATION_SLOTS_COLS];
                                257                 :         WALAvailability walstate;
                                258                 :         int         i;
                                259                 : 
 3355 rhaas                     260 GIC         942 :         if (!slot->in_use)
 3355 rhaas                     261 CBC         623 :             continue;
 2084 alvherre                  262 ECB             : 
                                263                 :         /* Copy slot contents while holding spinlock, then examine at leisure */
 2084 alvherre                  264 GIC         319 :         SpinLockAcquire(&slot->mutex);
 1040 tgl                       265 CBC         319 :         slot_contents = *slot;
 3355 rhaas                     266             319 :         SpinLockRelease(&slot->mutex);
 3355 rhaas                     267 ECB             : 
 1040 tgl                       268 GIC         319 :         memset(values, 0, sizeof(values));
 3355 rhaas                     269 CBC         319 :         memset(nulls, 0, sizeof(nulls));
 3355 rhaas                     270 ECB             : 
 3355 rhaas                     271 GIC         319 :         i = 0;
 1040 tgl                       272 CBC         319 :         values[i++] = NameGetDatum(&slot_contents.data.name);
 3324 rhaas                     273 ECB             : 
 1040 tgl                       274 GIC         319 :         if (slot_contents.data.database == InvalidOid)
 3324 rhaas                     275 CBC         124 :             nulls[i++] = true;
 3324 rhaas                     276 ECB             :         else
 1040 tgl                       277 GIC         195 :             values[i++] = NameGetDatum(&slot_contents.data.plugin);
 3324 rhaas                     278 ECB             : 
 1040 tgl                       279 GIC         319 :         if (slot_contents.data.database == InvalidOid)
 3355 rhaas                     280 CBC         124 :             values[i++] = CStringGetTextDatum("physical");
 3355 rhaas                     281 ECB             :         else
 3355 rhaas                     282 GIC         195 :             values[i++] = CStringGetTextDatum("logical");
 3324 rhaas                     283 ECB             : 
 1040 tgl                       284 GIC         319 :         if (slot_contents.data.database == InvalidOid)
 3324 rhaas                     285 CBC         124 :             nulls[i++] = true;
 3324 rhaas                     286 ECB             :         else
 1040 tgl                       287 GIC         195 :             values[i++] = ObjectIdGetDatum(slot_contents.data.database);
 3324 rhaas                     288 ECB             : 
 1040 tgl                       289 GIC         319 :         values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
 1040 tgl                       290 CBC         319 :         values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
 2910 andres                    291 ECB             : 
 1040 tgl                       292 GIC         319 :         if (slot_contents.active_pid != 0)
 1040 tgl                       293 CBC         132 :             values[i++] = Int32GetDatum(slot_contents.active_pid);
 2910 andres                    294 ECB             :         else
 2910 andres                    295 GIC         187 :             nulls[i++] = true;
 3324 rhaas                     296 ECB             : 
 1040 tgl                       297 GIC         319 :         if (slot_contents.data.xmin != InvalidTransactionId)
 1040 tgl                       298 CBC          16 :             values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
 3355 rhaas                     299 ECB             :         else
 3355 rhaas                     300 GIC         303 :             nulls[i++] = true;
 3324 rhaas                     301 ECB             : 
 1040 tgl                       302 GIC         319 :         if (slot_contents.data.catalog_xmin != InvalidTransactionId)
 1040 tgl                       303 CBC         199 :             values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
 3324 rhaas                     304 ECB             :         else
 3324 rhaas                     305 GIC         120 :             nulls[i++] = true;
 3324 rhaas                     306 ECB             : 
 1040 tgl                       307 GIC         319 :         if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
 1040 tgl                       308 CBC         306 :             values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
 3355 rhaas                     309 ECB             :         else
 3355 rhaas                     310 GIC          13 :             nulls[i++] = true;
 3355 rhaas                     311 ECB             : 
 1040 tgl                       312 GIC         319 :         if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
 1040 tgl                       313 CBC         173 :             values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
 2799 andres                    314 ECB             :         else
 2799 andres                    315 GIC         146 :             nulls[i++] = true;
 2799 andres                    316 ECB             : 
                                317                 :         /*
                                318                 :          * If the slot has not been invalidated, test availability from
                                319                 :          * restart_lsn.
                                320                 :          */
    2 andres                    321 GNC         319 :         if (slot_contents.data.invalidated != RS_INVAL_NONE)
 1017 alvherre                  322 GIC          14 :             walstate = WALAVAIL_REMOVED;
 1017 alvherre                  323 ECB             :         else
 1017 alvherre                  324 GIC         305 :             walstate = GetWALAvailability(slot_contents.data.restart_lsn);
 1097 alvherre                  325 ECB             : 
 1097 alvherre                  326 GIC         319 :         switch (walstate)
 1097 alvherre                  327 ECB             :         {
 1097 alvherre                  328 CBC          11 :             case WALAVAIL_INVALID_LSN:
                                329              11 :                 nulls[i++] = true;
 1097 alvherre                  330 GIC          11 :                 break;
 1097 alvherre                  331 ECB             : 
 1097 alvherre                  332 CBC         291 :             case WALAVAIL_RESERVED:
                                333             291 :                 values[i++] = CStringGetTextDatum("reserved");
 1097 alvherre                  334 GIC         291 :                 break;
 1097 alvherre                  335 ECB             : 
 1019 alvherre                  336 CBC           2 :             case WALAVAIL_EXTENDED:
                                337               2 :                 values[i++] = CStringGetTextDatum("extended");
 1019 alvherre                  338 GIC           2 :                 break;
 1019 alvherre                  339 ECB             : 
 1019 alvherre                  340 CBC           1 :             case WALAVAIL_UNRESERVED:
                                341               1 :                 values[i++] = CStringGetTextDatum("unreserved");
 1019 alvherre                  342 GIC           1 :                 break;
 1019 alvherre                  343 ECB             : 
 1097 alvherre                  344 GIC          14 :             case WALAVAIL_REMOVED:
                                345                 : 
                                346                 :                 /*
                                347                 :                  * If we read the restart_lsn long enough ago, maybe that file
                                348                 :                  * has been removed by now.  However, the walsender could have
                                349                 :                  * moved forward enough that it jumped to another file after
                                350                 :                  * we looked.  If the checkpointer signalled the process to
                                351                 :                  * terminate, then it's definitely lost; but if a process is
                                352                 :                  * still alive, then "unreserved" seems more appropriate.
                                353                 :                  *
                                354                 :                  * If we do change it, save the state for safe_wal_size below.
 1019 alvherre                  355 ECB             :                  */
 1019 alvherre                  356 GIC          14 :                 if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
                                357                 :                 {
                                358                 :                     int         pid;
 1019 alvherre                  359 ECB             : 
 1019 alvherre                  360 CBC          12 :                     SpinLockAcquire(&slot->mutex);
                                361              12 :                     pid = slot->active_pid;
 1006                           362              12 :                     slot_contents.data.restart_lsn = slot->data.restart_lsn;
 1019                           363              12 :                     SpinLockRelease(&slot->mutex);
 1019 alvherre                  364 GIC          12 :                     if (pid != 0)
 1019 alvherre                  365 EUB             :                     {
 1019 alvherre                  366 UBC           0 :                         values[i++] = CStringGetTextDatum("unreserved");
 1006                           367               0 :                         walstate = WALAVAIL_UNRESERVED;
 1019 alvherre                  368 UIC           0 :                         break;
                                369                 :                     }
 1019 alvherre                  370 ECB             :                 }
 1097 alvherre                  371 CBC          14 :                 values[i++] = CStringGetTextDatum("lost");
 1097 alvherre                  372 GIC          14 :                 break;
                                373                 :         }
                                374                 : 
                                375                 :         /*
                                376                 :          * safe_wal_size is only computed for slots that have not been lost,
                                377                 :          * and only if there's a configured maximum size.
 1006 alvherre                  378 ECB             :          */
 1006 alvherre                  379 CBC         319 :         if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
 1006 alvherre                  380 GIC         314 :             nulls[i++] = true;
                                381                 :         else
                                382                 :         {
                                383                 :             XLogSegNo   targetSeg;
                                384                 :             uint64      slotKeepSegs;
                                385                 :             uint64      keepSegs;
                                386                 :             XLogSegNo   failSeg;
                                387                 :             XLogRecPtr  failLSN;
 1097 alvherre                  388 ECB             : 
 1006 alvherre                  389 GIC           5 :             XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
                                390                 : 
  494 michael                   391 ECB             :             /* determine how many segments can be kept by slots */
  993 fujii                     392 GIC           5 :             slotKeepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
  993 fujii                     393 ECB             :             /* ditto for wal_keep_size */
  993 fujii                     394 GIC           5 :             keepSegs = XLogMBVarToSegs(wal_keep_size_mb, wal_segment_size);
                                395                 : 
 1006 alvherre                  396 ECB             :             /* if currpos reaches failLSN, we lose our segment */
  993 fujii                     397 CBC           5 :             failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
 1006 alvherre                  398 GIC           5 :             XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
 1006 alvherre                  399 ECB             : 
 1006 alvherre                  400 GIC           5 :             values[i++] = Int64GetDatum(failLSN - currlsn);
                                401                 :         }
 1097 alvherre                  402 ECB             : 
  767 akapila                   403 GIC         319 :         values[i++] = BoolGetDatum(slot_contents.data.two_phase);
  767 akapila                   404 ECB             : 
    2 andres                    405 GNC         319 :         if (slot_contents.data.database == InvalidOid)
                                406             124 :             nulls[i++] = true;
                                407                 :         else
                                408                 :         {
                                409             195 :             if (slot_contents.data.invalidated != RS_INVAL_NONE)
                                410              12 :                 values[i++] = BoolGetDatum(true);
                                411                 :             else
                                412             183 :                 values[i++] = BoolGetDatum(false);
                                413                 :         }
                                414                 : 
 1040 tgl                       415 CBC         319 :         Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
                                416                 : 
  398 michael                   417 GIC         319 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
  398 michael                   418 ECB             :                              values, nulls);
 3355 rhaas                     419                 :     }
                                420                 : 
 2084 alvherre                  421 CBC         171 :     LWLockRelease(ReplicationSlotControlLock);
                                422                 : 
 3355 rhaas                     423 GIC         171 :     return (Datum) 0;
 3355 rhaas                     424 ECB             : }
                                425                 : 
 1908 simon                     426                 : /*
                                427                 :  * Helper function for advancing our physical replication slot forward.
                                428                 :  *
                                429                 :  * The LSN position to move to is compared simply to the slot's restart_lsn,
 1725 alvherre                  430                 :  * knowing that any position older than that would be removed by successive
                                431                 :  * checkpoints.
 1908 simon                     432                 :  */
                                433                 : static XLogRecPtr
 1763 michael                   434 GIC           1 : pg_physical_replication_slot_advance(XLogRecPtr moveto)
                                435                 : {
                                436               1 :     XLogRecPtr  startlsn = MyReplicationSlot->data.restart_lsn;
                                437               1 :     XLogRecPtr  retlsn = startlsn;
                                438                 : 
 1097 alvherre                  439               1 :     Assert(moveto != InvalidXLogRecPtr);
                                440                 : 
 1763 michael                   441               1 :     if (startlsn < moveto)
                                442                 :     {
 1763 michael                   443 CBC           1 :         SpinLockAcquire(&MyReplicationSlot->mutex);
 1908 simon                     444 GIC           1 :         MyReplicationSlot->data.restart_lsn = moveto;
 1763 michael                   445 CBC           1 :         SpinLockRelease(&MyReplicationSlot->mutex);
 1908 simon                     446               1 :         retlsn = moveto;
                                447                 : 
 1165 michael                   448 ECB             :         /*
                                449                 :          * Dirty the slot so as it is written out at the next checkpoint. Note
 1060 tgl                       450                 :          * that the LSN position advanced may still be lost in the event of a
                                451                 :          * crash, but this makes the data consistent after a clean shutdown.
 1165 michael                   452                 :          */
 1165 michael                   453 CBC           1 :         ReplicationSlotMarkDirty();
 1908 simon                     454 ECB             :     }
                                455                 : 
 1908 simon                     456 GIC           1 :     return retlsn;
                                457                 : }
                                458                 : 
                                459                 : /*
                                460                 :  * Helper function for advancing our logical replication slot forward.
                                461                 :  *
 1200 michael                   462 ECB             :  * The slot's restart_lsn is used as start point for reading records, while
                                463                 :  * confirmed_flush is used as base point for the decoding context.
                                464                 :  *
 1725 alvherre                  465                 :  * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
                                466                 :  * because we need to digest WAL to advance restart_lsn allowing to recycle
                                467                 :  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
                                468                 :  * mode, no changes are generated anyway.
                                469                 :  */
                                470                 : static XLogRecPtr
 1763 michael                   471 GIC           3 : pg_logical_replication_slot_advance(XLogRecPtr moveto)
                                472                 : {
                                473                 :     LogicalDecodingContext *ctx;
 1809 tgl                       474               3 :     ResourceOwner old_resowner = CurrentResourceOwner;
                                475                 :     XLogRecPtr  retlsn;
                                476                 : 
 1097 alvherre                  477               3 :     Assert(moveto != InvalidXLogRecPtr);
                                478                 : 
 1908 simon                     479               3 :     PG_TRY();
 1908 simon                     480 ECB             :     {
                                481                 :         /*
                                482                 :          * Create our decoding context in fast_forward mode, passing start_lsn
 1725 alvherre                  483                 :          * as InvalidXLogRecPtr, so that we start processing from my slot's
                                484                 :          * confirmed_flush.
                                485                 :          */
 1908 simon                     486 CBC           6 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
                                487                 :                                     NIL,
 1725 alvherre                  488 ECB             :                                     true,   /* fast_forward */
  699 tmunro                    489 GIC           3 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
                                490                 :                                                .segment_open = wal_segment_open,
                                491                 :                                                .segment_close = wal_segment_close),
                                492                 :                                     NULL, NULL, NULL);
                                493                 : 
                                494                 :         /*
 1725 alvherre                  495 ECB             :          * Start reading at the slot's restart_lsn, which we know to point to
                                496                 :          * a valid record.
                                497                 :          */
 1169 heikki.linnakangas        498 CBC           3 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
                                499                 : 
                                500                 :         /* invalidate non-timetravel entries */
 1908 simon                     501 GIC           3 :         InvalidateSystemCaches();
                                502                 : 
                                503                 :         /* Decode at least one record, until we run out of records */
 1169 heikki.linnakangas        504              91 :         while (ctx->reader->EndRecPtr < moveto)
                                505                 :         {
 1908 simon                     506              91 :             char       *errm = NULL;
 1725 alvherre                  507 ECB             :             XLogRecord *record;
                                508                 : 
                                509                 :             /*
                                510                 :              * Read records.  No changes are generated in fast_forward mode,
                                511                 :              * but snapbuilder/slot statuses are updated properly.
                                512                 :              */
  699 tmunro                    513 CBC          91 :             record = XLogReadRecord(ctx->reader, &errm);
 1908 simon                     514 GIC          91 :             if (errm)
  515 michael                   515 LBC           0 :                 elog(ERROR, "could not find record while advancing replication slot: %s",
                                516                 :                      errm);
                                517                 : 
                                518                 :             /*
                                519                 :              * Process the record.  Storage-level changes are ignored in
                                520                 :              * fast_forward mode, but other modules (such as snapbuilder)
                                521                 :              * might still have critical updates to do.
 1908 simon                     522 ECB             :              */
 1725 alvherre                  523 CBC          91 :             if (record)
 1908 simon                     524 GBC          91 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
                                525                 : 
                                526                 :             /* Stop once the requested target has been reached */
 1908 simon                     527 GIC          91 :             if (moveto <= ctx->reader->EndRecPtr)
                                528               3 :                 break;
                                529                 : 
                                530              88 :             CHECK_FOR_INTERRUPTS();
                                531                 :         }
 1908 simon                     532 ECB             : 
 1726 tgl                       533                 :         /*
                                534                 :          * Logical decoding could have clobbered CurrentResourceOwner during
                                535                 :          * transaction management, so restore the executor's value.  (This is
                                536                 :          * a kluge, but it's not worth cleaning up right now.)
                                537                 :          */
 1908 simon                     538 GIC           3 :         CurrentResourceOwner = old_resowner;
 1908 simon                     539 ECB             : 
 1908 simon                     540 GIC           3 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
                                541                 :         {
                                542               3 :             LogicalConfirmReceivedLocation(moveto);
                                543                 : 
                                544                 :             /*
                                545                 :              * If only the confirmed_flush LSN has changed the slot won't get
                                546                 :              * marked as dirty by the above. Callers on the walsender
 1908 simon                     547 ECB             :              * interface are expected to keep track of their own progress and
                                548                 :              * don't need it written out. But SQL-interface users cannot
                                549                 :              * specify their own start positions and it's harder for them to
                                550                 :              * keep track of their progress, so we should make more of an
                                551                 :              * effort to save it for them.
                                552                 :              *
                                553                 :              * Dirty the slot so it is written out at the next checkpoint. The
                                554                 :              * LSN position advanced to may still be lost on a crash but this
                                555                 :              * makes the data consistent after a clean shutdown.
                                556                 :              */
 1908 simon                     557 GIC           3 :             ReplicationSlotMarkDirty();
                                558                 :         }
                                559                 : 
                                560               3 :         retlsn = MyReplicationSlot->data.confirmed_flush;
                                561                 : 
                                562                 :         /* free context, call shutdown callback */
                                563               3 :         FreeDecodingContext(ctx);
                                564                 : 
                                565               3 :         InvalidateSystemCaches();
 1908 simon                     566 ECB             :     }
 1908 simon                     567 UIC           0 :     PG_CATCH();
                                568                 :     {
 1908 simon                     569 ECB             :         /* clear all timetravel entries */
 1908 simon                     570 UIC           0 :         InvalidateSystemCaches();
                                571                 : 
 1908 simon                     572 LBC           0 :         PG_RE_THROW();
                                573                 :     }
 1908 simon                     574 CBC           3 :     PG_END_TRY();
                                575                 : 
 1908 simon                     576 GBC           3 :     return retlsn;
                                577                 : }
                                578                 : 
 1908 simon                     579 EUB             : /*
                                580                 :  * SQL function for moving the position in a replication slot.
                                581                 :  */
                                582                 : Datum
 1908 simon                     583 CBC           6 : pg_replication_slot_advance(PG_FUNCTION_ARGS)
                                584                 : {
                                585               6 :     Name        slotname = PG_GETARG_NAME(0);
 1908 simon                     586 GIC           6 :     XLogRecPtr  moveto = PG_GETARG_LSN(1);
                                587                 :     XLogRecPtr  endlsn;
                                588                 :     XLogRecPtr  minlsn;
                                589                 :     TupleDesc   tupdesc;
                                590                 :     Datum       values[2];
                                591                 :     bool        nulls[2];
 1908 simon                     592 ECB             :     HeapTuple   tuple;
                                593                 :     Datum       result;
                                594                 : 
 1908 simon                     595 CBC           6 :     Assert(!MyReplicationSlot);
                                596                 : 
  572 michael                   597 GIC           6 :     CheckSlotPermissions();
                                598                 : 
 1908 simon                     599               6 :     if (XLogRecPtrIsInvalid(moveto))
                                600               1 :         ereport(ERROR,
                                601                 :                 (errmsg("invalid target WAL LSN")));
                                602                 : 
                                603                 :     /* Build a tuple descriptor for our result type */
 1908 simon                     604 CBC           5 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
 1908 simon                     605 UIC           0 :         elog(ERROR, "return type must be a row type");
 1908 simon                     606 ECB             : 
                                607                 :     /*
                                608                 :      * We can't move slot past what's been flushed/replayed so clamp the
 1906                           609                 :      * target position accordingly.
                                610                 :      */
 1908 simon                     611 GIC           5 :     if (!RecoveryInProgress())
  520 rhaas                     612               5 :         moveto = Min(moveto, GetFlushRecPtr(NULL));
 1908 simon                     613 ECB             :     else
  520 rhaas                     614 UBC           0 :         moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
                                615                 : 
                                616                 :     /* Acquire the slot so we "own" it */
  667 alvherre                  617 GIC           5 :     ReplicationSlotAcquire(NameStr(*slotname), true);
                                618                 : 
                                619                 :     /* A slot whose restart_lsn has never been reserved cannot be advanced */
 1733 michael                   620 CBC           5 :     if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
                                621               1 :         ereport(ERROR,
                                622                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 1076 alvherre                  623 EUB             :                  errmsg("replication slot \"%s\" cannot be advanced",
                                624                 :                         NameStr(*slotname)),
                                625                 :                  errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
 1733 michael                   626 ECB             : 
                                627                 :     /*
                                628                 :      * Check if the slot is not moving backwards.  Physical slots rely simply
 1763                           629                 :      * on restart_lsn as a minimum point, while logical slots have confirmed
 1200                           630                 :      * consumption up to confirmed_flush, meaning that in both cases data
                                631                 :      * older than that is not available anymore.
                                632                 :      */
 1763 michael                   633 GIC           4 :     if (OidIsValid(MyReplicationSlot->data.database))
                                634               3 :         minlsn = MyReplicationSlot->data.confirmed_flush;
                                635                 :     else
                                636               1 :         minlsn = MyReplicationSlot->data.restart_lsn;
                                637                 : 
                                638               4 :     if (moveto < minlsn)
 1908 simon                     639 UIC           0 :         ereport(ERROR,
                                640                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                641                 :                  errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
  775 peter                     642 ECB             :                         LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
 1908 simon                     643                 : 
                                644                 :     /* Do the actual slot update, depending on the slot type */
 1908 simon                     645 CBC           4 :     if (OidIsValid(MyReplicationSlot->data.database))
 1763 michael                   646 GIC           3 :         endlsn = pg_logical_replication_slot_advance(moveto);
 1908 simon                     647 ECB             :     else
 1763 michael                   648 GBC           1 :         endlsn = pg_physical_replication_slot_advance(moveto);
                                649                 : 
 1908 simon                     650 GIC           4 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
                                651               4 :     nulls[0] = false;
                                652                 : 
                                653                 :     /*
 1025 michael                   654 ECB             :      * Recompute the minimum LSN and xmin across all slots to adjust with the
                                655                 :      * advancing potentially done.
                                656                 :      */
 1025 michael                   657 CBC           4 :     ReplicationSlotsComputeRequiredXmin(false);
 1025 michael                   658 GIC           4 :     ReplicationSlotsComputeRequiredLSN();
 1025 michael                   659 ECB             : 
 1908 simon                     660 CBC           4 :     ReplicationSlotRelease();
                                661                 : 
                                662                 :     /* Return the reached position. */
 1908 simon                     663 GIC           4 :     values[1] = LSNGetDatum(endlsn);
                                664               4 :     nulls[1] = false;
                                665                 : 
 1908 simon                     666 CBC           4 :     tuple = heap_form_tuple(tupdesc, values, nulls);
                                667               4 :     result = HeapTupleGetDatum(tuple);
                                668                 : 
                                669               4 :     PG_RETURN_DATUM(result);
                                670                 : }
                                671                 : 
 1465 alvherre                  672 ECB             : /*
                                673                 :  * Helper function of copying a replication slot.
                                674                 :  */
                                675                 : static Datum
 1465 alvherre                  676 CBC          14 : copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                                677                 : {
                                678              14 :     Name        src_name = PG_GETARG_NAME(0);
 1465 alvherre                  679 GIC          14 :     Name        dst_name = PG_GETARG_NAME(1);
                                680              14 :     ReplicationSlot *src = NULL;
                                681                 :     ReplicationSlot first_slot_contents;
                                682                 :     ReplicationSlot second_slot_contents;
                                683                 :     XLogRecPtr  src_restart_lsn;
                                684                 :     bool        src_islogical;
 1465 alvherre                  685 ECB             :     bool        temporary;
                                686                 :     char       *plugin;
                                687                 :     Datum       values[2];
                                688                 :     bool        nulls[2];
                                689                 :     Datum       result;
                                690                 :     TupleDesc   tupdesc;
                                691                 :     HeapTuple   tuple;
                                692                 : 
 1465 alvherre                  693 GIC          14 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
 1465 alvherre                  694 UIC           0 :         elog(ERROR, "return type must be a row type");
                                695                 : 
  572 michael                   696 GIC          14 :     CheckSlotPermissions();
                                697                 : 
 1465 alvherre                  698              14 :     if (logical_slot)
                                699               8 :         CheckLogicalDecodingRequirements();
                                700                 :     else
                                701               6 :         CheckSlotRequirements();
 1465 alvherre                  702 ECB             : 
 1465 alvherre                  703 GBC          14 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
                                704                 : 
 1465 alvherre                  705 ECB             :     /*
                                706                 :      * We need to prevent the source slot's reserved WAL from being removed,
                                707                 :      * but we don't want to lock that slot for very long, and it can advance
                                708                 :      * in the meantime.  So obtain the source slot's data, and create a new
                                709                 :      * slot using its restart_lsn.  Afterwards we lock the source slot again
                                710                 :      * and verify that the data we copied (name, type) has not changed
                                711                 :      * incompatibly.  No inconvenient WAL removal can occur once the new slot
                                712                 :      * is created -- but since WAL removal could have occurred before we
                                713                 :      * managed to create the new slot, we advance the new slot's restart_lsn
                                714                 :      * to the source slot's updated restart_lsn the second time we lock it.
                                715                 :      */
 1465 alvherre                  716 GIC          15 :     for (int i = 0; i < max_replication_slots; i++)
                                717                 :     {
                                718              15 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
                                719                 : 
                                720              15 :         if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
                                721                 :         {
                                722                 :             /* Copy the slot contents while holding spinlock */
                                723              14 :             SpinLockAcquire(&s->mutex);
 1040 tgl                       724              14 :             first_slot_contents = *s;
 1465 alvherre                  725 CBC          14 :             SpinLockRelease(&s->mutex);
 1465 alvherre                  726 GIC          14 :             src = s;
 1465 alvherre                  727 CBC          14 :             break;
                                728                 :         }
 1465 alvherre                  729 ECB             :     }
                                730                 : 
 1465 alvherre                  731 GIC          14 :     LWLockRelease(ReplicationSlotControlLock);
 1465 alvherre                  732 ECB             : 
 1465 alvherre                  733 CBC          14 :     if (src == NULL)
 1465 alvherre                  734 LBC           0 :         ereport(ERROR,
 1465 alvherre                  735 ECB             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                                736                 :                  errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
                                737                 : 
 1040 tgl                       738 GIC          14 :     src_islogical = SlotIsLogical(&first_slot_contents);
                                739              14 :     src_restart_lsn = first_slot_contents.data.restart_lsn;
 1040 tgl                       740 CBC          14 :     temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
 1040 tgl                       741 GIC          14 :     plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
 1040 tgl                       742 ECB             : 
 1465 alvherre                  743 EUB             :     /* Check type of replication slot */
 1465 alvherre                  744 GIC          14 :     if (src_islogical != logical_slot)
                                745               2 :         ereport(ERROR,
                                746                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 1465 alvherre                  747 ECB             :                  src_islogical ?
                                748                 :                  errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
                                749                 :                         NameStr(*src_name)) :
                                750                 :                  errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
                                751                 :                         NameStr(*src_name))));
                                752                 : 
                                753                 :     /* Copying non-reserved slot doesn't make sense */
 1465 alvherre                  754 CBC          12 :     if (XLogRecPtrIsInvalid(src_restart_lsn))
 1465 alvherre                  755 GIC           1 :         ereport(ERROR,
                                756                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                757                 :                  errmsg("cannot copy a replication slot that doesn't reserve WAL")));
                                758                 : 
                                759                 :     /* Overwrite params from optional arguments */
                                760              11 :     if (PG_NARGS() >= 3)
                                761               6 :         temporary = PG_GETARG_BOOL(2);
                                762              11 :     if (PG_NARGS() >= 4)
 1465 alvherre                  763 ECB             :     {
 1465 alvherre                  764 CBC           4 :         Assert(logical_slot);
 1465 alvherre                  765 GIC           4 :         plugin = NameStr(*(PG_GETARG_NAME(3)));
                                766                 :     }
                                767                 : 
                                768                 :     /* Create new slot and acquire it */
 1465 alvherre                  769 CBC          11 :     if (logical_slot)
 1118 alvherre                  770 ECB             :     {
                                771                 :         /*
                                772                 :          * We must not try to read WAL, since we haven't reserved it yet --
                                773                 :          * hence pass find_startpoint false.  confirmed_flush will be set
                                774                 :          * below, by copying from the source slot.
                                775                 :          */
 1465 alvherre                  776 GIC           7 :         create_logical_replication_slot(NameStr(*dst_name),
                                777                 :                                         plugin,
 1465 alvherre                  778 ECB             :                                         temporary,
                                779                 :                                         false,
                                780                 :                                         src_restart_lsn,
                                781                 :                                         false);
                                782                 :     }
                                783                 :     else
 1465 alvherre                  784 GIC           4 :         create_physical_replication_slot(NameStr(*dst_name),
 1465 alvherre                  785 ECB             :                                          true,
                                786                 :                                          temporary,
                                787                 :                                          src_restart_lsn);
                                788                 : 
                                789                 :     /*
                                790                 :      * Update the destination slot to current values of the source slot;
                                791                 :      * recheck that the source slot is still the one we saw previously.
                                792                 :      */
                                793                 :     {
                                794                 :         TransactionId copy_effective_xmin;
                                795                 :         TransactionId copy_effective_catalog_xmin;
                                796                 :         TransactionId copy_xmin;
                                797                 :         TransactionId copy_catalog_xmin;
                                798                 :         XLogRecPtr  copy_restart_lsn;
                                799                 :         XLogRecPtr  copy_confirmed_flush;
                                800                 :         bool        copy_islogical;
                                801                 :         char       *copy_name;
                                802                 : 
                                803                 :         /* Copy data of source slot again */
 1465 alvherre                  804 GIC          10 :         SpinLockAcquire(&src->mutex);
 1040 tgl                       805              10 :         second_slot_contents = *src;
                                806              10 :         SpinLockRelease(&src->mutex);
                                807                 : 
                                808              10 :         copy_effective_xmin = second_slot_contents.effective_xmin;
                                809              10 :         copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
                                810                 : 
                                811              10 :         copy_xmin = second_slot_contents.data.xmin;
                                812              10 :         copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
 1040 tgl                       813 CBC          10 :         copy_restart_lsn = second_slot_contents.data.restart_lsn;
                                814              10 :         copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
 1465 alvherre                  815 ECB             : 
                                816                 :         /* for existence check */
 1040 tgl                       817 CBC          10 :         copy_name = NameStr(second_slot_contents.data.name);
                                818              10 :         copy_islogical = SlotIsLogical(&second_slot_contents);
                                819                 : 
 1465 alvherre                  820 ECB             :         /*
 1418 tgl                       821                 :          * Check if the source slot still exists and is valid. We regard it as
                                822                 :          * invalid if the type of replication slot or name has been changed,
                                823                 :          * or the restart_lsn either is invalid or has gone backward. (The
                                824                 :          * restart_lsn could go backwards if the source slot is dropped and
                                825                 :          * copied from an older slot during installation.)
 1465 alvherre                  826                 :          *
                                827                 :          * Since erroring out will release and drop the destination slot we
                                828                 :          * don't need to release it here.
                                829                 :          */
 1465 alvherre                  830 GIC          10 :         if (copy_restart_lsn < src_restart_lsn ||
                                831              10 :             src_islogical != copy_islogical ||
                                832              10 :             strcmp(copy_name, NameStr(*src_name)) != 0)
 1465 alvherre                  833 UIC           0 :             ereport(ERROR,
                                834                 :                     (errmsg("could not copy replication slot \"%s\"",
                                835                 :                             NameStr(*src_name)),
                                836                 :                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
                                837                 : 
                                838                 :         /* The source slot must have a consistent snapshot */
 1118 alvherre                  839 CBC          10 :         if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
 1118 alvherre                  840 LBC           0 :             ereport(ERROR,
 1118 alvherre                  841 ECB             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 1118 alvherre                  842 EUB             :                      errmsg("cannot copy unfinished logical replication slot \"%s\"",
                                843                 :                             NameStr(*src_name)),
                                844                 :                      errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
                                845                 : 
                                846                 :         /* Install copied values again */
 1465 alvherre                  847 GIC          10 :         SpinLockAcquire(&MyReplicationSlot->mutex);
 1465 alvherre                  848 CBC          10 :         MyReplicationSlot->effective_xmin = copy_effective_xmin;
 1465 alvherre                  849 GBC          10 :         MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
                                850                 : 
 1465 alvherre                  851 GIC          10 :         MyReplicationSlot->data.xmin = copy_xmin;
                                852              10 :         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
                                853              10 :         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
 1118                           854              10 :         MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
 1465                           855              10 :         SpinLockRelease(&MyReplicationSlot->mutex);
 1465 alvherre                  856 ECB             : 
 1465 alvherre                  857 CBC          10 :         ReplicationSlotMarkDirty();
                                858              10 :         ReplicationSlotsComputeRequiredXmin(false);
 1465 alvherre                  859 GIC          10 :         ReplicationSlotsComputeRequiredLSN();
 1465 alvherre                  860 CBC          10 :         ReplicationSlotSave();
 1465 alvherre                  861 ECB             : 
                                862                 : #ifdef USE_ASSERT_CHECKING
                                863                 :         /* Check that the restart_lsn is available */
                                864                 :         {
                                865                 :             XLogSegNo   segno;
                                866                 : 
 1465 alvherre                  867 CBC          10 :             XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
                                868              10 :             Assert(XLogGetLastRemovedSegno() < segno);
 1465 alvherre                  869 ECB             :         }
                                870                 : #endif
                                871                 :     }
                                872                 : 
                                873                 :     /* target slot fully created, mark as persistent if needed */
 1465 alvherre                  874 GIC          10 :     if (logical_slot && !temporary)
                                875               3 :         ReplicationSlotPersist();
 1465 alvherre                  876 ECB             : 
                                877                 :     /* All done.  Set up the return values */
 1465 alvherre                  878 GIC          10 :     values[0] = NameGetDatum(dst_name);
                                879              10 :     nulls[0] = false;
                                880              10 :     if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
                                881                 :     {
                                882               6 :         values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
 1465 alvherre                  883 CBC           6 :         nulls[1] = false;
 1465 alvherre                  884 ECB             :     }
                                885                 :     else
 1465 alvherre                  886 GIC           4 :         nulls[1] = true;
 1465 alvherre                  887 ECB             : 
 1465 alvherre                  888 CBC          10 :     tuple = heap_form_tuple(tupdesc, values, nulls);
                                889              10 :     result = HeapTupleGetDatum(tuple);
                                890                 : 
                                891              10 :     ReplicationSlotRelease();
 1465 alvherre                  892 ECB             : 
 1465 alvherre                  893 GIC          10 :     PG_RETURN_DATUM(result);
                                894                 : }
 1465 alvherre                  895 ECB             : 
                                896                 : /* The wrappers below are all to appease opr_sanity */
                                897                 : Datum
 1465 alvherre                  898 CBC           4 : pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
                                899                 : {
                                900               4 :     return copy_replication_slot(fcinfo, true);
                                901                 : }
 1465 alvherre                  902 ECB             : 
                                903                 : Datum
 1465 alvherre                  904 UIC           0 : pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
                                905                 : {
                                906               0 :     return copy_replication_slot(fcinfo, true);
 1465 alvherre                  907 ECB             : }
                                908                 : 
                                909                 : Datum
 1465 alvherre                  910 GIC           4 : pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
                                911                 : {
                                912               4 :     return copy_replication_slot(fcinfo, true);
 1465 alvherre                  913 EUB             : }
                                914                 : 
                                915                 : Datum
 1465 alvherre                  916 GIC           2 : pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
                                917                 : {
                                918               2 :     return copy_replication_slot(fcinfo, false);
 1465 alvherre                  919 ECB             : }
                                920                 : 
                                921                 : Datum
 1465 alvherre                  922 GIC           4 : pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
                                923                 : {
                                924               4 :     return copy_replication_slot(fcinfo, false);
 1465 alvherre                  925 ECB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a