LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - logicalfuncs.c (source / functions) Coverage Total Hit UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 88.3 % 111 98 5 8 2 42 1 53 3 41 4
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 9 9 6 1 2 6
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * logicalfuncs.c
       4                 :  *
       5                 :  *     Support functions for using logical decoding and management of
       6                 :  *     logical replication slots via SQL.
       7                 :  *
       8                 :  *
       9                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
      10                 :  *
      11                 :  * IDENTIFICATION
      12                 :  *    src/backend/replication/logical/logicalfuncs.c
      13                 :  *-------------------------------------------------------------------------
      14                 :  */
      15                 : 
      16                 : #include "postgres.h"
      17                 : 
      18                 : #include <unistd.h>
      19                 : 
      20                 : #include "access/xact.h"
      21                 : #include "access/xlog_internal.h"
      22                 : #include "access/xlogrecovery.h"
      23                 : #include "access/xlogutils.h"
      24                 : #include "catalog/pg_type.h"
      25                 : #include "fmgr.h"
      26                 : #include "funcapi.h"
      27                 : #include "mb/pg_wchar.h"
      28                 : #include "miscadmin.h"
      29                 : #include "nodes/makefuncs.h"
      30                 : #include "replication/decode.h"
      31                 : #include "replication/logical.h"
      32                 : #include "replication/message.h"
      33                 : #include "storage/fd.h"
      34                 : #include "utils/array.h"
      35                 : #include "utils/builtins.h"
      36                 : #include "utils/inval.h"
      37                 : #include "utils/lsyscache.h"
      38                 : #include "utils/memutils.h"
      39                 : #include "utils/pg_lsn.h"
      40                 : #include "utils/regproc.h"
      41                 : #include "utils/resowner.h"
      42                 : 
      43                 : /* Private state handed to LogicalOutputWrite() through ctx->output_writer_private */
      44                 : typedef struct DecodingOutputState
      45                 : {
      46                 :     Tuplestorestate *tupstore;  /* destination for decoded rows; set from rsinfo->setResult */
      47                 :     TupleDesc   tupdesc;        /* row descriptor used with tupstore; set from rsinfo->setDesc */
      48                 :     bool        binary_output;  /* if false, output is asserted to be valid in the database encoding */
      49                 :     int64       returned_rows;  /* rows written so far; compared against upto_nchanges */
      50                 : } DecodingOutputState;
      51                 : 
      52                 : /*
      53                 :  * Prepare for an output plugin write.
                         :  *
                         :  * Resets the ctx->out buffer before the next write.  The lsn, xid and
                         :  * last_write arguments are not used by this implementation.
      54                 :  */
      55                 : static void
      56 CBC      151095 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      57                 :                           bool last_write)
      58                 : {
      59          151095 :     resetStringInfo(ctx->out);
      60          151095 : }
      61                 : 
      62                 : /*
      63                 :  * Perform output plugin write into tuplestore.
                         :  *
                         :  * Builds a three-column row from lsn, xid and the current contents of
                         :  * ctx->out, appends it to the tuplestore kept in the DecodingOutputState
                         :  * found in ctx->output_writer_private, and increments that state's
                         :  * returned_rows counter.  Raises an ERROR if ctx->out is longer than
                         :  * MaxAllocSize - VARHDRSZ.  The last_write argument is not used.
      64                 :  */
      65                 : static void
      66          151095 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      67                 :                    bool last_write)
      68                 : {
      69                 :     Datum       values[3];      /* lsn, xid, data */
      70                 :     bool        nulls[3];       /* all false: no column is ever NULL */
      71                 :     DecodingOutputState *p;
      72                 : 
      73                 :     /* SQL Datums can only be of a limited length... */
      74          151095 :     if (ctx->out->len > MaxAllocSize - VARHDRSZ)
      75 UBC           0 :         elog(ERROR, "too much output for sql interface");
      76                 : 
      77 CBC      151095 :     p = (DecodingOutputState *) ctx->output_writer_private;
      78                 : 
      79          151095 :     memset(nulls, 0, sizeof(nulls));
      80          151095 :     values[0] = LSNGetDatum(lsn);
      81          151095 :     values[1] = TransactionIdGetDatum(xid);
      82                 : 
      83                 :     /*
      84                 :      * Assert ctx->out is in database encoding when we're writing textual
      85                 :      * output.
      86                 :      */
      87          151095 :     if (!p->binary_output)
      88          151072 :         Assert(pg_verify_mbstr(GetDatabaseEncoding(),
      89                 :                                ctx->out->data, ctx->out->len,
      90                 :                                false));
      91                 : 
      92                 :     /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
      93          151095 :     values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
      94                 : 
      95          151095 :     tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
      96          151095 :     p->returned_rows++;         /* feeds the upto_nchanges limit check in the caller's loop */
      97          151095 : }
      98                 : 
      99                 : /*
     100                 :  * Helper function for the various SQL callable logical decoding functions.
                         :  *
                         :  * SQL arguments read from fcinfo:
                         :  *   0  slot name; NULL raises an error
                         :  *   1  upto_lsn; NULL means no LSN limit
                         :  *   2  upto_nchanges; NULL (or 0) means no row limit
                         :  *   3  options as a one-dimensional text array of alternating option
                         :  *      names and values; NULL raises an error
                         :  *
                         :  * confirm: when true, the position decoding stopped at is confirmed and the
                         :  * slot is marked dirty; when false the position is left unconfirmed.
                         :  * binary: when false, an error is raised unless the output plugin declares
                         :  * textual output.
                         :  *
                         :  * Decoded rows are appended to the materialized SRF tuplestore by
                         :  * LogicalOutputWrite(); the function itself returns (Datum) 0.
     101                 :  */
     102                 : static Datum
     103             192 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
     104                 : {
     105                 :     Name        name;
     106                 :     XLogRecPtr  upto_lsn;
     107                 :     int32       upto_nchanges;
     108             192 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     109                 :     MemoryContext per_query_ctx;
     110                 :     MemoryContext oldcontext;
     111                 :     XLogRecPtr  end_of_wal;
     112                 :     LogicalDecodingContext *ctx;
     113             192 :     ResourceOwner old_resowner = CurrentResourceOwner;
     114                 :     ArrayType  *arr;
     115                 :     Size        ndim;
     116             192 :     List       *options = NIL;
     117                 :     DecodingOutputState *p;
     118                 : 
     119             192 :     CheckSlotPermissions();
     120                 : 
     121             191 :     CheckLogicalDecodingRequirements();
     122                 : 
     123             191 :     if (PG_ARGISNULL(0))
     124 UBC           0 :         ereport(ERROR,
     125                 :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     126                 :                  errmsg("slot name must not be null")));
     127 CBC         191 :     name = PG_GETARG_NAME(0);
     128                 : 
     129             191 :     if (PG_ARGISNULL(1))
     130             191 :         upto_lsn = InvalidXLogRecPtr;
     131                 :     else
     132 UBC           0 :         upto_lsn = PG_GETARG_LSN(1);
     133                 : 
     134 CBC         191 :     if (PG_ARGISNULL(2))
     135             191 :         upto_nchanges = 0;      /* a row count, not an LSN: 0 disables the limit tested in the decode loop */
     136                 :     else
     137 UBC           0 :         upto_nchanges = PG_GETARG_INT32(2);
     138                 : 
     139 CBC         191 :     if (PG_ARGISNULL(3))
     140 UBC           0 :         ereport(ERROR,
     141                 :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     142                 :                  errmsg("options array must not be null")));
     143 CBC         191 :     arr = PG_GETARG_ARRAYTYPE_P(3);
     144                 : 
     145                 :     /* state to write output to */
     146             191 :     p = palloc0(sizeof(DecodingOutputState));
     147                 : 
     148             191 :     p->binary_output = binary;
     149                 : 
     150             191 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     151             191 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);  /* stays current until the decoding context has been created */
     152                 : 
     153                 :     /* Deconstruct options array */
     154             191 :     ndim = ARR_NDIM(arr);
     155             191 :     if (ndim > 1)
     156                 :     {
     157 UBC           0 :         ereport(ERROR,
     158                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     159                 :                  errmsg("array must be one-dimensional")));
     160                 :     }
     161 CBC         191 :     else if (array_contains_nulls(arr))
     162                 :     {
     163 UBC           0 :         ereport(ERROR,
     164                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     165                 :                  errmsg("array must not contain nulls")));
     166                 :     }
     167 CBC         191 :     else if (ndim == 1)
     168                 :     {
     169                 :         int         nelems;
     170                 :         Datum      *datum_opts;
     171                 :         int         i;
     172                 : 
     173             169 :         Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     174                 : 
     175 GNC         169 :         deconstruct_array_builtin(arr, TEXTOID, &datum_opts, NULL, &nelems);
     176 ECB             : 
     177 GBC         169 :         if (nelems % 2 != 0)
     178 UIC           0 :             ereport(ERROR,
     179                 :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     180                 :                      errmsg("array must have even number of elements")));
     181 ECB             : 
     182 GIC         507 :         for (i = 0; i < nelems; i += 2)     /* elements come in (option name, option value) pairs */
     183 ECB             :         {
     184 CBC         338 :             char       *optname = TextDatumGetCString(datum_opts[i]);  /* distinct from the slot "name" above */
     185 GIC         338 :             char       *opt = TextDatumGetCString(datum_opts[i + 1]);
     186 ECB             : 
     187 GIC         338 :             options = lappend(options, makeDefElem(optname, (Node *) makeString(opt), -1));
     188                 :         }
     189                 :     }
     190 ECB             : 
     191 CBC         191 :     InitMaterializedSRF(fcinfo, 0);
     192             191 :     p->tupstore = rsinfo->setResult;
     193 GIC         191 :     p->tupdesc = rsinfo->setDesc;
     194                 : 
     195                 :     /*
     196                 :      * Compute the current end-of-wal.
     197 ECB             :      */
     198 CBC         191 :     if (!RecoveryInProgress())
     199 GIC         187 :         end_of_wal = GetFlushRecPtr(NULL);
     200 ECB             :     else
     201 GIC           4 :         end_of_wal = GetXLogReplayRecPtr(NULL);
     202 ECB             : 
     203 GIC         191 :     ReplicationSlotAcquire(NameStr(*name), true);
     204 ECB             : 
     205 GIC         189 :     PG_TRY();
     206                 :     {
     207 ECB             :         /* restart at slot's confirmed_flush */
     208 GIC         373 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
     209                 :                                     options,
     210 ECB             :                                     false,
     211 GIC         189 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     212                 :                                                .segment_open = wal_segment_open,
     213                 :                                                .segment_close = wal_segment_close),
     214                 :                                     LogicalOutputPrepareWrite,
     215                 :                                     LogicalOutputWrite, NULL);
     216 ECB             : 
     217 CBC         184 :         MemoryContextSwitchTo(oldcontext);
     218                 : 
     219                 :         /*
     220                 :          * Check whether the output plugin writes textual output if that's
     221                 :          * what we need.
     222                 :          */
     223 GIC         184 :         if (!binary &&
     224 CBC         176 :             ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
     225 GIC           1 :             ereport(ERROR,
     226                 :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     227 ECB             :                      errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
     228                 :                             NameStr(MyReplicationSlot->data.plugin),
     229                 :                             format_procedure(fcinfo->flinfo->fn_oid))));
     230                 : 
     231 GIC         183 :         ctx->output_writer_private = p;
     232                 : 
     233 ECB             :         /*
     234                 :          * Decoding of WAL must start at restart_lsn so that the entirety of
     235                 :          * xacts that committed after the slot's confirmed_flush can be
     236                 :          * accumulated into reorder buffers.
     237 EUB             :          */
     238 GIC         183 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
     239                 : 
     240                 :         /* invalidate non-timetravel entries */
     241             183 :         InvalidateSystemCaches();
     242                 : 
     243 ECB             :         /* Decode until we run out of records */
     244 CBC     1642232 :         while (ctx->reader->EndRecPtr < end_of_wal)
     245                 :         {
     246                 :             XLogRecord *record;
     247         1642049 :             char       *errm = NULL;
     248 EUB             : 
     249 GBC     1642049 :             record = XLogReadRecord(ctx->reader, &errm);
     250 CBC     1642049 :             if (errm)
     251 UBC           0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
     252 EUB             : 
     253 ECB             :             /*
     254                 :              * The {begin_txn,change,commit_txn}_wrapper callbacks above will
     255                 :              * store the description into our tuplestore.
     256                 :              */
     257 GIC     1642049 :             if (record != NULL)
     258         1642049 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
     259                 : 
     260                 :             /* check limits; they are tested only after a record has been processed */
     261 CBC     1642049 :             if (upto_lsn != InvalidXLogRecPtr &&
     262 UIC           0 :                 upto_lsn <= ctx->reader->EndRecPtr)
     263               0 :                 break;
     264 GIC     1642049 :             if (upto_nchanges != 0 &&
     265 UIC           0 :                 upto_nchanges <= p->returned_rows)
     266               0 :                 break;
     267 CBC     1642049 :             CHECK_FOR_INTERRUPTS();
     268                 :         }
     269 ECB             : 
     270                 :         /*
     271                 :          * Logical decoding could have clobbered CurrentResourceOwner during
     272                 :          * transaction management, so restore the executor's value.  (This is
     273                 :          * a kluge, but it's not worth cleaning up right now.)
     274                 :          */
     275 GIC         183 :         CurrentResourceOwner = old_resowner;
     276                 : 
     277                 :         /*
     278                 :          * Next time, start where we left off. (Hunting things, the family
     279                 :          * business..)
     280                 :          */
     281             183 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
     282                 :         {
     283             166 :             LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
     284 ECB             : 
     285                 :             /*
     286                 :              * If only the confirmed_flush_lsn has changed the slot won't get
     287                 :              * marked as dirty by the above. Callers on the walsender
     288                 :              * interface are expected to keep track of their own progress and
     289                 :              * don't need it written out. But SQL-interface users cannot
     290                 :              * specify their own start positions and it's harder for them to
     291                 :              * keep track of their progress, so we should make more of an
     292                 :              * effort to save it for them.
     293                 :              *
     294                 :              * Dirty the slot so it's written out at the next checkpoint.
     295                 :              * We'll still lose its position on crash, as documented, but it's
     296                 :              * better than always losing the position even on clean restart.
     297                 :              */
     298 CBC         166 :             ReplicationSlotMarkDirty();
     299                 :         }
     300 ECB             : 
     301                 :         /* free context, call shutdown callback */
     302 CBC         183 :         FreeDecodingContext(ctx);
     303                 : 
     304 GIC         183 :         ReplicationSlotRelease();
     305             183 :         InvalidateSystemCaches();
     306                 :     }
     307               6 :     PG_CATCH();
     308                 :     {
     309 ECB             :         /* clear all timetravel entries */
     310 GIC           6 :         InvalidateSystemCaches();
     311 ECB             : 
     312 GIC           6 :         PG_RE_THROW();
     313                 :     }
     314             183 :     PG_END_TRY();
     315                 : 
     316             183 :     return (Datum) 0;           /* rows were delivered through the tuplestore, not the return value */
     317                 : }
     318 ECB             : 
     319                 : /*
     320                 :  * SQL function returning the changestream as text, consuming the data.
                         :  *
                         :  * Calls pg_logical_slot_get_changes_guts() with confirm = true and
                         :  * binary = false.
     321                 :  */
     322                 : Datum
     323 GIC         168 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
     324                 : {
     325             168 :     return pg_logical_slot_get_changes_guts(fcinfo, true, false);
     326                 : }
     327 ECB             : 
     328                 : /*
     329                 :  * SQL function returning the changestream as text, only peeking ahead.
                         :  *
                         :  * Calls pg_logical_slot_get_changes_guts() with confirm = false and
                         :  * binary = false, so the decoded position is not confirmed.
     330                 :  */
     331                 : Datum
     332 GIC          16 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
     333                 : {
     334              16 :     return pg_logical_slot_get_changes_guts(fcinfo, false, false);
     335                 : }
     336 ECB             : 
     337                 : /*
     338                 :  * SQL function returning the changestream in binary, consuming the data.
                         :  *
                         :  * Calls pg_logical_slot_get_changes_guts() with confirm = true and
                         :  * binary = true.
     339                 :  */
     340                 : Datum
     341 GIC           4 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
     342                 : {
     343               4 :     return pg_logical_slot_get_changes_guts(fcinfo, true, true);
     344                 : }
     345                 : 
     346 ECB             : /*
     347                 :  * SQL function returning the changestream in binary, only peeking ahead.
                         :  *
                         :  * Calls pg_logical_slot_get_changes_guts() with confirm = false and
                         :  * binary = true, so the decoded position is not confirmed.
     348                 :  */
     349                 : Datum
     350 CBC           4 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
     351                 : {
     352 GIC           4 :     return pg_logical_slot_get_changes_guts(fcinfo, false, true);
     353 ECB             : }
     354                 : 
     355                 : 
     356                 : /*
     357                 :  * SQL function for writing logical decoding message into WAL.
                         :  *
                         :  * SQL arguments: 0 = transactional flag (bool), 1 = prefix (text),
                         :  * 2 = message payload (bytea).  They are handed to LogLogicalMessage() and
                         :  * the LSN it returns is the function result.
     358                 :  */
     359                 : Datum
     360 GIC          22 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
     361                 : {
     362 CBC          22 :     bool        transactional = PG_GETARG_BOOL(0);
     363 GIC          22 :     char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
     364              22 :     bytea      *data = PG_GETARG_BYTEA_PP(2);
     365                 :     XLogRecPtr  lsn;
     366                 : 
     367              22 :     lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
     368                 :                             transactional);
     369              22 :     PG_RETURN_LSN(lsn);
     370                 : }
     371                 : 
     372                 : Datum
     373              22 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
     374                 : {
     375                 :     /* bytea and text are compatible, so simply delegate to the bytea variant with the same fcinfo */
     376              22 :     return pg_logical_emit_message_bytea(fcinfo);
     377                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a