LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - logicalfuncs.c (source / functions) Coverage Total Hit UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 88.3 % 111 98 5 8 2 42 1 53 3 41 4
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 9 9 6 1 2 6
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (120,180] days: 100.0 % 1 1 1
Legend: Lines: hit not hit (240..) days: 88.2 % 110 97 5 8 2 42 1 52 3 41
Function coverage date bins:
(240..) days: 60.0 % 15 9 6 1 2 6

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * logicalfuncs.c
                                  4                 :  *
                                  5                 :  *     Support functions for using logical decoding and management of
                                  6                 :  *     logical replication slots via SQL.
                                  7                 :  *
                                  8                 :  *
                                  9                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
                                 10                 :  *
                                 11                 :  * IDENTIFICATION
                                 12                 :  *    src/backend/replication/logical/logicalfuncs.c
                                 13                 :  *-------------------------------------------------------------------------
                                 14                 :  */
                                 15                 : 
                                 16                 : #include "postgres.h"
                                 17                 : 
                                 18                 : #include <unistd.h>
                                 19                 : 
                                 20                 : #include "access/xact.h"
                                 21                 : #include "access/xlog_internal.h"
                                 22                 : #include "access/xlogrecovery.h"
                                 23                 : #include "access/xlogutils.h"
                                 24                 : #include "catalog/pg_type.h"
                                 25                 : #include "fmgr.h"
                                 26                 : #include "funcapi.h"
                                 27                 : #include "mb/pg_wchar.h"
                                 28                 : #include "miscadmin.h"
                                 29                 : #include "nodes/makefuncs.h"
                                 30                 : #include "replication/decode.h"
                                 31                 : #include "replication/logical.h"
                                 32                 : #include "replication/message.h"
                                 33                 : #include "storage/fd.h"
                                 34                 : #include "utils/array.h"
                                 35                 : #include "utils/builtins.h"
                                 36                 : #include "utils/inval.h"
                                 37                 : #include "utils/lsyscache.h"
                                 38                 : #include "utils/memutils.h"
                                 39                 : #include "utils/pg_lsn.h"
                                 40                 : #include "utils/regproc.h"
                                 41                 : #include "utils/resowner.h"
                                 42                 : 
                                 43                 : /* Private data for writing out data */
                                 44                 : typedef struct DecodingOutputState
                                 45                 : {
                                 46                 :     Tuplestorestate *tupstore;
                                 47                 :     TupleDesc   tupdesc;
                                 48                 :     bool        binary_output;
                                 49                 :     int64       returned_rows;
                                 50                 : } DecodingOutputState;
                                 51                 : 
                                 52                 : /*
                                 53                 :  * Prepare for an output plugin write.
                                 54                 :  */
                                 55                 : static void
 3324 rhaas                      56 CBC      151095 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
                                 57                 :                           bool last_write)
                                 58                 : {
                                 59          151095 :     resetStringInfo(ctx->out);
                                 60          151095 : }
                                 61                 : 
                                 62                 : /*
                                 63                 :  * Perform output plugin write into tuplestore.
                                 64                 :  */
                                 65                 : static void
                                 66          151095 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
                                 67                 :                    bool last_write)
                                 68                 : {
                                 69                 :     Datum       values[3];
                                 70                 :     bool        nulls[3];
                                 71                 :     DecodingOutputState *p;
                                 72                 : 
                                 73                 :     /* SQL Datums can only be of a limited length... */
                                 74          151095 :     if (ctx->out->len > MaxAllocSize - VARHDRSZ)
 3324 rhaas                      75 UBC           0 :         elog(ERROR, "too much output for sql interface");
                                 76                 : 
 3324 rhaas                      77 CBC      151095 :     p = (DecodingOutputState *) ctx->output_writer_private;
                                 78                 : 
                                 79          151095 :     memset(nulls, 0, sizeof(nulls));
                                 80          151095 :     values[0] = LSNGetDatum(lsn);
                                 81          151095 :     values[1] = TransactionIdGetDatum(xid);
                                 82                 : 
                                 83                 :     /*
                                 84                 :      * Assert ctx->out is in database encoding when we're writing textual
                                 85                 :      * output.
                                 86                 :      */
                                 87          151095 :     if (!p->binary_output)
                                 88          151072 :         Assert(pg_verify_mbstr(GetDatabaseEncoding(),
                                 89                 :                                ctx->out->data, ctx->out->len,
                                 90                 :                                false));
                                 91                 : 
                                 92                 :     /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
 1165 alvherre                   93          151095 :     values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
                                 94                 : 
 3324 rhaas                      95          151095 :     tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
                                 96          151095 :     p->returned_rows++;
                                 97          151095 : }
                                 98                 : 
                                 99                 : /*
                                100                 :  * Helper function for the various SQL callable logical decoding functions.
                                101                 :  */
                                102                 : static Datum
                                103             192 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
                                104                 : {
                                105                 :     Name        name;
                                106                 :     XLogRecPtr  upto_lsn;
                                107                 :     int32       upto_nchanges;
                                108             192 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                                109                 :     MemoryContext per_query_ctx;
                                110                 :     MemoryContext oldcontext;
                                111                 :     XLogRecPtr  end_of_wal;
                                112                 :     LogicalDecodingContext *ctx;
                                113             192 :     ResourceOwner old_resowner = CurrentResourceOwner;
                                114                 :     ArrayType  *arr;
                                115                 :     Size        ndim;
                                116             192 :     List       *options = NIL;
                                117                 :     DecodingOutputState *p;
                                118                 : 
  572 michael                   119             192 :     CheckSlotPermissions();
                                120                 : 
 2647 tgl                       121             191 :     CheckLogicalDecodingRequirements();
                                122                 : 
                                123             191 :     if (PG_ARGISNULL(0))
 2647 tgl                       124 UBC           0 :         ereport(ERROR,
                                125                 :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                                126                 :                  errmsg("slot name must not be null")));
 2647 tgl                       127 CBC         191 :     name = PG_GETARG_NAME(0);
                                128                 : 
 3324 rhaas                     129             191 :     if (PG_ARGISNULL(1))
                                130             191 :         upto_lsn = InvalidXLogRecPtr;
                                131                 :     else
 3324 rhaas                     132 UBC           0 :         upto_lsn = PG_GETARG_LSN(1);
                                133                 : 
 3324 rhaas                     134 CBC         191 :     if (PG_ARGISNULL(2))
                                135             191 :         upto_nchanges = InvalidXLogRecPtr;
                                136                 :     else
 3324 rhaas                     137 UBC           0 :         upto_nchanges = PG_GETARG_INT32(2);
                                138                 : 
 2647 tgl                       139 CBC         191 :     if (PG_ARGISNULL(3))
 2647 tgl                       140 UBC           0 :         ereport(ERROR,
                                141                 :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
                                142                 :                  errmsg("options array must not be null")));
 2647 tgl                       143 CBC         191 :     arr = PG_GETARG_ARRAYTYPE_P(3);
                                144                 : 
                                145                 :     /* state to write output to */
 3324 rhaas                     146             191 :     p = palloc0(sizeof(DecodingOutputState));
                                147                 : 
                                148             191 :     p->binary_output = binary;
                                149                 : 
                                150             191 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
                                151             191 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
                                152                 : 
                                153                 :     /* Deconstruct options array */
 2647 tgl                       154             191 :     ndim = ARR_NDIM(arr);
 3324 rhaas                     155             191 :     if (ndim > 1)
                                156                 :     {
 3324 rhaas                     157 UBC           0 :         ereport(ERROR,
                                158                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                159                 :                  errmsg("array must be one-dimensional")));
                                160                 :     }
 3324 rhaas                     161 CBC         191 :     else if (array_contains_nulls(arr))
                                162                 :     {
 3324 rhaas                     163 UBC           0 :         ereport(ERROR,
                                164                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                165                 :                  errmsg("array must not contain nulls")));
                                166                 :     }
 3324 rhaas                     167 CBC         191 :     else if (ndim == 1)
                                168                 :     {
                                169                 :         int         nelems;
                                170                 :         Datum      *datum_opts;
                                171                 :         int         i;
                                172                 : 
                                173             169 :         Assert(ARR_ELEMTYPE(arr) == TEXTOID);
                                174                 : 
  282 peter                     175 GNC         169 :         deconstruct_array_builtin(arr, TEXTOID, &datum_opts, NULL, &nelems);
 3324 rhaas                     176 ECB             : 
 3324 rhaas                     177 GBC         169 :         if (nelems % 2 != 0)
 3324 rhaas                     178 UIC           0 :             ereport(ERROR,
                                179                 :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                180                 :                      errmsg("array must have even number of elements")));
 3324 rhaas                     181 ECB             : 
 3324 rhaas                     182 GIC         507 :         for (i = 0; i < nelems; i += 2)
 3324 rhaas                     183 ECB             :         {
 3324 rhaas                     184 CBC         338 :             char       *name = TextDatumGetCString(datum_opts[i]);
 3324 rhaas                     185 GIC         338 :             char       *opt = TextDatumGetCString(datum_opts[i + 1]);
 3324 rhaas                     186 ECB             : 
 2406 peter_e                   187 GIC         338 :             options = lappend(options, makeDefElem(name, (Node *) makeString(opt), -1));
                                188                 :         }
                                189                 :     }
 3324 rhaas                     190 ECB             : 
  173 michael                   191 CBC         191 :     InitMaterializedSRF(fcinfo, 0);
  398                           192             191 :     p->tupstore = rsinfo->setResult;
  398 michael                   193 GIC         191 :     p->tupdesc = rsinfo->setDesc;
                                194                 : 
                                195                 :     /*
                                196                 :      * Compute the current end-of-wal.
 2209 simon                     197 ECB             :      */
 2531 alvherre                  198 CBC         191 :     if (!RecoveryInProgress())
  520 rhaas                     199 GIC         187 :         end_of_wal = GetFlushRecPtr(NULL);
 2531 alvherre                  200 ECB             :     else
  520 rhaas                     201 GIC           4 :         end_of_wal = GetXLogReplayRecPtr(NULL);
 2531 alvherre                  202 ECB             : 
  667 alvherre                  203 GIC         191 :     ReplicationSlotAcquire(NameStr(*name), true);
 3324 rhaas                     204 ECB             : 
 3324 rhaas                     205 GIC         189 :     PG_TRY();
                                206                 :     {
 2533 alvherre                  207 ECB             :         /* restart at slot's confirmed_flush */
 3324 rhaas                     208 GIC         373 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
                                209                 :                                     options,
 1908 simon                     210 ECB             :                                     false,
  699 tmunro                    211 GIC         189 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
                                212                 :                                                .segment_open = wal_segment_open,
                                213                 :                                                .segment_close = wal_segment_close),
                                214                 :                                     LogicalOutputPrepareWrite,
                                215                 :                                     LogicalOutputWrite, NULL);
 3324 rhaas                     216 ECB             : 
 3324 rhaas                     217 CBC         184 :         MemoryContextSwitchTo(oldcontext);
                                218                 : 
                                219                 :         /*
                                220                 :          * Check whether the output plugin writes textual output if that's
                                221                 :          * what we need.
                                222                 :          */
 3324 rhaas                     223 GIC         184 :         if (!binary &&
 2878 bruce                     224 CBC         176 :             ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
 3324 rhaas                     225 GIC           1 :             ereport(ERROR,
                                226                 :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 2720 peter_e                   227 ECB             :                      errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
                                228                 :                             NameStr(MyReplicationSlot->data.plugin),
                                229                 :                             format_procedure(fcinfo->flinfo->fn_oid))));
 3324 rhaas                     230                 : 
 3324 rhaas                     231 GIC         183 :         ctx->output_writer_private = p;
                                232                 : 
 2566 alvherre                  233 ECB             :         /*
                                234                 :          * Decoding of WAL must start at restart_lsn so that the entirety of
 2533                           235                 :          * xacts that committed after the slot's confirmed_flush can be
                                236                 :          * accumulated into reorder buffers.
 2566 alvherre                  237 EUB             :          */
 1169 heikki.linnakangas        238 GIC         183 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
                                239                 : 
                                240                 :         /* invalidate non-timetravel entries */
 3324 rhaas                     241             183 :         InvalidateSystemCaches();
                                242                 : 
 2209 simon                     243 ECB             :         /* Decode until we run out of records */
 1169 heikki.linnakangas        244 CBC     1642232 :         while (ctx->reader->EndRecPtr < end_of_wal)
                                245                 :         {
                                246                 :             XLogRecord *record;
 3324 rhaas                     247         1642049 :             char       *errm = NULL;
 3324 rhaas                     248 EUB             : 
  699 tmunro                    249 GBC     1642049 :             record = XLogReadRecord(ctx->reader, &errm);
 3324 rhaas                     250 CBC     1642049 :             if (errm)
  515 michael                   251 UBC           0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
 3324 rhaas                     252 EUB             : 
 3324 rhaas                     253 ECB             :             /*
                                254                 :              * The {begin_txn,change,commit_txn}_wrapper callbacks above will
                                255                 :              * store the description into our tuplestore.
                                256                 :              */
 3324 rhaas                     257 GIC     1642049 :             if (record != NULL)
 3062 heikki.linnakangas        258         1642049 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
                                259                 : 
                                260                 :             /* check limits */
 3324 rhaas                     261 CBC     1642049 :             if (upto_lsn != InvalidXLogRecPtr &&
 3324 rhaas                     262 UIC           0 :                 upto_lsn <= ctx->reader->EndRecPtr)
                                263               0 :                 break;
 3324 rhaas                     264 GIC     1642049 :             if (upto_nchanges != 0 &&
 3324 rhaas                     265 UIC           0 :                 upto_nchanges <= p->returned_rows)
                                266               0 :                 break;
 3206 andres                    267 CBC     1642049 :             CHECK_FOR_INTERRUPTS();
                                268                 :         }
 2647 tgl                       269 ECB             : 
                                270                 :         /*
                                271                 :          * Logical decoding could have clobbered CurrentResourceOwner during
                                272                 :          * transaction management, so restore the executor's value.  (This is
                                273                 :          * a kluge, but it's not worth cleaning up right now.)
                                274                 :          */
 2647 tgl                       275 GIC         183 :         CurrentResourceOwner = old_resowner;
                                276                 : 
                                277                 :         /*
                                278                 :          * Next time, start where we left off. (Hunting things, the family
                                279                 :          * business..)
                                280                 :          */
                                281             183 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
                                282                 :         {
                                283             166 :             LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
 2153 bruce                     284 ECB             : 
                                285                 :             /*
                                286                 :              * If only the confirmed_flush_lsn has changed the slot won't get
                                287                 :              * marked as dirty by the above. Callers on the walsender
                                288                 :              * interface are expected to keep track of their own progress and
                                289                 :              * don't need it written out. But SQL-interface users cannot
                                290                 :              * specify their own start positions and it's harder for them to
                                291                 :              * keep track of their progress, so we should make more of an
                                292                 :              * effort to save it for them.
 2407 simon                     293                 :              *
                                294                 :              * Dirty the slot so it's written out at the next checkpoint.
                                295                 :              * We'll still lose its position on crash, as documented, but it's
 2153 bruce                     296                 :              * better than always losing the position even on clean restart.
                                297                 :              */
 2407 simon                     298 CBC         166 :             ReplicationSlotMarkDirty();
                                299                 :         }
 2647 tgl                       300 ECB             : 
                                301                 :         /* free context, call shutdown callback */
 2647 tgl                       302 CBC         183 :         FreeDecodingContext(ctx);
                                303                 : 
 2647 tgl                       304 GIC         183 :         ReplicationSlotRelease();
                                305             183 :         InvalidateSystemCaches();
                                306                 :     }
 3324 rhaas                     307               6 :     PG_CATCH();
                                308                 :     {
 3324 rhaas                     309 ECB             :         /* clear all timetravel entries */
 3324 rhaas                     310 GIC           6 :         InvalidateSystemCaches();
 3324 rhaas                     311 ECB             : 
 3324 rhaas                     312 GIC           6 :         PG_RE_THROW();
                                313                 :     }
                                314             183 :     PG_END_TRY();
                                315                 : 
                                316             183 :     return (Datum) 0;
                                317                 : }
 3324 rhaas                     318 ECB             : 
                                319                 : /*
                                320                 :  * SQL function returning the changestream as text, consuming the data.
                                321                 :  */
                                322                 : Datum
 3324 rhaas                     323 GIC         168 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
                                324                 : {
 2647 tgl                       325             168 :     return pg_logical_slot_get_changes_guts(fcinfo, true, false);
                                326                 : }
 3324 rhaas                     327 ECB             : 
                                328                 : /*
                                329                 :  * SQL function returning the changestream as text, only peeking ahead.
                                330                 :  */
                                331                 : Datum
 3324 rhaas                     332 GIC          16 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
                                333                 : {
 2647 tgl                       334              16 :     return pg_logical_slot_get_changes_guts(fcinfo, false, false);
                                335                 : }
 3324 rhaas                     336 ECB             : 
                                337                 : /*
                                338                 :  * SQL function returning the changestream in binary, consuming the data.
                                339                 :  */
                                340                 : Datum
 3324 rhaas                     341 GIC           4 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
                                342                 : {
 2647 tgl                       343               4 :     return pg_logical_slot_get_changes_guts(fcinfo, true, true);
                                344                 : }
                                345                 : 
 3324 rhaas                     346 ECB             : /*
                                347                 :  * SQL function returning the changestream in binary, only peeking ahead.
                                348                 :  */
                                349                 : Datum
 3324 rhaas                     350 CBC           4 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
                                351                 : {
 2647 tgl                       352 GIC           4 :     return pg_logical_slot_get_changes_guts(fcinfo, false, true);
 3324 rhaas                     353 ECB             : }
                                354                 : 
 2559 simon                     355                 : 
                                356                 : /*
                                357                 :  * SQL function for writing logical decoding message into WAL.
                                358                 :  */
                                359                 : Datum
 2559 simon                     360 GIC          22 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
                                361                 : {
 2559 simon                     362 CBC          22 :     bool        transactional = PG_GETARG_BOOL(0);
 2559 simon                     363 GIC          22 :     char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
                                364              22 :     bytea      *data = PG_GETARG_BYTEA_PP(2);
                                365                 :     XLogRecPtr  lsn;
                                366                 : 
                                367              22 :     lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
                                368                 :                             transactional);
                                369              22 :     PG_RETURN_LSN(lsn);
                                370                 : }
                                371                 : 
                                372                 : Datum
                                373              22 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
                                374                 : {
                                375                 :     /* bytea and text are compatible */
                                376              22 :     return pg_logical_emit_message_bytea(fcinfo);
                                377                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a