LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - proto.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 91.2 % 510 465 6 10 29 9 68 12 376 7 78 5
Current Date: 2023-04-08 15:15:32 Functions: 97.8 % 46 45 1 11 2 32 12
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * proto.c
       4                 :  *      logical replication protocol functions
       5                 :  *
       6                 :  * Copyright (c) 2015-2023, PostgreSQL Global Development Group
       7                 :  *
       8                 :  * IDENTIFICATION
       9                 :  *      src/backend/replication/logical/proto.c
      10                 :  *
      11                 :  *-------------------------------------------------------------------------
      12                 :  */
      13                 : #include "postgres.h"
      14                 : 
      15                 : #include "access/sysattr.h"
      16                 : #include "catalog/pg_namespace.h"
      17                 : #include "catalog/pg_type.h"
      18                 : #include "libpq/pqformat.h"
      19                 : #include "replication/logicalproto.h"
      20                 : #include "utils/lsyscache.h"
      21                 : #include "utils/syscache.h"
      22                 : 
      23                 : /*
      24                 :  * Protocol message flags.
      25                 :  */
      26                 : #define LOGICALREP_IS_REPLICA_IDENTITY 1
      27                 : 
      28                 : #define MESSAGE_TRANSACTIONAL (1<<0)
      29                 : #define TRUNCATE_CASCADE        (1<<0)
      30                 : #define TRUNCATE_RESTART_SEQS   (1<<1)
      31                 : 
      32                 : static void logicalrep_write_attrs(StringInfo out, Relation rel,
      33                 :                                    Bitmapset *columns);
      34                 : static void logicalrep_write_tuple(StringInfo out, Relation rel,
      35                 :                                    TupleTableSlot *slot,
      36                 :                                    bool binary, Bitmapset *columns);
      37                 : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
      38                 : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
      39                 : 
      40                 : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
      41                 : static const char *logicalrep_read_namespace(StringInfo in);
      42                 : 
      43                 : /*
      44                 :  * Check if a column is covered by a column list.
      45                 :  *
      46                 :  * Need to be careful about NULL, which is treated as a column list covering
      47                 :  * all columns.
      48                 :  */
      49                 : static bool
      50 CBC      732504 : column_in_column_list(int attnum, Bitmapset *columns)
      51                 : {
      52          732504 :     return (columns == NULL || bms_is_member(attnum, columns));
      53                 : }
      54                 : 
      55                 : 
      56                 : /*
      57                 :  * Write BEGIN to the output stream.
      58                 :  */
      59                 : void
      60             335 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
      61                 : {
      62             335 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
      63                 : 
      64                 :     /* fixed fields */
      65             335 :     pq_sendint64(out, txn->final_lsn);
      66             335 :     pq_sendint64(out, txn->xact_time.commit_time);
      67             335 :     pq_sendint32(out, txn->xid);
      68             335 : }
      69                 : 
      70                 : /*
      71                 :  * Read transaction BEGIN from the stream.
      72                 :  */
      73                 : void
      74             380 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
      75                 : {
      76                 :     /* read fields */
      77             380 :     begin_data->final_lsn = pq_getmsgint64(in);
      78             380 :     if (begin_data->final_lsn == InvalidXLogRecPtr)
      79 UBC           0 :         elog(ERROR, "final_lsn not set in begin message");
      80 CBC         380 :     begin_data->committime = pq_getmsgint64(in);
      81             380 :     begin_data->xid = pq_getmsgint(in, 4);
      82             380 : }
      83                 : 
      84                 : 
      85                 : /*
      86                 :  * Write COMMIT to the output stream.
      87                 :  */
      88                 : void
      89             334 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
      90                 :                         XLogRecPtr commit_lsn)
      91                 : {
      92             334 :     uint8       flags = 0;
      93                 : 
      94             334 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
      95                 : 
      96                 :     /* send the flags field (unused for now) */
      97             334 :     pq_sendbyte(out, flags);
      98                 : 
      99                 :     /* send fields */
     100             334 :     pq_sendint64(out, commit_lsn);
     101             334 :     pq_sendint64(out, txn->end_lsn);
     102             334 :     pq_sendint64(out, txn->xact_time.commit_time);
     103             334 : }
     104                 : 
     105                 : /*
     106                 :  * Read transaction COMMIT from the stream.
     107                 :  */
     108                 : void
     109             355 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
     110                 : {
     111                 :     /* read flags (unused for now) */
     112             355 :     uint8       flags = pq_getmsgbyte(in);
     113                 : 
     114             355 :     if (flags != 0)
     115 UBC           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
     116                 : 
     117                 :     /* read fields */
     118 CBC         355 :     commit_data->commit_lsn = pq_getmsgint64(in);
     119             355 :     commit_data->end_lsn = pq_getmsgint64(in);
     120             355 :     commit_data->committime = pq_getmsgint64(in);
     121             355 : }
     122                 : 
     123                 : /*
     124                 :  * Write BEGIN PREPARE to the output stream.
     125                 :  */
     126                 : void
     127              18 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
     128                 : {
     129              18 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
     130                 : 
     131                 :     /* fixed fields */
     132              18 :     pq_sendint64(out, txn->final_lsn);
     133              18 :     pq_sendint64(out, txn->end_lsn);
     134              18 :     pq_sendint64(out, txn->xact_time.prepare_time);
     135              18 :     pq_sendint32(out, txn->xid);
     136                 : 
     137                 :     /* send gid */
     138              18 :     pq_sendstring(out, txn->gid);
     139              18 : }
     140                 : 
     141                 : /*
     142                 :  * Read transaction BEGIN PREPARE from the stream.
     143                 :  */
     144                 : void
     145              14 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
     146                 : {
     147                 :     /* read fields */
     148              14 :     begin_data->prepare_lsn = pq_getmsgint64(in);
     149              14 :     if (begin_data->prepare_lsn == InvalidXLogRecPtr)
     150 UBC           0 :         elog(ERROR, "prepare_lsn not set in begin prepare message");
     151 CBC          14 :     begin_data->end_lsn = pq_getmsgint64(in);
     152              14 :     if (begin_data->end_lsn == InvalidXLogRecPtr)
     153 UBC           0 :         elog(ERROR, "end_lsn not set in begin prepare message");
     154 CBC          14 :     begin_data->prepare_time = pq_getmsgint64(in);
     155              14 :     begin_data->xid = pq_getmsgint(in, 4);
     156                 : 
     157                 :     /* read gid (copy it into a pre-allocated buffer) */
     158              14 :     strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
     159              14 : }
     160                 : 
     161                 : /*
     162                 :  * The core functionality for logicalrep_write_prepare and
     163                 :  * logicalrep_write_stream_prepare.
     164                 :  */
     165                 : static void
     166              29 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
     167                 :                                 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
     168                 : {
     169              29 :     uint8       flags = 0;
     170                 : 
     171              29 :     pq_sendbyte(out, type);
     172                 : 
     173                 :     /*
     174                 :      * This should only ever happen for two-phase commit transactions, in
     175                 :      * which case we expect to have a valid GID.
     176                 :      */
     177              29 :     Assert(txn->gid != NULL);
     178              29 :     Assert(rbtxn_prepared(txn));
     179              29 :     Assert(TransactionIdIsValid(txn->xid));
     180                 : 
     181                 :     /* send the flags field */
     182              29 :     pq_sendbyte(out, flags);
     183                 : 
     184                 :     /* send fields */
     185              29 :     pq_sendint64(out, prepare_lsn);
     186              29 :     pq_sendint64(out, txn->end_lsn);
     187              29 :     pq_sendint64(out, txn->xact_time.prepare_time);
     188              29 :     pq_sendint32(out, txn->xid);
     189                 : 
     190                 :     /* send gid */
     191              29 :     pq_sendstring(out, txn->gid);
     192              29 : }
     193                 : 
     194                 : /*
     195                 :  * Write PREPARE to the output stream.
     196                 :  */
     197                 : void
     198              18 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
     199                 :                          XLogRecPtr prepare_lsn)
     200                 : {
     201              18 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
     202                 :                                     txn, prepare_lsn);
     203              18 : }
     204                 : 
     205                 : /*
     206                 :  * The core functionality for logicalrep_read_prepare and
     207                 :  * logicalrep_read_stream_prepare.
     208                 :  */
     209                 : static void
     210              24 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
     211                 :                                LogicalRepPreparedTxnData *prepare_data)
     212                 : {
     213                 :     /* read flags */
     214              24 :     uint8       flags = pq_getmsgbyte(in);
     215                 : 
     216              24 :     if (flags != 0)
     217 UBC           0 :         elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
     218                 : 
     219                 :     /* read fields */
     220 CBC          24 :     prepare_data->prepare_lsn = pq_getmsgint64(in);
     221              24 :     if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
     222 UBC           0 :         elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
     223 CBC          24 :     prepare_data->end_lsn = pq_getmsgint64(in);
     224              24 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
     225 UBC           0 :         elog(ERROR, "end_lsn is not set in %s message", msgtype);
     226 CBC          24 :     prepare_data->prepare_time = pq_getmsgint64(in);
     227              24 :     prepare_data->xid = pq_getmsgint(in, 4);
     228              24 :     if (prepare_data->xid == InvalidTransactionId)
     229 UBC           0 :         elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
     230                 : 
     231                 :     /* read gid (copy it into a pre-allocated buffer) */
     232 CBC          24 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     233              24 : }
     234                 : 
     235                 : /*
     236                 :  * Read transaction PREPARE from the stream.
     237                 :  */
     238                 : void
     239              13 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     240                 : {
     241              13 :     logicalrep_read_prepare_common(in, "prepare", prepare_data);
     242              13 : }
     243                 : 
     244                 : /*
     245                 :  * Write COMMIT PREPARED to the output stream.
     246                 :  */
     247                 : void
     248              22 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
     249                 :                                  XLogRecPtr commit_lsn)
     250                 : {
     251              22 :     uint8       flags = 0;
     252                 : 
     253              22 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
     254                 : 
     255                 :     /*
     256                 :      * This should only ever happen for two-phase commit transactions, in
     257                 :      * which case we expect to have a valid GID.
     258                 :      */
     259              22 :     Assert(txn->gid != NULL);
     260                 : 
     261                 :     /* send the flags field */
     262              22 :     pq_sendbyte(out, flags);
     263                 : 
     264                 :     /* send fields */
     265              22 :     pq_sendint64(out, commit_lsn);
     266              22 :     pq_sendint64(out, txn->end_lsn);
     267              22 :     pq_sendint64(out, txn->xact_time.commit_time);
     268              22 :     pq_sendint32(out, txn->xid);
     269                 : 
     270                 :     /* send gid */
     271              22 :     pq_sendstring(out, txn->gid);
     272              22 : }
     273                 : 
     274                 : /*
     275                 :  * Read transaction COMMIT PREPARED from the stream.
     276                 :  */
     277                 : void
     278              19 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
     279                 : {
     280                 :     /* read flags */
     281              19 :     uint8       flags = pq_getmsgbyte(in);
     282                 : 
     283              19 :     if (flags != 0)
     284 UBC           0 :         elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
     285                 : 
     286                 :     /* read fields */
     287 CBC          19 :     prepare_data->commit_lsn = pq_getmsgint64(in);
     288              19 :     if (prepare_data->commit_lsn == InvalidXLogRecPtr)
     289 UBC           0 :         elog(ERROR, "commit_lsn is not set in commit prepared message");
     290 CBC          19 :     prepare_data->end_lsn = pq_getmsgint64(in);
     291              19 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
     292 UBC           0 :         elog(ERROR, "end_lsn is not set in commit prepared message");
     293 CBC          19 :     prepare_data->commit_time = pq_getmsgint64(in);
     294              19 :     prepare_data->xid = pq_getmsgint(in, 4);
     295                 : 
     296                 :     /* read gid (copy it into a pre-allocated buffer) */
     297              19 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
     298              19 : }
     299                 : 
     300                 : /*
     301                 :  * Write ROLLBACK PREPARED to the output stream.
     302                 :  */
     303                 : void
     304               8 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
     305                 :                                    XLogRecPtr prepare_end_lsn,
     306                 :                                    TimestampTz prepare_time)
     307                 : {
     308               8 :     uint8       flags = 0;
     309                 : 
     310               8 :     pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
     311                 : 
     312                 :     /*
     313                 :      * This should only ever happen for two-phase commit transactions, in
     314                 :      * which case we expect to have a valid GID.
     315                 :      */
     316               8 :     Assert(txn->gid != NULL);
     317                 : 
     318                 :     /* send the flags field */
     319               8 :     pq_sendbyte(out, flags);
     320                 : 
     321                 :     /* send fields */
     322               8 :     pq_sendint64(out, prepare_end_lsn);
     323               8 :     pq_sendint64(out, txn->end_lsn);
     324               8 :     pq_sendint64(out, prepare_time);
     325               8 :     pq_sendint64(out, txn->xact_time.commit_time);
     326               8 :     pq_sendint32(out, txn->xid);
     327                 : 
     328                 :     /* send gid */
     329               8 :     pq_sendstring(out, txn->gid);
     330               8 : }
     331                 : 
     332                 : /*
     333                 :  * Read transaction ROLLBACK PREPARED from the stream.
     334                 :  */
     335                 : void
     336               5 : logicalrep_read_rollback_prepared(StringInfo in,
     337                 :                                   LogicalRepRollbackPreparedTxnData *rollback_data)
     338                 : {
     339                 :     /* read flags */
     340               5 :     uint8       flags = pq_getmsgbyte(in);
     341                 : 
     342               5 :     if (flags != 0)
     343 UBC           0 :         elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
     344                 : 
     345                 :     /* read fields */
     346 CBC           5 :     rollback_data->prepare_end_lsn = pq_getmsgint64(in);
     347               5 :     if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
     348 UBC           0 :         elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
     349 CBC           5 :     rollback_data->rollback_end_lsn = pq_getmsgint64(in);
     350               5 :     if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
     351 UBC           0 :         elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
     352 CBC           5 :     rollback_data->prepare_time = pq_getmsgint64(in);
     353               5 :     rollback_data->rollback_time = pq_getmsgint64(in);
     354               5 :     rollback_data->xid = pq_getmsgint(in, 4);
     355                 : 
     356                 :     /* read gid (copy it into a pre-allocated buffer) */
     357               5 :     strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
     358               5 : }
     359                 : 
     360                 : /*
     361                 :  * Write STREAM PREPARE to the output stream.
     362                 :  */
     363                 : void
     364              11 : logicalrep_write_stream_prepare(StringInfo out,
     365                 :                                 ReorderBufferTXN *txn,
     366                 :                                 XLogRecPtr prepare_lsn)
     367                 : {
     368              11 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
     369                 :                                     txn, prepare_lsn);
     370              11 : }
     371                 : 
     372                 : /*
     373                 :  * Read STREAM PREPARE from the stream.
     374                 :  */
     375                 : void
     376              11 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
     377                 : {
     378              11 :     logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
     379              11 : }
     380                 : 
     381                 : /*
     382                 :  * Write ORIGIN to the output stream.
     383                 :  */
     384                 : void
     385               7 : logicalrep_write_origin(StringInfo out, const char *origin,
     386                 :                         XLogRecPtr origin_lsn)
     387                 : {
     388               7 :     pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
     389                 : 
     390                 :     /* fixed fields */
     391               7 :     pq_sendint64(out, origin_lsn);
     392                 : 
     393                 :     /* origin string */
     394               7 :     pq_sendstring(out, origin);
     395               7 : }
     396                 : 
     397                 : /*
     398                 :  * Read ORIGIN from the output stream.
     399                 :  */
     400                 : char *
     401 UBC           0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
     402                 : {
     403                 :     /* fixed fields */
     404               0 :     *origin_lsn = pq_getmsgint64(in);
     405                 : 
     406                 :     /* return origin */
     407               0 :     return pstrdup(pq_getmsgstring(in));
     408                 : }
     409                 : 
     410                 : /*
     411                 :  * Write INSERT to the output stream.
     412                 :  */
     413                 : void
     414 CBC      105697 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
     415                 :                         TupleTableSlot *newslot, bool binary, Bitmapset *columns)
     416                 : {
     417          105697 :     pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
     418                 : 
     419                 :     /* transaction ID (if not valid, we're not streaming) */
     420          105697 :     if (TransactionIdIsValid(xid))
     421          100075 :         pq_sendint32(out, xid);
     422                 : 
     423                 :     /* use Oid as relation identifier */
     424          105697 :     pq_sendint32(out, RelationGetRelid(rel));
     425                 : 
     426          105697 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     427          105697 :     logicalrep_write_tuple(out, rel, newslot, binary, columns);
     428          105697 : }
     429                 : 
     430                 : /*
     431                 :  * Read INSERT from stream.
     432                 :  *
     433                 :  * Fills the new tuple.
     434                 :  */
     435                 : LogicalRepRelId
     436           75638 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
     437                 : {
     438                 :     char        action;
     439                 :     LogicalRepRelId relid;
     440                 : 
     441                 :     /* read the relation id */
     442           75638 :     relid = pq_getmsgint(in, 4);
     443                 : 
     444           75638 :     action = pq_getmsgbyte(in);
     445           75638 :     if (action != 'N')
     446 UBC           0 :         elog(ERROR, "expected new tuple but got %d",
     447                 :              action);
     448                 : 
     449 CBC       75638 :     logicalrep_read_tuple(in, newtup);
     450                 : 
     451           75638 :     return relid;
     452                 : }
     453                 : 
     454                 : /*
     455                 :  * Write UPDATE to the output stream.
     456                 :  */
     457                 : void
     458           34408 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
     459                 :                         TupleTableSlot *oldslot, TupleTableSlot *newslot,
     460                 :                         bool binary, Bitmapset *columns)
     461                 : {
     462           34408 :     pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
     463                 : 
     464           34408 :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     465                 :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     466                 :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     467                 : 
     468                 :     /* transaction ID (if not valid, we're not streaming) */
     469           34408 :     if (TransactionIdIsValid(xid))
     470           34226 :         pq_sendint32(out, xid);
     471                 : 
     472                 :     /* use Oid as relation identifier */
     473           34408 :     pq_sendint32(out, RelationGetRelid(rel));
     474                 : 
     475           34408 :     if (oldslot != NULL)
     476                 :     {
     477             107 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     478              47 :             pq_sendbyte(out, 'O');  /* old tuple follows */
     479                 :         else
     480              60 :             pq_sendbyte(out, 'K');  /* old key follows */
     481             107 :         logicalrep_write_tuple(out, rel, oldslot, binary, columns);
     482                 :     }
     483                 : 
     484           34408 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     485           34408 :     logicalrep_write_tuple(out, rel, newslot, binary, columns);
     486           34408 : }
     487                 : 
     488                 : /*
     489                 :  * Read UPDATE from stream.
     490                 :  */
     491                 : LogicalRepRelId
     492           31917 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
     493                 :                        LogicalRepTupleData *oldtup,
     494                 :                        LogicalRepTupleData *newtup)
     495                 : {
     496                 :     char        action;
     497                 :     LogicalRepRelId relid;
     498                 : 
     499                 :     /* read the relation id */
     500           31917 :     relid = pq_getmsgint(in, 4);
     501                 : 
     502                 :     /* read and verify action */
     503           31917 :     action = pq_getmsgbyte(in);
     504           31917 :     if (action != 'K' && action != 'O' && action != 'N')
     505 UBC           0 :         elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
     506                 :              action);
     507                 : 
     508                 :     /* check for old tuple */
     509 CBC       31917 :     if (action == 'K' || action == 'O')
     510                 :     {
     511             118 :         logicalrep_read_tuple(in, oldtup);
     512             118 :         *has_oldtuple = true;
     513                 : 
     514             118 :         action = pq_getmsgbyte(in);
     515                 :     }
     516                 :     else
     517           31799 :         *has_oldtuple = false;
     518                 : 
     519                 :     /* check for new  tuple */
     520           31917 :     if (action != 'N')
     521 UBC           0 :         elog(ERROR, "expected action 'N', got %c",
     522                 :              action);
     523                 : 
     524 CBC       31917 :     logicalrep_read_tuple(in, newtup);
     525                 : 
     526           31917 :     return relid;
     527                 : }
     528                 : 
     529                 : /*
     530                 :  * Write DELETE to the output stream.
     531                 :  */
     532                 : void
     533           41853 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
     534                 :                         TupleTableSlot *oldslot, bool binary,
     535                 :                         Bitmapset *columns)
     536                 : {
     537           41853 :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     538                 :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     539                 :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     540                 : 
     541           41853 :     pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
     542                 : 
     543                 :     /* transaction ID (if not valid, we're not streaming) */
     544           41853 :     if (TransactionIdIsValid(xid))
     545           41620 :         pq_sendint32(out, xid);
     546                 : 
     547                 :     /* use Oid as relation identifier */
     548           41853 :     pq_sendint32(out, RelationGetRelid(rel));
     549                 : 
     550           41853 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     551             118 :         pq_sendbyte(out, 'O');  /* old tuple follows */
     552                 :     else
     553           41735 :         pq_sendbyte(out, 'K');  /* old key follows */
     554                 : 
     555           41853 :     logicalrep_write_tuple(out, rel, oldslot, binary, columns);
     556           41853 : }
     557                 : 
     558                 : /*
     559                 :  * Read DELETE from stream.
     560                 :  *
     561                 :  * Fills the old tuple.
     562                 :  */
     563                 : LogicalRepRelId
     564           40303 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
     565                 : {
     566                 :     char        action;
     567                 :     LogicalRepRelId relid;
     568                 : 
     569                 :     /* read the relation id */
     570           40303 :     relid = pq_getmsgint(in, 4);
     571                 : 
     572                 :     /* read and verify action */
     573           40303 :     action = pq_getmsgbyte(in);
     574           40303 :     if (action != 'K' && action != 'O')
     575 UBC           0 :         elog(ERROR, "expected action 'O' or 'K', got %c", action);
     576                 : 
     577 CBC       40303 :     logicalrep_read_tuple(in, oldtup);
     578                 : 
     579           40303 :     return relid;
     580                 : }
     581                 : 
     582                 : /*
     583                 :  * Write TRUNCATE to the output stream.
     584                 :  */
     585                 : void
     586               7 : logicalrep_write_truncate(StringInfo out,
     587                 :                           TransactionId xid,
     588                 :                           int nrelids,
     589                 :                           Oid relids[],
     590                 :                           bool cascade, bool restart_seqs)
     591                 : {
     592                 :     int         i;
     593               7 :     uint8       flags = 0;
     594                 : 
     595               7 :     pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
     596                 : 
     597                 :     /* transaction ID (if not valid, we're not streaming) */
     598               7 :     if (TransactionIdIsValid(xid))
     599 UBC           0 :         pq_sendint32(out, xid);
     600                 : 
     601 CBC           7 :     pq_sendint32(out, nrelids);
     602                 : 
     603                 :     /* encode and send truncate flags */
     604               7 :     if (cascade)
     605 UBC           0 :         flags |= TRUNCATE_CASCADE;
     606 CBC           7 :     if (restart_seqs)
     607 UBC           0 :         flags |= TRUNCATE_RESTART_SEQS;
     608 CBC           7 :     pq_sendint8(out, flags);
     609                 : 
     610              19 :     for (i = 0; i < nrelids; i++)
     611              12 :         pq_sendint32(out, relids[i]);
     612               7 : }
     613                 : 
     614                 : /*
     615                 :  * Read TRUNCATE from stream.
     616                 :  */
     617                 : List *
     618              17 : logicalrep_read_truncate(StringInfo in,
     619                 :                          bool *cascade, bool *restart_seqs)
     620                 : {
     621                 :     int         i;
     622                 :     int         nrelids;
     623              17 :     List       *relids = NIL;
     624                 :     uint8       flags;
     625                 : 
     626              17 :     nrelids = pq_getmsgint(in, 4);
     627                 : 
     628                 :     /* read and decode truncate flags */
     629              17 :     flags = pq_getmsgint(in, 1);
     630              17 :     *cascade = (flags & TRUNCATE_CASCADE) > 0;
     631              17 :     *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
     632                 : 
     633              43 :     for (i = 0; i < nrelids; i++)
     634              26 :         relids = lappend_oid(relids, pq_getmsgint(in, 4));
     635                 : 
     636              17 :     return relids;
     637                 : }
     638                 : 
     639                 : /*
     640                 :  * Write MESSAGE to stream
     641                 :  */
     642                 : void
     643               5 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
     644                 :                          bool transactional, const char *prefix, Size sz,
     645                 :                          const char *message)
     646                 : {
     647               5 :     uint8       flags = 0;
     648                 : 
     649               5 :     pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
     650                 : 
     651                 :     /* encode and send message flags */
     652               5 :     if (transactional)
     653               2 :         flags |= MESSAGE_TRANSACTIONAL;
     654                 : 
     655                 :     /* transaction ID (if not valid, we're not streaming) */
     656               5 :     if (TransactionIdIsValid(xid))
     657 UBC           0 :         pq_sendint32(out, xid);
     658                 : 
     659 CBC           5 :     pq_sendint8(out, flags);
     660               5 :     pq_sendint64(out, lsn);
     661               5 :     pq_sendstring(out, prefix);
     662               5 :     pq_sendint32(out, sz);
     663               5 :     pq_sendbytes(out, message, sz);
     664               5 : }
     665                 : 
     666                 : /*
     667                 :  * Write relation description to the output stream.
     668                 :  */
     669                 : void
     670             286 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
     671                 :                      Bitmapset *columns)
     672                 : {
     673                 :     char       *relname;
     674                 : 
     675             286 :     pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
     676                 : 
     677                 :     /* transaction ID (if not valid, we're not streaming) */
     678             286 :     if (TransactionIdIsValid(xid))
     679              68 :         pq_sendint32(out, xid);
     680                 : 
     681                 :     /* use Oid as relation identifier */
     682             286 :     pq_sendint32(out, RelationGetRelid(rel));
     683                 : 
     684                 :     /* send qualified relation name */
     685             286 :     logicalrep_write_namespace(out, RelationGetNamespace(rel));
     686             286 :     relname = RelationGetRelationName(rel);
     687             286 :     pq_sendstring(out, relname);
     688                 : 
     689                 :     /* send replica identity */
     690             286 :     pq_sendbyte(out, rel->rd_rel->relreplident);
     691                 : 
     692                 :     /* send the attribute info */
     693             286 :     logicalrep_write_attrs(out, rel, columns);
     694             286 : }
     695                 : 
     696                 : /*
     697                 :  * Read the relation info from stream and return as LogicalRepRelation.
     698                 :  */
     699                 : LogicalRepRelation *
     700             354 : logicalrep_read_rel(StringInfo in)
     701                 : {
     702             354 :     LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
     703                 : 
     704             354 :     rel->remoteid = pq_getmsgint(in, 4);
     705                 : 
     706                 :     /* Read relation name from stream */
     707             354 :     rel->nspname = pstrdup(logicalrep_read_namespace(in));
     708             354 :     rel->relname = pstrdup(pq_getmsgstring(in));
     709                 : 
     710                 :     /* Read the replica identity. */
     711             354 :     rel->replident = pq_getmsgbyte(in);
     712                 : 
     713                 :     /* Get attribute description */
     714             354 :     logicalrep_read_attrs(in, rel);
     715                 : 
     716             354 :     return rel;
     717                 : }
     718                 : 
     719                 : /*
     720                 :  * Write type info to the output stream.
     721                 :  *
     722                 :  * This function will always write base type info.
     723                 :  */
     724                 : void
     725              18 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
     726                 : {
     727              18 :     Oid         basetypoid = getBaseType(typoid);
     728                 :     HeapTuple   tup;
     729                 :     Form_pg_type typtup;
     730                 : 
     731              18 :     pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
     732                 : 
     733                 :     /* transaction ID (if not valid, we're not streaming) */
     734              18 :     if (TransactionIdIsValid(xid))
     735 UBC           0 :         pq_sendint32(out, xid);
     736                 : 
     737 CBC          18 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
     738              18 :     if (!HeapTupleIsValid(tup))
     739 UBC           0 :         elog(ERROR, "cache lookup failed for type %u", basetypoid);
     740 CBC          18 :     typtup = (Form_pg_type) GETSTRUCT(tup);
     741                 : 
     742                 :     /* use Oid as relation identifier */
     743              18 :     pq_sendint32(out, typoid);
     744                 : 
     745                 :     /* send qualified type name */
     746              18 :     logicalrep_write_namespace(out, typtup->typnamespace);
     747              18 :     pq_sendstring(out, NameStr(typtup->typname));
     748                 : 
     749              18 :     ReleaseSysCache(tup);
     750              18 : }
     751                 : 
     752                 : /*
     753                 :  * Read type info from the output stream.
     754                 :  */
     755                 : void
     756              18 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
     757                 : {
     758              18 :     ltyp->remoteid = pq_getmsgint(in, 4);
     759                 : 
     760                 :     /* Read type name from stream */
     761              18 :     ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
     762              18 :     ltyp->typname = pstrdup(pq_getmsgstring(in));
     763              18 : }
     764                 : 
     765                 : /*
     766                 :  * Write a tuple to the outputstream, in the most efficient format possible.
     767                 :  */
     768                 : static void
     769          182065 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
     770                 :                        bool binary, Bitmapset *columns)
     771                 : {
     772                 :     TupleDesc   desc;
     773                 :     Datum      *values;
     774                 :     bool       *isnull;
     775                 :     int         i;
     776          182065 :     uint16      nliveatts = 0;
     777                 : 
     778          182065 :     desc = RelationGetDescr(rel);
     779                 : 
     780          547747 :     for (i = 0; i < desc->natts; i++)
     781                 :     {
     782          365682 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     783                 : 
     784          365682 :         if (att->attisdropped || att->attgenerated)
     785               8 :             continue;
     786                 : 
     787          365674 :         if (!column_in_column_list(att->attnum, columns))
     788             102 :             continue;
     789                 : 
     790          365572 :         nliveatts++;
     791                 :     }
     792          182065 :     pq_sendint16(out, nliveatts);
     793                 : 
     794          182065 :     slot_getallattrs(slot);
     795          182065 :     values = slot->tts_values;
     796          182065 :     isnull = slot->tts_isnull;
     797                 : 
     798                 :     /* Write the values */
     799          547747 :     for (i = 0; i < desc->natts; i++)
     800                 :     {
     801                 :         HeapTuple   typtup;
     802                 :         Form_pg_type typclass;
     803          365682 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     804                 : 
     805          365682 :         if (att->attisdropped || att->attgenerated)
     806               8 :             continue;
     807                 : 
     808          365674 :         if (!column_in_column_list(att->attnum, columns))
     809             102 :             continue;
     810                 : 
     811          365572 :         if (isnull[i])
     812                 :         {
     813           51867 :             pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
     814           51867 :             continue;
     815                 :         }
     816                 : 
     817          313705 :         if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
     818                 :         {
     819                 :             /*
     820                 :              * Unchanged toasted datum.  (Note that we don't promise to detect
     821                 :              * unchanged data in general; this is just a cheap check to avoid
     822                 :              * sending large values unnecessarily.)
     823                 :              */
     824               3 :             pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
     825               3 :             continue;
     826                 :         }
     827                 : 
     828          313702 :         typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
     829          313702 :         if (!HeapTupleIsValid(typtup))
     830 UBC           0 :             elog(ERROR, "cache lookup failed for type %u", att->atttypid);
     831 CBC      313702 :         typclass = (Form_pg_type) GETSTRUCT(typtup);
     832                 : 
     833                 :         /*
     834                 :          * Send in binary if requested and type has suitable send function.
     835                 :          */
     836          313702 :         if (binary && OidIsValid(typclass->typsend))
     837          115045 :         {
     838                 :             bytea      *outputbytes;
     839                 :             int         len;
     840                 : 
     841          115045 :             pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
     842          115045 :             outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
     843          115045 :             len = VARSIZE(outputbytes) - VARHDRSZ;
     844          115045 :             pq_sendint(out, len, 4);    /* length */
     845          115045 :             pq_sendbytes(out, VARDATA(outputbytes), len);   /* data */
     846          115045 :             pfree(outputbytes);
     847                 :         }
     848                 :         else
     849                 :         {
     850                 :             char       *outputstr;
     851                 : 
     852          198657 :             pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
     853          198657 :             outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
     854          198657 :             pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
     855          198657 :             pfree(outputstr);
     856                 :         }
     857                 : 
     858          313702 :         ReleaseSysCache(typtup);
     859                 :     }
     860          182065 : }
     861                 : 
     862                 : /*
     863                 :  * Read tuple in logical replication format from stream.
     864                 :  */
     865                 : static void
     866          147976 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
     867                 : {
     868                 :     int         i;
     869                 :     int         natts;
     870                 : 
     871                 :     /* Get number of attributes */
     872          147976 :     natts = pq_getmsgint(in, 2);
     873                 : 
     874                 :     /* Allocate space for per-column values; zero out unused StringInfoDatas */
     875          147976 :     tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
     876          147976 :     tuple->colstatus = (char *) palloc(natts * sizeof(char));
     877          147976 :     tuple->ncols = natts;
     878                 : 
     879                 :     /* Read the data */
     880          450450 :     for (i = 0; i < natts; i++)
     881                 :     {
     882                 :         char        kind;
     883                 :         int         len;
     884          302474 :         StringInfo  value = &tuple->colvalues[i];
     885                 : 
     886          302474 :         kind = pq_getmsgbyte(in);
     887          302474 :         tuple->colstatus[i] = kind;
     888                 : 
     889          302474 :         switch (kind)
     890                 :         {
     891           50355 :             case LOGICALREP_COLUMN_NULL:
     892                 :                 /* nothing more to do */
     893           50355 :                 break;
     894               3 :             case LOGICALREP_COLUMN_UNCHANGED:
     895                 :                 /* we don't receive the value of an unchanged column */
     896               3 :                 break;
     897          252116 :             case LOGICALREP_COLUMN_TEXT:
     898 ECB             :             case LOGICALREP_COLUMN_BINARY:
     899 GIC      252116 :                 len = pq_getmsgint(in, 4);  /* read length */
     900                 : 
     901 ECB             :                 /* and data */
     902 CBC      252116 :                 value->data = palloc(len + 1);
     903          252116 :                 pq_copymsgbytes(in, value->data, len);
     904                 : 
     905                 :                 /*
     906                 :                  * Not strictly necessary for LOGICALREP_COLUMN_BINARY, but
     907                 :                  * per StringInfo practice.
     908                 :                  */
     909 GBC      252116 :                 value->data[len] = '\0';
     910                 : 
     911 EUB             :                 /* make StringInfo fully valid */
     912 GIC      252116 :                 value->len = len;
     913          252116 :                 value->cursor = 0;
     914 CBC      252116 :                 value->maxlen = len;
     915 GIC      252116 :                 break;
     916 UIC           0 :             default:
     917               0 :                 elog(ERROR, "unrecognized data representation type '%c'", kind);
     918                 :         }
     919                 :     }
     920 CBC      147976 : }
     921                 : 
     922                 : /*
     923                 :  * Write relation attribute metadata to the stream.
     924 ECB             :  */
     925                 : static void
     926 GIC         286 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
     927                 : {
     928 ECB             :     TupleDesc   desc;
     929                 :     int         i;
     930 GIC         286 :     uint16      nliveatts = 0;
     931 CBC         286 :     Bitmapset  *idattrs = NULL;
     932                 :     bool        replidentfull;
     933 ECB             : 
     934 GIC         286 :     desc = RelationGetDescr(rel);
     935 ECB             : 
     936                 :     /* send number of live attributes */
     937 GIC         869 :     for (i = 0; i < desc->natts; i++)
     938 ECB             :     {
     939 CBC         583 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     940                 : 
     941             583 :         if (att->attisdropped || att->attgenerated)
     942 GIC           5 :             continue;
     943 ECB             : 
     944 GIC         578 :         if (!column_in_column_list(att->attnum, columns))
     945              53 :             continue;
     946 ECB             : 
     947 CBC         525 :         nliveatts++;
     948 ECB             :     }
     949 GIC         286 :     pq_sendint16(out, nliveatts);
     950                 : 
     951 ECB             :     /* fetch bitmap of REPLICATION IDENTITY attributes */
     952 GIC         286 :     replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
     953 CBC         286 :     if (!replidentfull)
     954             253 :         idattrs = RelationGetIdentityKeyBitmap(rel);
     955                 : 
     956 ECB             :     /* send the attributes */
     957 CBC         869 :     for (i = 0; i < desc->natts; i++)
     958                 :     {
     959             583 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     960             583 :         uint8       flags = 0;
     961                 : 
     962 GIC         583 :         if (att->attisdropped || att->attgenerated)
     963 CBC           5 :             continue;
     964 ECB             : 
     965 GIC         578 :         if (!column_in_column_list(att->attnum, columns))
     966 CBC          53 :             continue;
     967                 : 
     968 ECB             :         /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
     969 GIC        1004 :         if (replidentfull ||
     970             479 :             bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     971 ECB             :                           idattrs))
     972 GIC         241 :             flags |= LOGICALREP_IS_REPLICA_IDENTITY;
     973                 : 
     974 CBC         525 :         pq_sendbyte(out, flags);
     975                 : 
     976                 :         /* attribute name */
     977             525 :         pq_sendstring(out, NameStr(att->attname));
     978                 : 
     979                 :         /* attribute type id */
     980             525 :         pq_sendint32(out, (int) att->atttypid);
     981 ECB             : 
     982                 :         /* attribute mode */
     983 GIC         525 :         pq_sendint32(out, att->atttypmod);
     984                 :     }
     985                 : 
     986             286 :     bms_free(idattrs);
     987 CBC         286 : }
     988                 : 
     989                 : /*
     990                 :  * Read relation attribute metadata from the stream.
     991                 :  */
     992                 : static void
     993             354 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
     994                 : {
     995 ECB             :     int         i;
     996                 :     int         natts;
     997                 :     char      **attnames;
     998                 :     Oid        *atttyps;
     999 GIC         354 :     Bitmapset  *attkeys = NULL;
    1000 ECB             : 
    1001 GIC         354 :     natts = pq_getmsgint(in, 2);
    1002             354 :     attnames = palloc(natts * sizeof(char *));
    1003             354 :     atttyps = palloc(natts * sizeof(Oid));
    1004                 : 
    1005 ECB             :     /* read the attributes */
    1006 CBC        1005 :     for (i = 0; i < natts; i++)
    1007 ECB             :     {
    1008                 :         uint8       flags;
    1009                 : 
    1010                 :         /* Check for replica identity column */
    1011 GIC         651 :         flags = pq_getmsgbyte(in);
    1012             651 :         if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
    1013 CBC         298 :             attkeys = bms_add_member(attkeys, i);
    1014                 : 
    1015                 :         /* attribute name */
    1016             651 :         attnames[i] = pstrdup(pq_getmsgstring(in));
    1017                 : 
    1018                 :         /* attribute type id */
    1019             651 :         atttyps[i] = (Oid) pq_getmsgint(in, 4);
    1020 ECB             : 
    1021                 :         /* we ignore attribute mode for now */
    1022 CBC         651 :         (void) pq_getmsgint(in, 4);
    1023 ECB             :     }
    1024                 : 
    1025 GIC         354 :     rel->attnames = attnames;
    1026             354 :     rel->atttyps = atttyps;
    1027             354 :     rel->attkeys = attkeys;
    1028             354 :     rel->natts = natts;
    1029 CBC         354 : }
    1030                 : 
    1031 ECB             : /*
    1032                 :  * Write the namespace name or empty string for pg_catalog (to save space).
    1033                 :  */
    1034                 : static void
    1035 CBC         304 : logicalrep_write_namespace(StringInfo out, Oid nspid)
    1036                 : {
    1037             304 :     if (nspid == PG_CATALOG_NAMESPACE)
    1038 GBC           1 :         pq_sendbyte(out, '\0');
    1039                 :     else
    1040                 :     {
    1041 CBC         303 :         char       *nspname = get_namespace_name(nspid);
    1042                 : 
    1043             303 :         if (nspname == NULL)
    1044 UIC           0 :             elog(ERROR, "cache lookup failed for namespace %u",
    1045                 :                  nspid);
    1046                 : 
    1047 GIC         303 :         pq_sendstring(out, nspname);
    1048                 :     }
    1049 CBC         304 : }
    1050                 : 
    1051 ECB             : /*
    1052                 :  * Read the namespace name while treating empty string as pg_catalog.
    1053                 :  */
    1054                 : static const char *
    1055 GIC         372 : logicalrep_read_namespace(StringInfo in)
    1056 ECB             : {
    1057 GIC         372 :     const char *nspname = pq_getmsgstring(in);
    1058                 : 
    1059             372 :     if (nspname[0] == '\0')
    1060               1 :         nspname = "pg_catalog";
    1061                 : 
    1062             372 :     return nspname;
    1063 ECB             : }
    1064                 : 
    1065                 : /*
    1066                 :  * Write the information for the start stream message to the output stream.
    1067                 :  */
    1068                 : void
    1069 GIC         615 : logicalrep_write_stream_start(StringInfo out,
    1070                 :                               TransactionId xid, bool first_segment)
    1071 ECB             : {
    1072 GIC         615 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
    1073                 : 
    1074 CBC         615 :     Assert(TransactionIdIsValid(xid));
    1075 ECB             : 
    1076                 :     /* transaction ID (we're starting to stream, so must be valid) */
    1077 GIC         615 :     pq_sendint32(out, xid);
    1078                 : 
    1079                 :     /* 1 if this is the first streaming segment for this xid */
    1080             615 :     pq_sendbyte(out, first_segment ? 1 : 0);
    1081 CBC         615 : }
    1082                 : 
    1083                 : /*
    1084                 :  * Read the information about the start stream message from output stream.
    1085 ECB             :  */
    1086                 : TransactionId
    1087 CBC         835 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
    1088 ECB             : {
    1089                 :     TransactionId xid;
    1090                 : 
    1091 GIC         835 :     Assert(first_segment);
    1092                 : 
    1093             835 :     xid = pq_getmsgint(in, 4);
    1094             835 :     *first_segment = (pq_getmsgbyte(in) == 1);
    1095                 : 
    1096             835 :     return xid;
    1097 ECB             : }
    1098                 : 
    1099                 : /*
    1100                 :  * Write the stop stream message to the output stream.
    1101                 :  */
    1102                 : void
    1103 GIC         615 : logicalrep_write_stream_stop(StringInfo out)
    1104                 : {
    1105             615 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
    1106 CBC         615 : }
    1107                 : 
    1108                 : /*
    1109 ECB             :  * Write STREAM COMMIT to the output stream.
    1110                 :  */
    1111                 : void
    1112 GIC          45 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
    1113 ECB             :                                XLogRecPtr commit_lsn)
    1114                 : {
    1115 GIC          45 :     uint8       flags = 0;
    1116 ECB             : 
    1117 GIC          45 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
    1118                 : 
    1119 CBC          45 :     Assert(TransactionIdIsValid(txn->xid));
    1120                 : 
    1121                 :     /* transaction ID */
    1122              45 :     pq_sendint32(out, txn->xid);
    1123 ECB             : 
    1124                 :     /* send the flags field (unused for now) */
    1125 CBC          45 :     pq_sendbyte(out, flags);
    1126                 : 
    1127                 :     /* send fields */
    1128 GIC          45 :     pq_sendint64(out, commit_lsn);
    1129              45 :     pq_sendint64(out, txn->end_lsn);
    1130              45 :     pq_sendint64(out, txn->xact_time.commit_time);
    1131 CBC          45 : }
    1132                 : 
    1133                 : /*
    1134                 :  * Read STREAM COMMIT from the output stream.
    1135                 :  */
    1136 ECB             : TransactionId
    1137 GIC          61 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
    1138                 : {
    1139 ECB             :     TransactionId xid;
    1140                 :     uint8       flags;
    1141                 : 
    1142 GBC          61 :     xid = pq_getmsgint(in, 4);
    1143                 : 
    1144                 :     /* read flags (unused for now) */
    1145 CBC          61 :     flags = pq_getmsgbyte(in);
    1146 ECB             : 
    1147 CBC          61 :     if (flags != 0)
    1148 UIC           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
    1149 ECB             : 
    1150                 :     /* read fields */
    1151 GIC          61 :     commit_data->commit_lsn = pq_getmsgint64(in);
    1152              61 :     commit_data->end_lsn = pq_getmsgint64(in);
    1153              61 :     commit_data->committime = pq_getmsgint64(in);
    1154                 : 
    1155              61 :     return xid;
    1156                 : }
    1157                 : 
    1158                 : /*
    1159                 :  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
    1160 ECB             :  * same for the top-level transaction abort.
    1161                 :  *
    1162                 :  * If write_abort_info is true, send the abort_lsn and abort_time fields,
    1163                 :  * otherwise don't.
    1164                 :  */
    1165                 : void
    1166 GIC          26 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
    1167                 :                               TransactionId subxid, XLogRecPtr abort_lsn,
    1168                 :                               TimestampTz abort_time, bool write_abort_info)
    1169                 : {
    1170 CBC          26 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
    1171                 : 
    1172 GIC          26 :     Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
    1173 ECB             : 
    1174                 :     /* transaction ID */
    1175 GIC          26 :     pq_sendint32(out, xid);
    1176 CBC          26 :     pq_sendint32(out, subxid);
    1177                 : 
    1178 GNC          26 :     if (write_abort_info)
    1179                 :     {
    1180              12 :         pq_sendint64(out, abort_lsn);
    1181              12 :         pq_sendint64(out, abort_time);
    1182                 :     }
    1183 GIC          26 : }
    1184 ECB             : 
    1185                 : /*
    1186                 :  * Read STREAM ABORT from the output stream.
    1187                 :  *
    1188                 :  * If read_abort_info is true, read the abort_lsn and abort_time fields,
    1189                 :  * otherwise don't.
    1190                 :  */
    1191                 : void
    1192 GNC          38 : logicalrep_read_stream_abort(StringInfo in,
    1193                 :                              LogicalRepStreamAbortData *abort_data,
    1194                 :                              bool read_abort_info)
    1195                 : {
    1196              38 :     Assert(abort_data);
    1197                 : 
    1198              38 :     abort_data->xid = pq_getmsgint(in, 4);
    1199              38 :     abort_data->subxid = pq_getmsgint(in, 4);
    1200                 : 
    1201              38 :     if (read_abort_info)
    1202                 :     {
    1203              24 :         abort_data->abort_lsn = pq_getmsgint64(in);
    1204              24 :         abort_data->abort_time = pq_getmsgint64(in);
    1205                 :     }
    1206                 :     else
    1207                 :     {
    1208              14 :         abort_data->abort_lsn = InvalidXLogRecPtr;
    1209              14 :         abort_data->abort_time = 0;
    1210                 :     }
    1211 CBC          38 : }
    1212                 : 
    1213                 : /*
    1214                 :  * Get string representing LogicalRepMsgType.
    1215 ECB             :  */
    1216                 : char *
    1217 CBC         338 : logicalrep_message_type(LogicalRepMsgType action)
    1218 ECB             : {
    1219 GIC         338 :     switch (action)
    1220 ECB             :     {
    1221 GIC           1 :         case LOGICAL_REP_MSG_BEGIN:
    1222 CBC           1 :             return "BEGIN";
    1223               1 :         case LOGICAL_REP_MSG_COMMIT:
    1224 GIC           1 :             return "COMMIT";
    1225 UIC           0 :         case LOGICAL_REP_MSG_ORIGIN:
    1226               0 :             return "ORIGIN";
    1227 CBC          36 :         case LOGICAL_REP_MSG_INSERT:
    1228              36 :             return "INSERT";
    1229 GIC           9 :         case LOGICAL_REP_MSG_UPDATE:
    1230 CBC           9 :             return "UPDATE";
    1231 GIC           5 :         case LOGICAL_REP_MSG_DELETE:
    1232               5 :             return "DELETE";
    1233 UIC           0 :         case LOGICAL_REP_MSG_TRUNCATE:
    1234               0 :             return "TRUNCATE";
    1235 GIC           2 :         case LOGICAL_REP_MSG_RELATION:
    1236 CBC           2 :             return "RELATION";
    1237 UIC           0 :         case LOGICAL_REP_MSG_TYPE:
    1238 LBC           0 :             return "TYPE";
    1239 UIC           0 :         case LOGICAL_REP_MSG_MESSAGE:
    1240 LBC           0 :             return "MESSAGE";
    1241 CBC           1 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
    1242               1 :             return "BEGIN PREPARE";
    1243               1 :         case LOGICAL_REP_MSG_PREPARE:
    1244 GBC           1 :             return "PREPARE";
    1245 UBC           0 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
    1246 LBC           0 :             return "COMMIT PREPARED";
    1247               0 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
    1248               0 :             return "ROLLBACK PREPARED";
    1249 CBC          13 :         case LOGICAL_REP_MSG_STREAM_START:
    1250              13 :             return "STREAM START";
    1251             228 :         case LOGICAL_REP_MSG_STREAM_STOP:
    1252 GBC         228 :             return "STREAM STOP";
    1253              20 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
    1254 CBC          20 :             return "STREAM COMMIT";
    1255              19 :         case LOGICAL_REP_MSG_STREAM_ABORT:
    1256 GBC          19 :             return "STREAM ABORT";
    1257               2 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
    1258               2 :             return "STREAM PREPARE";
    1259 EUB             :     }
    1260 ECB             : 
    1261 LBC           0 :     elog(ERROR, "invalid logical replication message type \"%c\"", action);
    1262 ECB             : 
    1263                 :     return NULL;                /* keep compiler quiet */
    1264 EUB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a