LCOV - differential code coverage report
Current view: top level - src/backend/replication - slotfuncs.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 94.3 % 318 300 4 9 5 5 147 6 142 8 152 2
Current Date: 2023-04-08 15:15:32 Functions: 93.3 % 15 14 1 9 5 1 9
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * slotfuncs.c
       4                 :  *     Support functions for replication slots
       5                 :  *
       6                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
       7                 :  *
       8                 :  * IDENTIFICATION
       9                 :  *    src/backend/replication/slotfuncs.c
      10                 :  *
      11                 :  *-------------------------------------------------------------------------
      12                 :  */
      13                 : #include "postgres.h"
      14                 : 
      15                 : #include "access/htup_details.h"
      16                 : #include "access/xlog_internal.h"
      17                 : #include "access/xlogrecovery.h"
      18                 : #include "access/xlogutils.h"
      19                 : #include "funcapi.h"
      20                 : #include "miscadmin.h"
      21                 : #include "replication/decode.h"
      22                 : #include "replication/logical.h"
      23                 : #include "replication/slot.h"
      24                 : #include "utils/builtins.h"
      25                 : #include "utils/inval.h"
      26                 : #include "utils/pg_lsn.h"
      27                 : #include "utils/resowner.h"
      28                 : 
      29                 : /*
      30                 :  * Helper function for creating a new physical replication slot with
      31                 :  * given arguments. Note that this function doesn't release the created
      32                 :  * slot.
      33                 :  *
      34                 :  * If restart_lsn is a valid value, we use it without running the WAL
      35                 :  * reservation routine, so the caller must guarantee that WAL is available.
      36                 :  */
      37                 : static void
      38 CBC          29 : create_physical_replication_slot(char *name, bool immediately_reserve,
      39                 :                                  bool temporary, XLogRecPtr restart_lsn)
      40                 : {
      41              29 :     Assert(!MyReplicationSlot);
      42                 : 
      43                 :     /* acquire replication slot, this will check for conflicting names */
      44              29 :     ReplicationSlotCreate(name, false,
      45                 :                           temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
      46                 : 
      47              29 :     if (immediately_reserve)
      48                 :     {
      49                 :         /* Reserve WAL as the user asked for it */
      50              15 :         if (XLogRecPtrIsInvalid(restart_lsn))
      51              11 :             ReplicationSlotReserveWal();
      52                 :         else
      53               4 :             MyReplicationSlot->data.restart_lsn = restart_lsn;
      54                 : 
      55                 :         /* Write this slot to disk */
      56              15 :         ReplicationSlotMarkDirty();
      57              15 :         ReplicationSlotSave();
      58                 :     }
      59              29 : }
      60                 : 
      61                 : /*
      62                 :  * SQL function for creating a new physical (streaming replication)
      63                 :  * replication slot.
      64                 :  */
      65                 : Datum
      66              25 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
      67                 : {
      68              25 :     Name        name = PG_GETARG_NAME(0);
      69              25 :     bool        immediately_reserve = PG_GETARG_BOOL(1);
      70              25 :     bool        temporary = PG_GETARG_BOOL(2);
      71                 :     Datum       values[2];
      72                 :     bool        nulls[2];
      73                 :     TupleDesc   tupdesc;
      74                 :     HeapTuple   tuple;
      75                 :     Datum       result;
      76                 : 
      77              25 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
      78 UBC           0 :         elog(ERROR, "return type must be a row type");
      79                 : 
      80 CBC          25 :     CheckSlotPermissions();
      81                 : 
      82              25 :     CheckSlotRequirements();
      83                 : 
      84              25 :     create_physical_replication_slot(NameStr(*name),
      85                 :                                      immediately_reserve,
      86                 :                                      temporary,
      87                 :                                      InvalidXLogRecPtr);
      88                 : 
      89              25 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
      90              25 :     nulls[0] = false;
      91                 : 
      92              25 :     if (immediately_reserve)
      93                 :     {
      94              11 :         values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
      95              11 :         nulls[1] = false;
      96                 :     }
      97                 :     else
      98              14 :         nulls[1] = true;
      99                 : 
     100              25 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     101              25 :     result = HeapTupleGetDatum(tuple);
     102                 : 
     103              25 :     ReplicationSlotRelease();
     104                 : 
     105              25 :     PG_RETURN_DATUM(result);
     106                 : }
     107                 : 
     108                 : 
     109                 : /*
     110                 :  * Helper function for creating a new logical replication slot with
     111                 :  * given arguments. Note that this function doesn't release the created
     112                 :  * slot.
     113                 :  *
     114                 :  * When find_startpoint is false, the slot's confirmed_flush is not set; it's
     115                 :  * the caller's responsibility to ensure it's set to something sensible.
     116                 :  */
     117                 : static void
     118             101 : create_logical_replication_slot(char *name, char *plugin,
     119                 :                                 bool temporary, bool two_phase,
     120                 :                                 XLogRecPtr restart_lsn,
     121                 :                                 bool find_startpoint)
     122                 : {
     123             101 :     LogicalDecodingContext *ctx = NULL;
     124                 : 
     125             101 :     Assert(!MyReplicationSlot);
     126                 : 
     127                 :     /*
     128                 :      * Acquire a logical decoding slot, this will check for conflicting names.
     129                 :      * Initially create persistent slot as ephemeral - that allows us to
     130                 :      * nicely handle errors during initialization because it'll get dropped if
     131                 :      * this transaction fails. We'll make it persistent at the end. Temporary
     132                 :      * slots can be created as temporary from beginning as they get dropped on
     133                 :      * error as well.
     134                 :      */
     135             101 :     ReplicationSlotCreate(name, true,
     136                 :                           temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
     137                 : 
     138                 :     /*
     139                 :      * Create logical decoding context to find start point or, if we don't
     140                 :      * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
     141                 :      *
     142                 :      * Note: when !find_startpoint this is still important, because it's at
     143                 :      * this point that the output plugin is validated.
     144                 :      */
     145              97 :     ctx = CreateInitDecodingContext(plugin, NIL,
     146                 :                                     false,  /* just catalogs is OK */
     147                 :                                     restart_lsn,
     148              97 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     149                 :                                                .segment_open = wal_segment_open,
     150                 :                                                .segment_close = wal_segment_close),
     151                 :                                     NULL, NULL, NULL);
     152                 : 
     153                 :     /*
     154                 :      * If caller needs us to determine the decoding start point, do so now.
     155                 :      * This might take a while.
     156                 :      */
     157              94 :     if (find_startpoint)
     158              88 :         DecodingContextFindStartpoint(ctx);
     159                 : 
     160                 :     /* don't need the decoding context anymore */
     161              92 :     FreeDecodingContext(ctx);
     162              92 : }
     163                 : 
     164                 : /*
     165                 :  * SQL function for creating a new logical replication slot.
     166                 :  */
     167                 : Datum
     168              95 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     169                 : {
     170              95 :     Name        name = PG_GETARG_NAME(0);
     171              95 :     Name        plugin = PG_GETARG_NAME(1);
     172              95 :     bool        temporary = PG_GETARG_BOOL(2);
     173              95 :     bool        two_phase = PG_GETARG_BOOL(3);
     174                 :     Datum       result;
     175                 :     TupleDesc   tupdesc;
     176                 :     HeapTuple   tuple;
     177                 :     Datum       values[2];
     178                 :     bool        nulls[2];
     179                 : 
     180              95 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     181 UBC           0 :         elog(ERROR, "return type must be a row type");
     182                 : 
     183 CBC          95 :     CheckSlotPermissions();
     184                 : 
     185              94 :     CheckLogicalDecodingRequirements();
     186                 : 
     187              94 :     create_logical_replication_slot(NameStr(*name),
     188              94 :                                     NameStr(*plugin),
     189                 :                                     temporary,
     190                 :                                     two_phase,
     191                 :                                     InvalidXLogRecPtr,
     192                 :                                     true);
     193                 : 
     194              86 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     195              86 :     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
     196                 : 
     197              86 :     memset(nulls, 0, sizeof(nulls));
     198                 : 
     199              86 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     200              86 :     result = HeapTupleGetDatum(tuple);
     201                 : 
     202                 :     /* ok, slot is now fully created, mark it as persistent if needed */
     203              86 :     if (!temporary)
     204              81 :         ReplicationSlotPersist();
     205              86 :     ReplicationSlotRelease();
     206                 : 
     207              86 :     PG_RETURN_DATUM(result);
     208                 : }
     209                 : 
     210                 : 
     211                 : /*
     212                 :  * SQL function for dropping a replication slot.
     213                 :  */
     214                 : Datum
     215             109 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
     216                 : {
     217             109 :     Name        name = PG_GETARG_NAME(0);
     218                 : 
     219             109 :     CheckSlotPermissions();
     220                 : 
     221             107 :     CheckSlotRequirements();
     222                 : 
     223             107 :     ReplicationSlotDrop(NameStr(*name), true);
     224                 : 
     225             102 :     PG_RETURN_VOID();
     226                 : }
     227                 : 
     228                 : /*
     229                 :  * pg_get_replication_slots - SQL SRF showing all replication slots
     230                 :  * that currently exist on the database cluster.
     231                 :  */
     232                 : Datum
     233 GIC         171 : pg_get_replication_slots(PG_FUNCTION_ARGS)
     234 ECB             : {
     235                 : #define PG_GET_REPLICATION_SLOTS_COLS 15
     236 GIC         171 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     237 ECB             :     XLogRecPtr  currlsn;
     238                 :     int         slotno;
     239                 : 
     240                 :     /*
     241                 :      * We don't require any special permission to see this function's data
     242                 :      * because nothing should be sensitive. The most critical being the slot
     243                 :      * name, which shouldn't contain anything particularly sensitive.
     244                 :      */
     245                 : 
     246 GIC         171 :     InitMaterializedSRF(fcinfo, 0);
     247 ECB             : 
     248 GIC         171 :     currlsn = GetXLogWriteRecPtr();
     249 ECB             : 
     250 GIC         171 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     251 CBC        1113 :     for (slotno = 0; slotno < max_replication_slots; slotno++)
     252 ECB             :     {
     253 GIC         942 :         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
     254 ECB             :         ReplicationSlot slot_contents;
     255                 :         Datum       values[PG_GET_REPLICATION_SLOTS_COLS];
     256                 :         bool        nulls[PG_GET_REPLICATION_SLOTS_COLS];
     257                 :         WALAvailability walstate;
     258                 :         int         i;
     259                 : 
     260 GIC         942 :         if (!slot->in_use)
     261 CBC         623 :             continue;
     262 ECB             : 
     263                 :         /* Copy slot contents while holding spinlock, then examine at leisure */
     264 GIC         319 :         SpinLockAcquire(&slot->mutex);
     265 CBC         319 :         slot_contents = *slot;
     266             319 :         SpinLockRelease(&slot->mutex);
     267 ECB             : 
     268 GIC         319 :         memset(values, 0, sizeof(values));
     269 CBC         319 :         memset(nulls, 0, sizeof(nulls));
     270 ECB             : 
     271 GIC         319 :         i = 0;
     272 CBC         319 :         values[i++] = NameGetDatum(&slot_contents.data.name);
     273 ECB             : 
     274 GIC         319 :         if (slot_contents.data.database == InvalidOid)
     275 CBC         124 :             nulls[i++] = true;
     276 ECB             :         else
     277 GIC         195 :             values[i++] = NameGetDatum(&slot_contents.data.plugin);
     278 ECB             : 
     279 GIC         319 :         if (slot_contents.data.database == InvalidOid)
     280 CBC         124 :             values[i++] = CStringGetTextDatum("physical");
     281 ECB             :         else
     282 GIC         195 :             values[i++] = CStringGetTextDatum("logical");
     283 ECB             : 
     284 GIC         319 :         if (slot_contents.data.database == InvalidOid)
     285 CBC         124 :             nulls[i++] = true;
     286 ECB             :         else
     287 GIC         195 :             values[i++] = ObjectIdGetDatum(slot_contents.data.database);
     288 ECB             : 
     289 GIC         319 :         values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
     290 CBC         319 :         values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
     291 ECB             : 
     292 GIC         319 :         if (slot_contents.active_pid != 0)
     293 CBC         132 :             values[i++] = Int32GetDatum(slot_contents.active_pid);
     294 ECB             :         else
     295 GIC         187 :             nulls[i++] = true;
     296 ECB             : 
     297 GIC         319 :         if (slot_contents.data.xmin != InvalidTransactionId)
     298 CBC          16 :             values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
     299 ECB             :         else
     300 GIC         303 :             nulls[i++] = true;
     301 ECB             : 
     302 GIC         319 :         if (slot_contents.data.catalog_xmin != InvalidTransactionId)
     303 CBC         199 :             values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
     304 ECB             :         else
     305 GIC         120 :             nulls[i++] = true;
     306 ECB             : 
     307 GIC         319 :         if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
     308 CBC         306 :             values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
     309 ECB             :         else
     310 GIC          13 :             nulls[i++] = true;
     311 ECB             : 
     312 GIC         319 :         if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
     313 CBC         173 :             values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
     314 ECB             :         else
     315 GIC         146 :             nulls[i++] = true;
     316 ECB             : 
     317                 :         /*
     318                 :          * If the slot has not been invalidated, test availability from
     319                 :          * restart_lsn.
     320                 :          */
     321 GNC         319 :         if (slot_contents.data.invalidated != RS_INVAL_NONE)
     322 GIC          14 :             walstate = WALAVAIL_REMOVED;
     323 ECB             :         else
     324 GIC         305 :             walstate = GetWALAvailability(slot_contents.data.restart_lsn);
     325 ECB             : 
     326 GIC         319 :         switch (walstate)
     327 ECB             :         {
     328 CBC          11 :             case WALAVAIL_INVALID_LSN:
     329              11 :                 nulls[i++] = true;
     330 GIC          11 :                 break;
     331 ECB             : 
     332 CBC         291 :             case WALAVAIL_RESERVED:
     333             291 :                 values[i++] = CStringGetTextDatum("reserved");
     334 GIC         291 :                 break;
     335 ECB             : 
     336 CBC           2 :             case WALAVAIL_EXTENDED:
     337               2 :                 values[i++] = CStringGetTextDatum("extended");
     338 GIC           2 :                 break;
     339 ECB             : 
     340 CBC           1 :             case WALAVAIL_UNRESERVED:
     341               1 :                 values[i++] = CStringGetTextDatum("unreserved");
     342 GIC           1 :                 break;
     343 ECB             : 
     344 GIC          14 :             case WALAVAIL_REMOVED:
     345                 : 
     346                 :                 /*
     347                 :                  * If we read the restart_lsn long enough ago, maybe that file
     348                 :                  * has been removed by now.  However, the walsender could have
     349                 :                  * moved forward enough that it jumped to another file after
     350                 :                  * we looked.  If the checkpointer signalled the process to
     351                 :                  * terminate, then it's definitely lost; but if a process is
     352                 :                  * still alive, then "unreserved" seems more appropriate.
     353                 :                  *
     354                 :                  * If we do change it, save the state for safe_wal_size below.
     355 ECB             :                  */
     356 GIC          14 :                 if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
     357                 :                 {
     358                 :                     int         pid;
     359 ECB             : 
     360 CBC          12 :                     SpinLockAcquire(&slot->mutex);
     361              12 :                     pid = slot->active_pid;
     362              12 :                     slot_contents.data.restart_lsn = slot->data.restart_lsn;
     363              12 :                     SpinLockRelease(&slot->mutex);
     364 GIC          12 :                     if (pid != 0)
     365 EUB             :                     {
     366 UBC           0 :                         values[i++] = CStringGetTextDatum("unreserved");
     367               0 :                         walstate = WALAVAIL_UNRESERVED;
     368 UIC           0 :                         break;
     369                 :                     }
     370 ECB             :                 }
     371 CBC          14 :                 values[i++] = CStringGetTextDatum("lost");
     372 GIC          14 :                 break;
     373                 :         }
     374                 : 
     375                 :         /*
     376                 :          * safe_wal_size is only computed for slots that have not been lost,
     377                 :          * and only if there's a configured maximum size.
     378 ECB             :          */
     379 CBC         319 :         if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
     380 GIC         314 :             nulls[i++] = true;
     381                 :         else
     382                 :         {
     383                 :             XLogSegNo   targetSeg;
     384                 :             uint64      slotKeepSegs;
     385                 :             uint64      keepSegs;
     386                 :             XLogSegNo   failSeg;
     387                 :             XLogRecPtr  failLSN;
     388 ECB             : 
     389 GIC           5 :             XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
     390                 : 
     391 ECB             :             /* determine how many segments can be kept by slots */
     392 GIC           5 :             slotKeepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
     393 ECB             :             /* ditto for wal_keep_size */
     394 GIC           5 :             keepSegs = XLogMBVarToSegs(wal_keep_size_mb, wal_segment_size);
     395                 : 
     396 ECB             :             /* if currpos reaches failLSN, we lose our segment */
     397 CBC           5 :             failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
     398 GIC           5 :             XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
     399 ECB             : 
     400 GIC           5 :             values[i++] = Int64GetDatum(failLSN - currlsn);
     401                 :         }
     402 ECB             : 
     403 GIC         319 :         values[i++] = BoolGetDatum(slot_contents.data.two_phase);
     404 ECB             : 
     405 GNC         319 :         if (slot_contents.data.database == InvalidOid)
     406             124 :             nulls[i++] = true;
     407                 :         else
     408                 :         {
     409             195 :             if (slot_contents.data.invalidated != RS_INVAL_NONE)
     410              12 :                 values[i++] = BoolGetDatum(true);
     411                 :             else
     412             183 :                 values[i++] = BoolGetDatum(false);
     413                 :         }
     414                 : 
     415 CBC         319 :         Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
     416                 : 
     417 GIC         319 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
     418 ECB             :                              values, nulls);
     419                 :     }
     420                 : 
     421 CBC         171 :     LWLockRelease(ReplicationSlotControlLock);
     422                 : 
     423 GIC         171 :     return (Datum) 0;
     424 ECB             : }
     425                 : 
     426                 : /*
     427                 :  * Helper function for advancing our physical replication slot forward.
     428                 :  *
     429                 :  * The LSN position to move to is compared simply to the slot's restart_lsn,
     430                 :  * knowing that any position older than that would be removed by successive
     431                 :  * checkpoints.
     432                 :  */
     433                 : static XLogRecPtr
     434 GIC           1 : pg_physical_replication_slot_advance(XLogRecPtr moveto)
     435                 : {
     436               1 :     XLogRecPtr  startlsn = MyReplicationSlot->data.restart_lsn;
     437               1 :     XLogRecPtr  retlsn = startlsn;
     438                 : 
     439               1 :     Assert(moveto != InvalidXLogRecPtr);
     440                 : 
     441               1 :     if (startlsn < moveto)
     442                 :     {
     443 CBC           1 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     444 GIC           1 :         MyReplicationSlot->data.restart_lsn = moveto;
     445 CBC           1 :         SpinLockRelease(&MyReplicationSlot->mutex);
     446               1 :         retlsn = moveto;
     447                 : 
     448 ECB             :         /*
     449                 :          * Dirty the slot so it is written out at the next checkpoint. Note
     450                 :          * that the LSN position advanced may still be lost in the event of a
     451                 :          * crash, but this makes the data consistent after a clean shutdown.
     452                 :          */
     453 CBC           1 :         ReplicationSlotMarkDirty();
     454 ECB             :     }
     455                 : 
     456 GIC           1 :     return retlsn;
     457                 : }
     458                 : 
     459                 : /*
     460                 :  * Helper function for advancing our logical replication slot forward.
     461                 :  *
     462 ECB             :  * The slot's restart_lsn is used as start point for reading records, while
     463                 :  * confirmed_flush is used as base point for the decoding context.
     464                 :  *
     465                 :  * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
     466                 :  * because we need to digest WAL to advance restart_lsn, which allows WAL to
     467                 :  * be recycled and old catalog tuples to be removed.  As decoding is done in
     468                 :  * fast_forward mode, no changes are generated anyway.
     469                 :  */
     470                 : static XLogRecPtr
     471 GIC           3 : pg_logical_replication_slot_advance(XLogRecPtr moveto)
     472                 : {
     473                 :     LogicalDecodingContext *ctx;
     474               3 :     ResourceOwner old_resowner = CurrentResourceOwner;
     475                 :     XLogRecPtr  retlsn;
     476                 : 
     477               3 :     Assert(moveto != InvalidXLogRecPtr);
     478                 : 
     479               3 :     PG_TRY();
     480 ECB             :     {
     481                 :         /*
     482                 :          * Create our decoding context in fast_forward mode, passing start_lsn
     483                 :          * as InvalidXLogRecPtr, so that we start processing from my slot's
     484                 :          * confirmed_flush.
     485                 :          */
     486 CBC           6 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
     487                 :                                     NIL,
     488 ECB             :                                     true,   /* fast_forward */
     489 GIC           3 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     490                 :                                                .segment_open = wal_segment_open,
     491                 :                                                .segment_close = wal_segment_close),
     492                 :                                     NULL, NULL, NULL);
     493                 : 
     494                 :         /*
     495 ECB             :          * Start reading at the slot's restart_lsn, which we know to point to
     496                 :          * a valid record.
     497                 :          */
     498 CBC           3 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
     499                 : 
     500                 :         /* invalidate non-timetravel entries */
     501 GIC           3 :         InvalidateSystemCaches();
     502                 : 
     503                 :         /* Decode at least one record, until we run out of records */
     504              91 :         while (ctx->reader->EndRecPtr < moveto)
     505                 :         {
     506              91 :             char       *errm = NULL;
     507 ECB             :             XLogRecord *record;
     508                 : 
     509                 :             /*
     510                 :              * Read records.  No changes are generated in fast_forward mode,
     511                 :              * but snapbuilder/slot statuses are updated properly.
     512                 :              */
     513 CBC          91 :             record = XLogReadRecord(ctx->reader, &errm);
     514 GIC          91 :             if (errm)
     515 LBC           0 :                 elog(ERROR, "could not find record while advancing replication slot: %s",
     516                 :                      errm);
     517                 : 
     518                 :             /*
     519                 :              * Process the record.  Storage-level changes are ignored in
     520                 :              * fast_forward mode, but other modules (such as snapbuilder)
     521                 :              * might still have critical updates to do.
     522 ECB             :              */
     523 CBC          91 :             if (record)
     524 GBC          91 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
     525                 : 
     526                 :             /* Stop once the requested target has been reached */
     527 GIC          91 :             if (moveto <= ctx->reader->EndRecPtr)
     528               3 :                 break;
     529                 : 
     530              88 :             CHECK_FOR_INTERRUPTS();
     531                 :         }
     532 ECB             : 
     533                 :         /*
     534                 :          * Logical decoding could have clobbered CurrentResourceOwner during
     535                 :          * transaction management, so restore the executor's value.  (This is
     536                 :          * a kluge, but it's not worth cleaning up right now.)
     537                 :          */
     538 GIC           3 :         CurrentResourceOwner = old_resowner;
     539 ECB             : 
     540 GIC           3 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
     541                 :         {
     542               3 :             LogicalConfirmReceivedLocation(moveto);
     543                 : 
     544                 :             /*
     545                 :              * If only the confirmed_flush LSN has changed the slot won't get
     546                 :              * marked as dirty by the above. Callers on the walsender
     547 ECB             :              * interface are expected to keep track of their own progress and
     548                 :              * don't need it written out. But SQL-interface users cannot
     549                 :              * specify their own start positions and it's harder for them to
     550                 :              * keep track of their progress, so we should make more of an
     551                 :              * effort to save it for them.
     552                 :              *
     553                 :              * Dirty the slot so it is written out at the next checkpoint. The
     554                 :              * LSN position advanced to may still be lost on a crash but this
     555                 :              * makes the data consistent after a clean shutdown.
     556                 :              */
     557 GIC           3 :             ReplicationSlotMarkDirty();
     558                 :         }
     559                 : 
     560               3 :         retlsn = MyReplicationSlot->data.confirmed_flush;
     561                 : 
     562                 :         /* free context, call shutdown callback */
     563               3 :         FreeDecodingContext(ctx);
     564                 : 
     565               3 :         InvalidateSystemCaches();
     566 ECB             :     }
     567 UIC           0 :     PG_CATCH();
     568                 :     {
     569 ECB             :         /* clear all timetravel entries */
     570 UIC           0 :         InvalidateSystemCaches();
     571                 : 
     572 LBC           0 :         PG_RE_THROW();
     573                 :     }
     574 CBC           3 :     PG_END_TRY();
     575                 : 
     576 GBC           3 :     return retlsn;
     577                 : }
     578                 : 
     579 EUB             : /*
     580                 :  * SQL function for moving the position in a replication slot.
     581                 :  */
     582                 : Datum
     583 CBC           6 : pg_replication_slot_advance(PG_FUNCTION_ARGS)
     584                 : {
     585               6 :     Name        slotname = PG_GETARG_NAME(0);
     586 GIC           6 :     XLogRecPtr  moveto = PG_GETARG_LSN(1);
     587                 :     XLogRecPtr  endlsn;
     588                 :     XLogRecPtr  minlsn;
     589                 :     TupleDesc   tupdesc;
     590                 :     Datum       values[2];
     591                 :     bool        nulls[2];
     592 ECB             :     HeapTuple   tuple;
     593                 :     Datum       result;
     594                 : 
     595 CBC           6 :     Assert(!MyReplicationSlot);
     596                 : 
     597 GIC           6 :     CheckSlotPermissions();
     598                 : 
     599               6 :     if (XLogRecPtrIsInvalid(moveto))
     600               1 :         ereport(ERROR,
     601                 :                 (errmsg("invalid target WAL LSN")));
     602                 : 
     603                 :     /* Build a tuple descriptor for our result type */
     604 CBC           5 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     605 UIC           0 :         elog(ERROR, "return type must be a row type");
     606 ECB             : 
     607                 :     /*
     608                 :      * We can't move slot past what's been flushed/replayed so clamp the
     609                 :      * target position accordingly.
     610                 :      */
     611 GIC           5 :     if (!RecoveryInProgress())
     612               5 :         moveto = Min(moveto, GetFlushRecPtr(NULL));
     613 ECB             :     else
     614 UBC           0 :         moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
     615                 : 
     616                 :     /* Acquire the slot so we "own" it */
     617 GIC           5 :     ReplicationSlotAcquire(NameStr(*slotname), true);
     618                 : 
     619                 :     /* A slot whose restart_lsn has never been reserved cannot be advanced */
     620 CBC           5 :     if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
     621               1 :         ereport(ERROR,
     622                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     623 EUB             :                  errmsg("replication slot \"%s\" cannot be advanced",
     624                 :                         NameStr(*slotname)),
     625                 :                  errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
     626 ECB             : 
     627                 :     /*
     628                 :      * Check if the slot is not moving backwards.  Physical slots rely simply
     629                 :      * on restart_lsn as a minimum point, while logical slots have confirmed
     630                 :      * consumption up to confirmed_flush, meaning that in both cases data
     631                 :      * older than that is not available anymore.
     632                 :      */
     633 GIC           4 :     if (OidIsValid(MyReplicationSlot->data.database))
     634               3 :         minlsn = MyReplicationSlot->data.confirmed_flush;
     635                 :     else
     636               1 :         minlsn = MyReplicationSlot->data.restart_lsn;
     637                 : 
     638               4 :     if (moveto < minlsn)
     639 UIC           0 :         ereport(ERROR,
     640                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     641                 :                  errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
     642 ECB             :                         LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
     643                 : 
     644                 :     /* Do the actual slot update, depending on the slot type */
     645 CBC           4 :     if (OidIsValid(MyReplicationSlot->data.database))
     646 GIC           3 :         endlsn = pg_logical_replication_slot_advance(moveto);
     647 ECB             :     else
     648 GBC           1 :         endlsn = pg_physical_replication_slot_advance(moveto);
     649                 : 
     650 GIC           4 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     651               4 :     nulls[0] = false;
     652                 : 
     653                 :     /*
     654 ECB             :      * Recompute the minimum LSN and xmin across all slots to adjust with the
     655                 :      * advancing potentially done.
     656                 :      */
     657 CBC           4 :     ReplicationSlotsComputeRequiredXmin(false);
     658 GIC           4 :     ReplicationSlotsComputeRequiredLSN();
     659 ECB             : 
     660 CBC           4 :     ReplicationSlotRelease();
     661                 : 
     662                 :     /* Return the reached position. */
     663 GIC           4 :     values[1] = LSNGetDatum(endlsn);
     664               4 :     nulls[1] = false;
     665                 : 
     666 CBC           4 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     667               4 :     result = HeapTupleGetDatum(tuple);
     668                 : 
     669               4 :     PG_RETURN_DATUM(result);
     670                 : }
     671                 : 
     672 ECB             : /*
     673                 :  * Helper function of copying a replication slot.
     674                 :  */
     675                 : static Datum
     676 CBC          14 : copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
     677                 : {
     678              14 :     Name        src_name = PG_GETARG_NAME(0);
     679 GIC          14 :     Name        dst_name = PG_GETARG_NAME(1);
     680              14 :     ReplicationSlot *src = NULL;
     681                 :     ReplicationSlot first_slot_contents;
     682                 :     ReplicationSlot second_slot_contents;
     683                 :     XLogRecPtr  src_restart_lsn;
     684                 :     bool        src_islogical;
     685 ECB             :     bool        temporary;
     686                 :     char       *plugin;
     687                 :     Datum       values[2];
     688                 :     bool        nulls[2];
     689                 :     Datum       result;
     690                 :     TupleDesc   tupdesc;
     691                 :     HeapTuple   tuple;
     692                 : 
     693 GIC          14 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     694 UIC           0 :         elog(ERROR, "return type must be a row type");
     695                 : 
     696 GIC          14 :     CheckSlotPermissions();
     697                 : 
     698              14 :     if (logical_slot)
     699               8 :         CheckLogicalDecodingRequirements();
     700                 :     else
     701               6 :         CheckSlotRequirements();
     702 ECB             : 
     703 GBC          14 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     704                 : 
     705 ECB             :     /*
     706                 :      * We need to prevent the source slot's reserved WAL from being removed,
     707                 :      * but we don't want to lock that slot for very long, and it can advance
     708                 :      * in the meantime.  So obtain the source slot's data, and create a new
     709                 :      * slot using its restart_lsn.  Afterwards we lock the source slot again
     710                 :      * and verify that the data we copied (name, type) has not changed
     711                 :      * incompatibly.  No inconvenient WAL removal can occur once the new slot
     712                 :      * is created -- but since WAL removal could have occurred before we
     713                 :      * managed to create the new slot, we advance the new slot's restart_lsn
     714                 :      * to the source slot's updated restart_lsn the second time we lock it.
     715                 :      */
     716 GIC          15 :     for (int i = 0; i < max_replication_slots; i++)
     717                 :     {
     718              15 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     719                 : 
     720              15 :         if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
     721                 :         {
     722                 :             /* Copy the slot contents while holding spinlock */
     723              14 :             SpinLockAcquire(&s->mutex);
     724              14 :             first_slot_contents = *s;
     725 CBC          14 :             SpinLockRelease(&s->mutex);
     726 GIC          14 :             src = s;
     727 CBC          14 :             break;
     728                 :         }
     729 ECB             :     }
     730                 : 
     731 GIC          14 :     LWLockRelease(ReplicationSlotControlLock);
     732 ECB             : 
     733 CBC          14 :     if (src == NULL)
     734 LBC           0 :         ereport(ERROR,
     735 ECB             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     736                 :                  errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
     737                 : 
     738 GIC          14 :     src_islogical = SlotIsLogical(&first_slot_contents);
     739              14 :     src_restart_lsn = first_slot_contents.data.restart_lsn;
     740 CBC          14 :     temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
     741 GIC          14 :     plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
     742 ECB             : 
     743 EUB             :     /* Check type of replication slot */
     744 GIC          14 :     if (src_islogical != logical_slot)
     745               2 :         ereport(ERROR,
     746                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     747 ECB             :                  src_islogical ?
     748                 :                  errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
     749                 :                         NameStr(*src_name)) :
     750                 :                  errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
     751                 :                         NameStr(*src_name))));
     752                 : 
     753                 :     /* Copying non-reserved slot doesn't make sense */
     754 CBC          12 :     if (XLogRecPtrIsInvalid(src_restart_lsn))
     755 GIC           1 :         ereport(ERROR,
     756                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     757                 :                  errmsg("cannot copy a replication slot that doesn't reserve WAL")));
     758                 : 
     759                 :     /* Overwrite params from optional arguments */
     760              11 :     if (PG_NARGS() >= 3)
     761               6 :         temporary = PG_GETARG_BOOL(2);
     762              11 :     if (PG_NARGS() >= 4)
     763 ECB             :     {
     764 CBC           4 :         Assert(logical_slot);
     765 GIC           4 :         plugin = NameStr(*(PG_GETARG_NAME(3)));
     766                 :     }
     767                 : 
     768                 :     /* Create new slot and acquire it */
     769 CBC          11 :     if (logical_slot)
     770 ECB             :     {
     771                 :         /*
     772                 :          * We must not try to read WAL, since we haven't reserved it yet --
     773                 :          * hence pass find_startpoint false.  confirmed_flush will be set
     774                 :          * below, by copying from the source slot.
     775                 :          */
     776 GIC           7 :         create_logical_replication_slot(NameStr(*dst_name),
     777                 :                                         plugin,
     778 ECB             :                                         temporary,
     779                 :                                         false,
     780                 :                                         src_restart_lsn,
     781                 :                                         false);
     782                 :     }
     783                 :     else
     784 GIC           4 :         create_physical_replication_slot(NameStr(*dst_name),
     785 ECB             :                                          true,
     786                 :                                          temporary,
     787                 :                                          src_restart_lsn);
     788                 : 
     789                 :     /*
     790                 :      * Update the destination slot to current values of the source slot;
     791                 :      * recheck that the source slot is still the one we saw previously.
     792                 :      */
     793                 :     {
     794                 :         TransactionId copy_effective_xmin;
     795                 :         TransactionId copy_effective_catalog_xmin;
     796                 :         TransactionId copy_xmin;
     797                 :         TransactionId copy_catalog_xmin;
     798                 :         XLogRecPtr  copy_restart_lsn;
     799                 :         XLogRecPtr  copy_confirmed_flush;
     800                 :         bool        copy_islogical;
     801                 :         char       *copy_name;
     802                 : 
     803                 :         /* Copy data of source slot again */
     804 GIC          10 :         SpinLockAcquire(&src->mutex);
     805              10 :         second_slot_contents = *src;
     806              10 :         SpinLockRelease(&src->mutex);
     807                 : 
     808              10 :         copy_effective_xmin = second_slot_contents.effective_xmin;
     809              10 :         copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
     810                 : 
     811              10 :         copy_xmin = second_slot_contents.data.xmin;
     812              10 :         copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
     813 CBC          10 :         copy_restart_lsn = second_slot_contents.data.restart_lsn;
     814              10 :         copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
     815 ECB             : 
     816                 :         /* for existence check */
     817 CBC          10 :         copy_name = NameStr(second_slot_contents.data.name);
     818              10 :         copy_islogical = SlotIsLogical(&second_slot_contents);
     819                 : 
     820 ECB             :         /*
     821                 :          * Check if the source slot still exists and is valid. We regard it as
     822                 :          * invalid if the type of replication slot or name has been changed,
     823                 :          * or the restart_lsn either is invalid or has gone backward. (The
     824                 :          * restart_lsn could go backwards if the source slot is dropped and
     825                 :          * copied from an older slot during installation.)
     826                 :          *
     827                 :          * Since erroring out will release and drop the destination slot we
     828                 :          * don't need to release it here.
     829                 :          */
     830 GIC          10 :         if (copy_restart_lsn < src_restart_lsn ||
     831              10 :             src_islogical != copy_islogical ||
     832              10 :             strcmp(copy_name, NameStr(*src_name)) != 0)
     833 UIC           0 :             ereport(ERROR,
     834                 :                     (errmsg("could not copy replication slot \"%s\"",
     835                 :                             NameStr(*src_name)),
     836                 :                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
     837                 : 
     838                 :         /* The source slot must have a consistent snapshot */
     839 CBC          10 :         if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
     840 LBC           0 :             ereport(ERROR,
     841 ECB             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     842 EUB             :                      errmsg("cannot copy unfinished logical replication slot \"%s\"",
     843                 :                             NameStr(*src_name)),
     844                 :                      errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
     845                 : 
     846                 :         /* Install copied values again */
     847 GIC          10 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     848 CBC          10 :         MyReplicationSlot->effective_xmin = copy_effective_xmin;
     849 GBC          10 :         MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
     850                 : 
     851 GIC          10 :         MyReplicationSlot->data.xmin = copy_xmin;
     852              10 :         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
     853              10 :         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
     854              10 :         MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
     855              10 :         SpinLockRelease(&MyReplicationSlot->mutex);
     856 ECB             : 
     857 CBC          10 :         ReplicationSlotMarkDirty();
     858              10 :         ReplicationSlotsComputeRequiredXmin(false);
     859 GIC          10 :         ReplicationSlotsComputeRequiredLSN();
     860 CBC          10 :         ReplicationSlotSave();
     861 ECB             : 
     862                 : #ifdef USE_ASSERT_CHECKING
     863                 :         /* Check that the restart_lsn is available */
     864                 :         {
     865                 :             XLogSegNo   segno;
     866                 : 
     867 CBC          10 :             XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
     868              10 :             Assert(XLogGetLastRemovedSegno() < segno);
     869 ECB             :         }
     870                 : #endif
     871                 :     }
     872                 : 
     873                 :     /* target slot fully created, mark as persistent if needed */
     874 GIC          10 :     if (logical_slot && !temporary)
     875               3 :         ReplicationSlotPersist();
     876 ECB             : 
     877                 :     /* All done.  Set up the return values */
     878 GIC          10 :     values[0] = NameGetDatum(dst_name);
     879              10 :     nulls[0] = false;
     880              10 :     if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
     881                 :     {
     882               6 :         values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
     883 CBC           6 :         nulls[1] = false;
     884 ECB             :     }
     885                 :     else
     886 GIC           4 :         nulls[1] = true;
     887 ECB             : 
     888 CBC          10 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     889              10 :     result = HeapTupleGetDatum(tuple);
     890                 : 
     891              10 :     ReplicationSlotRelease();
     892 ECB             : 
     893 GIC          10 :     PG_RETURN_DATUM(result);
     894                 : }
     895 ECB             : 
     896                 : /* The wrappers below are all to appease opr_sanity */
     897                 : Datum
     898 CBC           4 : pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
     899                 : {
     900               4 :     return copy_replication_slot(fcinfo, true);
     901                 : }
     902 ECB             : 
     903                 : Datum
     904 UIC           0 : pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
     905                 : {
     906               0 :     return copy_replication_slot(fcinfo, true);
     907 ECB             : }
     908                 : 
     909                 : Datum
     910 GIC           4 : pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
     911                 : {
     912               4 :     return copy_replication_slot(fcinfo, true);
     913 EUB             : }
     914                 : 
     915                 : Datum
     916 GIC           2 : pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
     917                 : {
     918               2 :     return copy_replication_slot(fcinfo, false);
     919 ECB             : }
     920                 : 
     921                 : Datum
     922 GIC           4 : pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
     923                 : {
     924               4 :     return copy_replication_slot(fcinfo, false);
     925 ECB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a