LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - proto.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 91.2 % 510 465 6 10 29 9 68 12 376 7 78 5
Current Date: 2023-04-08 17:13:01 Functions: 97.8 % 46 45 1 11 2 32 12
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (60,120] days: 100.0 % 12 12 12
Legend: Lines: hit not hit (120,180] days: 100.0 % 2 2 2
(240..) days: 90.9 % 496 451 6 10 29 9 68 374 3 68
Function coverage date bins:
(60,120] days: 100.0 % 1 1 1
(240..) days: 77.2 % 57 44 1 11 1 32 12

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * proto.c
                                  4                 :  *      logical replication protocol functions
                                  5                 :  *
                                  6                 :  * Copyright (c) 2015-2023, PostgreSQL Global Development Group
                                  7                 :  *
                                  8                 :  * IDENTIFICATION
                                  9                 :  *      src/backend/replication/logical/proto.c
                                 10                 :  *
                                 11                 :  *-------------------------------------------------------------------------
                                 12                 :  */
                                 13                 : #include "postgres.h"
                                 14                 : 
                                 15                 : #include "access/sysattr.h"
                                 16                 : #include "catalog/pg_namespace.h"
                                 17                 : #include "catalog/pg_type.h"
                                 18                 : #include "libpq/pqformat.h"
                                 19                 : #include "replication/logicalproto.h"
                                 20                 : #include "utils/lsyscache.h"
                                 21                 : #include "utils/syscache.h"
                                 22                 : 
                                 23                 : /*
                                 24                 :  * Protocol message flags.
                                 25                 :  */
                                 26                 : #define LOGICALREP_IS_REPLICA_IDENTITY 1
                                 27                 : 
                                 28                 : #define MESSAGE_TRANSACTIONAL (1<<0)
                                 29                 : #define TRUNCATE_CASCADE        (1<<0)
                                 30                 : #define TRUNCATE_RESTART_SEQS   (1<<1)
                                 31                 : 
                                 32                 : static void logicalrep_write_attrs(StringInfo out, Relation rel,
                                 33                 :                                    Bitmapset *columns);
                                 34                 : static void logicalrep_write_tuple(StringInfo out, Relation rel,
                                 35                 :                                    TupleTableSlot *slot,
                                 36                 :                                    bool binary, Bitmapset *columns);
                                 37                 : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
                                 38                 : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
                                 39                 : 
                                 40                 : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
                                 41                 : static const char *logicalrep_read_namespace(StringInfo in);
                                 42                 : 
                                 43                 : /*
                                 44                 :  * Check if a column is covered by a column list.
                                 45                 :  *
                                 46                 :  * Need to be careful about NULL, which is treated as a column list covering
                                 47                 :  * all columns.
                                 48                 :  */
                                 49                 : static bool
  379 tomas.vondra               50 CBC      732504 : column_in_column_list(int attnum, Bitmapset *columns)
                                 51                 : {
                                 52          732504 :     return (columns == NULL || bms_is_member(attnum, columns));
                                 53                 : }
                                 54                 : 
                                 55                 : 
                                 56                 : /*
                                 57                 :  * Write BEGIN to the output stream.
                                 58                 :  */
                                 59                 : void
 2271 peter_e                    60             335 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
                                 61                 : {
  888 akapila                    62             335 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
                                 63                 : 
                                 64                 :     /* fixed fields */
 2271 peter_e                    65             335 :     pq_sendint64(out, txn->final_lsn);
  634 akapila                    66             335 :     pq_sendint64(out, txn->xact_time.commit_time);
 2006 andres                     67             335 :     pq_sendint32(out, txn->xid);
 2271 peter_e                    68             335 : }
                                 69                 : 
                                 70                 : /*
                                 71                 :  * Read transaction BEGIN from the stream.
                                 72                 :  */
                                 73                 : void
                                 74             380 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
                                 75                 : {
                                 76                 :     /* read fields */
                                 77             380 :     begin_data->final_lsn = pq_getmsgint64(in);
                                 78             380 :     if (begin_data->final_lsn == InvalidXLogRecPtr)
 2271 peter_e                    79 UBC           0 :         elog(ERROR, "final_lsn not set in begin message");
 2271 peter_e                    80 CBC         380 :     begin_data->committime = pq_getmsgint64(in);
                                 81             380 :     begin_data->xid = pq_getmsgint(in, 4);
                                 82             380 : }
                                 83                 : 
                                 84                 : 
                                 85                 : /*
                                 86                 :  * Write COMMIT to the output stream.
                                 87                 :  */
                                 88                 : void
                                 89             334 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
                                 90                 :                         XLogRecPtr commit_lsn)
                                 91                 : {
 2153 bruce                      92             334 :     uint8       flags = 0;
                                 93                 : 
  888 akapila                    94             334 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
                                 95                 : 
                                 96                 :     /* send the flags field (unused for now) */
 2271 peter_e                    97             334 :     pq_sendbyte(out, flags);
                                 98                 : 
                                 99                 :     /* send fields */
                                100             334 :     pq_sendint64(out, commit_lsn);
                                101             334 :     pq_sendint64(out, txn->end_lsn);
  634 akapila                   102             334 :     pq_sendint64(out, txn->xact_time.commit_time);
 2271 peter_e                   103             334 : }
                                104                 : 
                                105                 : /*
                                106                 :  * Read transaction COMMIT from the stream.
                                107                 :  */
                                108                 : void
                                109             355 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
                                110                 : {
                                111                 :     /* read flags (unused for now) */
 2153 bruce                     112             355 :     uint8       flags = pq_getmsgbyte(in);
                                113                 : 
 2271 peter_e                   114             355 :     if (flags != 0)
 2165 tgl                       115 UBC           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
                                116                 : 
                                117                 :     /* read fields */
 2271 peter_e                   118 CBC         355 :     commit_data->commit_lsn = pq_getmsgint64(in);
                                119             355 :     commit_data->end_lsn = pq_getmsgint64(in);
                                120             355 :     commit_data->committime = pq_getmsgint64(in);
                                121             355 : }
                                122                 : 
                                123                 : /*
                                124                 :  * Write BEGIN PREPARE to the output stream.
                                125                 :  */
                                126                 : void
  634 akapila                   127              18 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
                                128                 : {
                                129              18 :     pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
                                130                 : 
                                131                 :     /* fixed fields */
                                132              18 :     pq_sendint64(out, txn->final_lsn);
                                133              18 :     pq_sendint64(out, txn->end_lsn);
                                134              18 :     pq_sendint64(out, txn->xact_time.prepare_time);
                                135              18 :     pq_sendint32(out, txn->xid);
                                136                 : 
                                137                 :     /* send gid */
                                138              18 :     pq_sendstring(out, txn->gid);
                                139              18 : }
                                140                 : 
                                141                 : /*
                                142                 :  * Read transaction BEGIN PREPARE from the stream.
                                143                 :  */
                                144                 : void
                                145              14 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
                                146                 : {
                                147                 :     /* read fields */
                                148              14 :     begin_data->prepare_lsn = pq_getmsgint64(in);
                                149              14 :     if (begin_data->prepare_lsn == InvalidXLogRecPtr)
  634 akapila                   150 UBC           0 :         elog(ERROR, "prepare_lsn not set in begin prepare message");
  634 akapila                   151 CBC          14 :     begin_data->end_lsn = pq_getmsgint64(in);
                                152              14 :     if (begin_data->end_lsn == InvalidXLogRecPtr)
  634 akapila                   153 UBC           0 :         elog(ERROR, "end_lsn not set in begin prepare message");
  634 akapila                   154 CBC          14 :     begin_data->prepare_time = pq_getmsgint64(in);
                                155              14 :     begin_data->xid = pq_getmsgint(in, 4);
                                156                 : 
                                157                 :     /* read gid (copy it into a pre-allocated buffer) */
  628                           158              14 :     strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
  634                           159              14 : }
                                160                 : 
                                161                 : /*
                                162                 :  * The core functionality for logicalrep_write_prepare and
                                163                 :  * logicalrep_write_stream_prepare.
                                164                 :  */
                                165                 : static void
  619                           166              29 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
                                167                 :                                 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
                                168                 : {
  634                           169              29 :     uint8       flags = 0;
                                170                 : 
  619                           171              29 :     pq_sendbyte(out, type);
                                172                 : 
                                173                 :     /*
                                174                 :      * This should only ever happen for two-phase commit transactions, in
                                175                 :      * which case we expect to have a valid GID.
                                176                 :      */
  634                           177              29 :     Assert(txn->gid != NULL);
                                178              29 :     Assert(rbtxn_prepared(txn));
  619                           179              29 :     Assert(TransactionIdIsValid(txn->xid));
                                180                 : 
                                181                 :     /* send the flags field */
  634                           182              29 :     pq_sendbyte(out, flags);
                                183                 : 
                                184                 :     /* send fields */
                                185              29 :     pq_sendint64(out, prepare_lsn);
                                186              29 :     pq_sendint64(out, txn->end_lsn);
                                187              29 :     pq_sendint64(out, txn->xact_time.prepare_time);
                                188              29 :     pq_sendint32(out, txn->xid);
                                189                 : 
                                190                 :     /* send gid */
                                191              29 :     pq_sendstring(out, txn->gid);
                                192              29 : }
                                193                 : 
                                194                 : /*
                                195                 :  * Write PREPARE to the output stream.
                                196                 :  */
                                197                 : void
  619                           198              18 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
                                199                 :                          XLogRecPtr prepare_lsn)
                                200                 : {
                                201              18 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
                                202                 :                                     txn, prepare_lsn);
                                203              18 : }
                                204                 : 
                                205                 : /*
                                206                 :  * The core functionality for logicalrep_read_prepare and
                                207                 :  * logicalrep_read_stream_prepare.
                                208                 :  */
                                209                 : static void
                                210              24 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
                                211                 :                                LogicalRepPreparedTxnData *prepare_data)
                                212                 : {
                                213                 :     /* read flags */
  634                           214              24 :     uint8       flags = pq_getmsgbyte(in);
                                215                 : 
                                216              24 :     if (flags != 0)
  619 akapila                   217 UBC           0 :         elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
                                218                 : 
                                219                 :     /* read fields */
  634 akapila                   220 CBC          24 :     prepare_data->prepare_lsn = pq_getmsgint64(in);
                                221              24 :     if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
  619 akapila                   222 UBC           0 :         elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
  634 akapila                   223 CBC          24 :     prepare_data->end_lsn = pq_getmsgint64(in);
                                224              24 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
  619 akapila                   225 UBC           0 :         elog(ERROR, "end_lsn is not set in %s message", msgtype);
  634 akapila                   226 CBC          24 :     prepare_data->prepare_time = pq_getmsgint64(in);
                                227              24 :     prepare_data->xid = pq_getmsgint(in, 4);
  613                           228              24 :     if (prepare_data->xid == InvalidTransactionId)
  613 akapila                   229 UBC           0 :         elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
                                230                 : 
                                231                 :     /* read gid (copy it into a pre-allocated buffer) */
  628 akapila                   232 CBC          24 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
  634                           233              24 : }
                                234                 : 
                                235                 : /*
                                236                 :  * Read transaction PREPARE from the stream.
                                237                 :  */
                                238                 : void
  619                           239              13 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
                                240                 : {
                                241              13 :     logicalrep_read_prepare_common(in, "prepare", prepare_data);
                                242              13 : }
                                243                 : 
                                244                 : /*
                                245                 :  * Write COMMIT PREPARED to the output stream.
                                246                 :  */
                                247                 : void
  634                           248              22 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
                                249                 :                                  XLogRecPtr commit_lsn)
                                250                 : {
                                251              22 :     uint8       flags = 0;
                                252                 : 
                                253              22 :     pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
                                254                 : 
                                255                 :     /*
                                256                 :      * This should only ever happen for two-phase commit transactions, in
                                257                 :      * which case we expect to have a valid GID.
                                258                 :      */
                                259              22 :     Assert(txn->gid != NULL);
                                260                 : 
                                261                 :     /* send the flags field */
                                262              22 :     pq_sendbyte(out, flags);
                                263                 : 
                                264                 :     /* send fields */
                                265              22 :     pq_sendint64(out, commit_lsn);
                                266              22 :     pq_sendint64(out, txn->end_lsn);
                                267              22 :     pq_sendint64(out, txn->xact_time.commit_time);
                                268              22 :     pq_sendint32(out, txn->xid);
                                269                 : 
                                270                 :     /* send gid */
                                271              22 :     pq_sendstring(out, txn->gid);
                                272              22 : }
                                273                 : 
                                274                 : /*
                                275                 :  * Read transaction COMMIT PREPARED from the stream.
                                276                 :  */
                                277                 : void
                                278              19 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
                                279                 : {
                                280                 :     /* read flags */
                                281              19 :     uint8       flags = pq_getmsgbyte(in);
                                282                 : 
                                283              19 :     if (flags != 0)
  634 akapila                   284 UBC           0 :         elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
                                285                 : 
                                286                 :     /* read fields */
  634 akapila                   287 CBC          19 :     prepare_data->commit_lsn = pq_getmsgint64(in);
                                288              19 :     if (prepare_data->commit_lsn == InvalidXLogRecPtr)
  634 akapila                   289 UBC           0 :         elog(ERROR, "commit_lsn is not set in commit prepared message");
  634 akapila                   290 CBC          19 :     prepare_data->end_lsn = pq_getmsgint64(in);
                                291              19 :     if (prepare_data->end_lsn == InvalidXLogRecPtr)
  634 akapila                   292 UBC           0 :         elog(ERROR, "end_lsn is not set in commit prepared message");
  634 akapila                   293 CBC          19 :     prepare_data->commit_time = pq_getmsgint64(in);
                                294              19 :     prepare_data->xid = pq_getmsgint(in, 4);
                                295                 : 
                                296                 :     /* read gid (copy it into a pre-allocated buffer) */
  628                           297              19 :     strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
  634                           298              19 : }
                                299                 : 
                                300                 : /*
                                301                 :  * Write ROLLBACK PREPARED to the output stream.
                                302                 :  */
                                303                 : void
                                304               8 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
                                305                 :                                    XLogRecPtr prepare_end_lsn,
                                306                 :                                    TimestampTz prepare_time)
                                307                 : {
                                308               8 :     uint8       flags = 0;
                                309                 : 
                                310               8 :     pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
                                311                 : 
                                312                 :     /*
                                313                 :      * This should only ever happen for two-phase commit transactions, in
                                314                 :      * which case we expect to have a valid GID.
                                315                 :      */
                                316               8 :     Assert(txn->gid != NULL);
                                317                 : 
                                318                 :     /* send the flags field */
                                319               8 :     pq_sendbyte(out, flags);
                                320                 : 
                                321                 :     /* send fields */
                                322               8 :     pq_sendint64(out, prepare_end_lsn);
                                323               8 :     pq_sendint64(out, txn->end_lsn);
                                324               8 :     pq_sendint64(out, prepare_time);
                                325               8 :     pq_sendint64(out, txn->xact_time.commit_time);
                                326               8 :     pq_sendint32(out, txn->xid);
                                327                 : 
                                328                 :     /* send gid */
                                329               8 :     pq_sendstring(out, txn->gid);
                                330               8 : }
                                331                 : 
                                332                 : /*
                                333                 :  * Read transaction ROLLBACK PREPARED from the stream.
                                334                 :  */
                                335                 : void
                                336               5 : logicalrep_read_rollback_prepared(StringInfo in,
                                337                 :                                   LogicalRepRollbackPreparedTxnData *rollback_data)
                                338                 : {
                                339                 :     /* read flags */
                                340               5 :     uint8       flags = pq_getmsgbyte(in);
                                341                 : 
                                342               5 :     if (flags != 0)
  634 akapila                   343 UBC           0 :         elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
                                344                 : 
                                345                 :     /* read fields */
  634 akapila                   346 CBC           5 :     rollback_data->prepare_end_lsn = pq_getmsgint64(in);
                                347               5 :     if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
  634 akapila                   348 UBC           0 :         elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
  634 akapila                   349 CBC           5 :     rollback_data->rollback_end_lsn = pq_getmsgint64(in);
                                350               5 :     if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
  634 akapila                   351 UBC           0 :         elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
  634 akapila                   352 CBC           5 :     rollback_data->prepare_time = pq_getmsgint64(in);
                                353               5 :     rollback_data->rollback_time = pq_getmsgint64(in);
                                354               5 :     rollback_data->xid = pq_getmsgint(in, 4);
                                355                 : 
                                356                 :     /* read gid (copy it into a pre-allocated buffer) */
  628                           357               5 :     strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
  634                           358               5 : }
                                359                 : 
                                360                 : /*
                                361                 :  * Write STREAM PREPARE to the output stream.
                                362                 :  */
                                363                 : void
  613                           364              11 : logicalrep_write_stream_prepare(StringInfo out,
                                365                 :                                 ReorderBufferTXN *txn,
                                366                 :                                 XLogRecPtr prepare_lsn)
                                367                 : {
                                368              11 :     logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
                                369                 :                                     txn, prepare_lsn);
                                370              11 : }
                                371                 : 
                                372                 : /*
                                373                 :  * Read STREAM PREPARE from the stream.
                                374                 :  */
                                375                 : void
                                376              11 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
                                377                 : {
                                378              11 :     logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
                                379              11 : }
                                380                 : 
                                381                 : /*
                                382                 :  * Write ORIGIN to the output stream.
                                383                 :  */
                                384                 : void
 2271 peter_e                   385               7 : logicalrep_write_origin(StringInfo out, const char *origin,
                                386                 :                         XLogRecPtr origin_lsn)
                                387                 : {
  888 akapila                   388               7 :     pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
                                389                 : 
                                390                 :     /* fixed fields */
 2271 peter_e                   391               7 :     pq_sendint64(out, origin_lsn);
                                392                 : 
                                393                 :     /* origin string */
                                394               7 :     pq_sendstring(out, origin);
                                395               7 : }
                                396                 : 
                                397                 : /*
                                398                 :  * Read ORIGIN from the output stream.
                                399                 :  */
                                400                 : char *
 2271 peter_e                   401 UBC           0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
                                402                 : {
                                403                 :     /* fixed fields */
                                404               0 :     *origin_lsn = pq_getmsgint64(in);
                                405                 : 
                                406                 :     /* return origin */
                                407               0 :     return pstrdup(pq_getmsgstring(in));
                                408                 : }
                                409                 : 
                                410                 : /*
                                411                 :  * Write INSERT to the output stream.
                                412                 :  */
                                413                 : void
  948 akapila                   414 CBC      105697 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
                                415                 :                         TupleTableSlot *newslot, bool binary, Bitmapset *columns)
                                416                 : {
  888                           417          105697 :     pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
                                418                 : 
                                419                 :     /* transaction ID (if not valid, we're not streaming) */
  948                           420          105697 :     if (TransactionIdIsValid(xid))
                                421          100075 :         pq_sendint32(out, xid);
                                422                 : 
                                423                 :     /* use Oid as relation identifier */
 2006 andres                    424          105697 :     pq_sendint32(out, RelationGetRelid(rel));
                                425                 : 
 2271 peter_e                   426          105697 :     pq_sendbyte(out, 'N');      /* new tuple follows */
  379 tomas.vondra              427          105697 :     logicalrep_write_tuple(out, rel, newslot, binary, columns);
 2271 peter_e                   428          105697 : }
                                429                 : 
                                430                 : /*
                                431                 :  * Read INSERT from stream.
                                432                 :  *
                                433                 :  * Fills the new tuple.
                                434                 :  */
                                435                 : LogicalRepRelId
                                436           75638 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
                                437                 : {
                                438                 :     char        action;
                                439                 :     LogicalRepRelId relid;
                                440                 : 
                                441                 :     /* read the relation id */
                                442           75638 :     relid = pq_getmsgint(in, 4);
                                443                 : 
                                444           75638 :     action = pq_getmsgbyte(in);
                                445           75638 :     if (action != 'N')
 2271 peter_e                   446 UBC           0 :         elog(ERROR, "expected new tuple but got %d",
                                447                 :              action);
                                448                 : 
 2271 peter_e                   449 CBC       75638 :     logicalrep_read_tuple(in, newtup);
                                450                 : 
                                451           75638 :     return relid;
                                452                 : }
                                453                 : 
                                454                 : /*
                                455                 :  * Write UPDATE to the output stream.
                                456                 :  */
                                457                 : void
  948 akapila                   458           34408 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
                                459                 :                         TupleTableSlot *oldslot, TupleTableSlot *newslot,
                                460                 :                         bool binary, Bitmapset *columns)
                                461                 : {
  888                           462           34408 :     pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
                                463                 : 
 2271 peter_e                   464           34408 :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
                                465                 :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
                                466                 :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
                                467                 : 
                                468                 :     /* transaction ID (if not valid, we're not streaming) */
  948 akapila                   469           34408 :     if (TransactionIdIsValid(xid))
                                470           34226 :         pq_sendint32(out, xid);
                                471                 : 
                                472                 :     /* use Oid as relation identifier */
 2006 andres                    473           34408 :     pq_sendint32(out, RelationGetRelid(rel));
                                474                 : 
  411 akapila                   475           34408 :     if (oldslot != NULL)
                                476                 :     {
 2271 peter_e                   477             107 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
 2118 tgl                       478              47 :             pq_sendbyte(out, 'O');  /* old tuple follows */
                                479                 :         else
                                480              60 :             pq_sendbyte(out, 'K');  /* old key follows */
  128 akapila                   481             107 :         logicalrep_write_tuple(out, rel, oldslot, binary, columns);
                                482                 :     }
                                483                 : 
 2271 peter_e                   484           34408 :     pq_sendbyte(out, 'N');      /* new tuple follows */
  379 tomas.vondra              485           34408 :     logicalrep_write_tuple(out, rel, newslot, binary, columns);
 2271 peter_e                   486           34408 : }
                                487                 : 
                                488                 : /*
                                489                 :  * Read UPDATE from stream.
                                490                 :  */
                                491                 : LogicalRepRelId
                                492           31917 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
                                493                 :                        LogicalRepTupleData *oldtup,
                                494                 :                        LogicalRepTupleData *newtup)
                                495                 : {
                                496                 :     char        action;
                                497                 :     LogicalRepRelId relid;
                                498                 : 
                                499                 :     /* read the relation id */
                                500           31917 :     relid = pq_getmsgint(in, 4);
                                501                 : 
                                502                 :     /* read and verify action */
                                503           31917 :     action = pq_getmsgbyte(in);
                                504           31917 :     if (action != 'K' && action != 'O' && action != 'N')
 2271 peter_e                   505 UBC           0 :         elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
                                506                 :              action);
                                507                 : 
                                508                 :     /* check for old tuple */
 2271 peter_e                   509 CBC       31917 :     if (action == 'K' || action == 'O')
                                510                 :     {
                                511             118 :         logicalrep_read_tuple(in, oldtup);
                                512             118 :         *has_oldtuple = true;
                                513                 : 
                                514             118 :         action = pq_getmsgbyte(in);
                                515                 :     }
                                516                 :     else
                                517           31799 :         *has_oldtuple = false;
                                518                 : 
                                519                 :     /* check for new  tuple */
                                520           31917 :     if (action != 'N')
 2271 peter_e                   521 UBC           0 :         elog(ERROR, "expected action 'N', got %c",
                                522                 :              action);
                                523                 : 
 2271 peter_e                   524 CBC       31917 :     logicalrep_read_tuple(in, newtup);
                                525                 : 
                                526           31917 :     return relid;
                                527                 : }
                                528                 : 
                                529                 : /*
                                530                 :  * Write DELETE to the output stream.
                                531                 :  */
                                532                 : void
  948 akapila                   533           41853 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
                                534                 :                         TupleTableSlot *oldslot, bool binary,
                                535                 :                         Bitmapset *columns)
                                536                 : {
 2271 peter_e                   537           41853 :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
                                538                 :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
                                539                 :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
                                540                 : 
  888 akapila                   541           41853 :     pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
                                542                 : 
                                543                 :     /* transaction ID (if not valid, we're not streaming) */
  948                           544           41853 :     if (TransactionIdIsValid(xid))
                                545           41620 :         pq_sendint32(out, xid);
                                546                 : 
                                547                 :     /* use Oid as relation identifier */
 2006 andres                    548           41853 :     pq_sendint32(out, RelationGetRelid(rel));
                                549                 : 
 2271 peter_e                   550           41853 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
                                551             118 :         pq_sendbyte(out, 'O');  /* old tuple follows */
                                552                 :     else
                                553           41735 :         pq_sendbyte(out, 'K');  /* old key follows */
                                554                 : 
  128 akapila                   555           41853 :     logicalrep_write_tuple(out, rel, oldslot, binary, columns);
 2271 peter_e                   556           41853 : }
                                557                 : 
                                558                 : /*
                                559                 :  * Read DELETE from stream.
                                560                 :  *
                                561                 :  * Fills the old tuple.
                                562                 :  */
                                563                 : LogicalRepRelId
                                564           40303 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
                                565                 : {
                                566                 :     char        action;
                                567                 :     LogicalRepRelId relid;
                                568                 : 
                                569                 :     /* read the relation id */
                                570           40303 :     relid = pq_getmsgint(in, 4);
                                571                 : 
                                572                 :     /* read and verify action */
                                573           40303 :     action = pq_getmsgbyte(in);
                                574           40303 :     if (action != 'K' && action != 'O')
 2271 peter_e                   575 UBC           0 :         elog(ERROR, "expected action 'O' or 'K', got %c", action);
                                576                 : 
 2271 peter_e                   577 CBC       40303 :     logicalrep_read_tuple(in, oldtup);
                                578                 : 
                                579           40303 :     return relid;
                                580                 : }
                                581                 : 
                                582                 : /*
                                583                 :  * Write TRUNCATE to the output stream.
                                584                 :  */
                                585                 : void
 1828                           586               7 : logicalrep_write_truncate(StringInfo out,
                                587                 :                           TransactionId xid,
                                588                 :                           int nrelids,
                                589                 :                           Oid relids[],
                                590                 :                           bool cascade, bool restart_seqs)
                                591                 : {
                                592                 :     int         i;
 1809 tgl                       593               7 :     uint8       flags = 0;
                                594                 : 
  888 akapila                   595               7 :     pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
                                596                 : 
                                597                 :     /* transaction ID (if not valid, we're not streaming) */
  948                           598               7 :     if (TransactionIdIsValid(xid))
  948 akapila                   599 UBC           0 :         pq_sendint32(out, xid);
                                600                 : 
 1828 peter_e                   601 CBC           7 :     pq_sendint32(out, nrelids);
                                602                 : 
                                603                 :     /* encode and send truncate flags */
                                604               7 :     if (cascade)
 1828 peter_e                   605 UBC           0 :         flags |= TRUNCATE_CASCADE;
 1828 peter_e                   606 CBC           7 :     if (restart_seqs)
 1828 peter_e                   607 UBC           0 :         flags |= TRUNCATE_RESTART_SEQS;
 1828 peter_e                   608 CBC           7 :     pq_sendint8(out, flags);
                                609                 : 
                                610              19 :     for (i = 0; i < nrelids; i++)
                                611              12 :         pq_sendint32(out, relids[i]);
                                612               7 : }
                                613                 : 
                                614                 : /*
                                615                 :  * Read TRUNCATE from stream.
                                616                 :  */
                                617                 : List *
                                618              17 : logicalrep_read_truncate(StringInfo in,
                                619                 :                          bool *cascade, bool *restart_seqs)
                                620                 : {
                                621                 :     int         i;
                                622                 :     int         nrelids;
                                623              17 :     List       *relids = NIL;
                                624                 :     uint8       flags;
                                625                 : 
                                626              17 :     nrelids = pq_getmsgint(in, 4);
                                627                 : 
                                628                 :     /* read and decode truncate flags */
                                629              17 :     flags = pq_getmsgint(in, 1);
                                630              17 :     *cascade = (flags & TRUNCATE_CASCADE) > 0;
                                631              17 :     *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
                                632                 : 
                                633              43 :     for (i = 0; i < nrelids; i++)
                                634              26 :         relids = lappend_oid(relids, pq_getmsgint(in, 4));
                                635                 : 
                                636              17 :     return relids;
                                637                 : }
                                638                 : 
                                639                 : /*
                                640                 :  * Write MESSAGE to stream
                                641                 :  */
                                642                 : void
  733 akapila                   643               5 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
                                644                 :                          bool transactional, const char *prefix, Size sz,
                                645                 :                          const char *message)
                                646                 : {
                                647               5 :     uint8       flags = 0;
                                648                 : 
                                649               5 :     pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
                                650                 : 
                                651                 :     /* encode and send message flags */
                                652               5 :     if (transactional)
                                653               2 :         flags |= MESSAGE_TRANSACTIONAL;
                                654                 : 
                                655                 :     /* transaction ID (if not valid, we're not streaming) */
                                656               5 :     if (TransactionIdIsValid(xid))
  733 akapila                   657 UBC           0 :         pq_sendint32(out, xid);
                                658                 : 
  733 akapila                   659 CBC           5 :     pq_sendint8(out, flags);
                                660               5 :     pq_sendint64(out, lsn);
                                661               5 :     pq_sendstring(out, prefix);
                                662               5 :     pq_sendint32(out, sz);
                                663               5 :     pq_sendbytes(out, message, sz);
                                664               5 : }
                                665                 : 
                                666                 : /*
                                667                 :  * Write relation description to the output stream.
                                668                 :  */
                                669                 : void
  379 tomas.vondra              670             286 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
                                671                 :                      Bitmapset *columns)
                                672                 : {
                                673                 :     char       *relname;
                                674                 : 
  888 akapila                   675             286 :     pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
                                676                 : 
                                677                 :     /* transaction ID (if not valid, we're not streaming) */
  948                           678             286 :     if (TransactionIdIsValid(xid))
                                679              68 :         pq_sendint32(out, xid);
                                680                 : 
                                681                 :     /* use Oid as relation identifier */
 2006 andres                    682             286 :     pq_sendint32(out, RelationGetRelid(rel));
                                683                 : 
                                684                 :     /* send qualified relation name */
 2271 peter_e                   685             286 :     logicalrep_write_namespace(out, RelationGetNamespace(rel));
                                686             286 :     relname = RelationGetRelationName(rel);
                                687             286 :     pq_sendstring(out, relname);
                                688                 : 
                                689                 :     /* send replica identity */
                                690             286 :     pq_sendbyte(out, rel->rd_rel->relreplident);
                                691                 : 
                                692                 :     /* send the attribute info */
  379 tomas.vondra              693             286 :     logicalrep_write_attrs(out, rel, columns);
 2271 peter_e                   694             286 : }
                                695                 : 
                                696                 : /*
                                697                 :  * Read the relation info from stream and return as LogicalRepRelation.
                                698                 :  */
                                699                 : LogicalRepRelation *
                                700             354 : logicalrep_read_rel(StringInfo in)
                                701                 : {
 2153 bruce                     702             354 :     LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
                                703                 : 
 2271 peter_e                   704             354 :     rel->remoteid = pq_getmsgint(in, 4);
                                705                 : 
                                706                 :     /* Read relation name from stream */
                                707             354 :     rel->nspname = pstrdup(logicalrep_read_namespace(in));
                                708             354 :     rel->relname = pstrdup(pq_getmsgstring(in));
                                709                 : 
                                710                 :     /* Read the replica identity. */
                                711             354 :     rel->replident = pq_getmsgbyte(in);
                                712                 : 
                                713                 :     /* Get attribute description */
                                714             354 :     logicalrep_read_attrs(in, rel);
                                715                 : 
                                716             354 :     return rel;
                                717                 : }
                                718                 : 
                                719                 : /*
                                720                 :  * Write type info to the output stream.
                                721                 :  *
                                722                 :  * This function will always write base type info.
                                723                 :  */
                                724                 : void
  948 akapila                   725              18 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
                                726                 : {
 2271 peter_e                   727              18 :     Oid         basetypoid = getBaseType(typoid);
                                728                 :     HeapTuple   tup;
                                729                 :     Form_pg_type typtup;
                                730                 : 
  888 akapila                   731              18 :     pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
                                732                 : 
                                733                 :     /* transaction ID (if not valid, we're not streaming) */
  948                           734              18 :     if (TransactionIdIsValid(xid))
  948 akapila                   735 UBC           0 :         pq_sendint32(out, xid);
                                736                 : 
 2271 peter_e                   737 CBC          18 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
                                738              18 :     if (!HeapTupleIsValid(tup))
 2271 peter_e                   739 UBC           0 :         elog(ERROR, "cache lookup failed for type %u", basetypoid);
 2271 peter_e                   740 CBC          18 :     typtup = (Form_pg_type) GETSTRUCT(tup);
                                741                 : 
                                742                 :     /* use Oid as relation identifier */
 2006 andres                    743              18 :     pq_sendint32(out, typoid);
                                744                 : 
                                745                 :     /* send qualified type name */
 2271 peter_e                   746              18 :     logicalrep_write_namespace(out, typtup->typnamespace);
                                747              18 :     pq_sendstring(out, NameStr(typtup->typname));
                                748                 : 
                                749              18 :     ReleaseSysCache(tup);
                                750              18 : }
                                751                 : 
                                752                 : /*
                                753                 :  * Read type info from the output stream.
                                754                 :  */
                                755                 : void
                                756              18 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
                                757                 : {
                                758              18 :     ltyp->remoteid = pq_getmsgint(in, 4);
                                759                 : 
                                760                 :     /* Read type name from stream */
                                761              18 :     ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
                                762              18 :     ltyp->typname = pstrdup(pq_getmsgstring(in));
                                763              18 : }
                                764                 : 
                                765                 : /*
                                766                 :  * Write a tuple to the outputstream, in the most efficient format possible.
                                767                 :  */
                                768                 : static void
  411 akapila                   769          182065 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
                                770                 :                        bool binary, Bitmapset *columns)
                                771                 : {
                                772                 :     TupleDesc   desc;
                                773                 :     Datum      *values;
                                774                 :     bool       *isnull;
                                775                 :     int         i;
 2271 peter_e                   776          182065 :     uint16      nliveatts = 0;
                                777                 : 
                                778          182065 :     desc = RelationGetDescr(rel);
                                779                 : 
                                780          547747 :     for (i = 0; i < desc->natts; i++)
                                781                 :     {
  379 tomas.vondra              782          365682 :         Form_pg_attribute att = TupleDescAttr(desc, i);
                                783                 : 
                                784          365682 :         if (att->attisdropped || att->attgenerated)
                                785               8 :             continue;
                                786                 : 
                                787          365674 :         if (!column_in_column_list(att->attnum, columns))
 2271 peter_e                   788             102 :             continue;
                                789                 : 
                                790          365572 :         nliveatts++;
                                791                 :     }
 2006 andres                    792          182065 :     pq_sendint16(out, nliveatts);
                                793                 : 
  411 akapila                   794          182065 :     slot_getallattrs(slot);
                                795          182065 :     values = slot->tts_values;
                                796          182065 :     isnull = slot->tts_isnull;
                                797                 : 
                                798                 :     /* Write the values */
 2271 peter_e                   799          547747 :     for (i = 0; i < desc->natts; i++)
                                800                 :     {
                                801                 :         HeapTuple   typtup;
                                802                 :         Form_pg_type typclass;
 2058 andres                    803          365682 :         Form_pg_attribute att = TupleDescAttr(desc, i);
                                804                 : 
 1471 peter                     805          365682 :         if (att->attisdropped || att->attgenerated)
 2271 peter_e                   806               8 :             continue;
                                807                 : 
  379 tomas.vondra              808          365674 :         if (!column_in_column_list(att->attnum, columns))
                                809             102 :             continue;
                                810                 : 
 2271 peter_e                   811          365572 :         if (isnull[i])
                                812                 :         {
  995 tgl                       813           51867 :             pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
 2271 peter_e                   814           51867 :             continue;
                                815                 :         }
                                816                 : 
  995 tgl                       817          313705 :         if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
                                818                 :         {
                                819                 :             /*
                                820                 :              * Unchanged toasted datum.  (Note that we don't promise to detect
                                821                 :              * unchanged data in general; this is just a cheap check to avoid
                                822                 :              * sending large values unnecessarily.)
                                823                 :              */
                                824               3 :             pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
 2271 peter_e                   825               3 :             continue;
                                826                 :         }
                                827                 : 
                                828          313702 :         typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
                                829          313702 :         if (!HeapTupleIsValid(typtup))
 2271 peter_e                   830 UBC           0 :             elog(ERROR, "cache lookup failed for type %u", att->atttypid);
 2271 peter_e                   831 CBC      313702 :         typclass = (Form_pg_type) GETSTRUCT(typtup);
                                832                 : 
                                833                 :         /*
                                834                 :          * Send in binary if requested and type has suitable send function.
                                835                 :          */
  992 tgl                       836          313702 :         if (binary && OidIsValid(typclass->typsend))
  995                           837          115045 :         {
                                838                 :             bytea      *outputbytes;
                                839                 :             int         len;
                                840                 : 
                                841          115045 :             pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
                                842          115045 :             outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
                                843          115045 :             len = VARSIZE(outputbytes) - VARHDRSZ;
                                844          115045 :             pq_sendint(out, len, 4);    /* length */
                                845          115045 :             pq_sendbytes(out, VARDATA(outputbytes), len);   /* data */
                                846          115045 :             pfree(outputbytes);
                                847                 :         }
                                848                 :         else
                                849                 :         {
                                850                 :             char       *outputstr;
                                851                 : 
                                852          198657 :             pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
                                853          198657 :             outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
                                854          198657 :             pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
                                855          198657 :             pfree(outputstr);
                                856                 :         }
                                857                 : 
 2271 peter_e                   858          313702 :         ReleaseSysCache(typtup);
                                859                 :     }
                                860          182065 : }
                                861                 : 
                                862                 : /*
                                863                 :  * Read tuple in logical replication format from stream.
                                864                 :  */
                                865                 : static void
                                866          147976 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
                                867                 : {
                                868                 :     int         i;
                                869                 :     int         natts;
                                870                 : 
                                871                 :     /* Get number of attributes */
                                872          147976 :     natts = pq_getmsgint(in, 2);
                                873                 : 
                                874                 :     /* Allocate space for per-column values; zero out unused StringInfoDatas */
  995 tgl                       875          147976 :     tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
                                876          147976 :     tuple->colstatus = (char *) palloc(natts * sizeof(char));
  993                           877          147976 :     tuple->ncols = natts;
                                878                 : 
                                879                 :     /* Read the data */
 2271 peter_e                   880          450450 :     for (i = 0; i < natts; i++)
                                881                 :     {
                                882                 :         char        kind;
                                883                 :         int         len;
  995 tgl                       884          302474 :         StringInfo  value = &tuple->colvalues[i];
                                885                 : 
 2271 peter_e                   886          302474 :         kind = pq_getmsgbyte(in);
  995 tgl                       887          302474 :         tuple->colstatus[i] = kind;
                                888                 : 
 2271 peter_e                   889          302474 :         switch (kind)
                                890                 :         {
  995 tgl                       891           50355 :             case LOGICALREP_COLUMN_NULL:
                                892                 :                 /* nothing more to do */
 2271 peter_e                   893           50355 :                 break;
  995 tgl                       894               3 :             case LOGICALREP_COLUMN_UNCHANGED:
                                895                 :                 /* we don't receive the value of an unchanged column */
 2271 peter_e                   896               3 :                 break;
  995 tgl                       897          252116 :             case LOGICALREP_COLUMN_TEXT:
  995 tgl                       898 ECB             :             case LOGICALREP_COLUMN_BINARY:
  995 tgl                       899 GIC      252116 :                 len = pq_getmsgint(in, 4);  /* read length */
                                900                 : 
  995 tgl                       901 ECB             :                 /* and data */
  995 tgl                       902 CBC      252116 :                 value->data = palloc(len + 1);
                                903          252116 :                 pq_copymsgbytes(in, value->data, len);
                                904                 : 
                                905                 :                 /*
                                906                 :                  * Not strictly necessary for LOGICALREP_COLUMN_BINARY, but
                                907                 :                  * per StringInfo practice.
                                908                 :                  */
  995 tgl                       909 GBC      252116 :                 value->data[len] = '\0';
                                910                 : 
  995 tgl                       911 EUB             :                 /* make StringInfo fully valid */
  995 tgl                       912 GIC      252116 :                 value->len = len;
                                913          252116 :                 value->cursor = 0;
  995 tgl                       914 CBC      252116 :                 value->maxlen = len;
 2271 peter_e                   915 GIC      252116 :                 break;
 2271 peter_e                   916 UIC           0 :             default:
 2165 tgl                       917               0 :                 elog(ERROR, "unrecognized data representation type '%c'", kind);
                                918                 :         }
                                919                 :     }
 2271 peter_e                   920 CBC      147976 : }
                                921                 : 
                                922                 : /*
                                923                 :  * Write relation attribute metadata to the stream.
 2271 peter_e                   924 ECB             :  */
                                925                 : static void
  379 tomas.vondra              926 GIC         286 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
                                927                 : {
 2271 peter_e                   928 ECB             :     TupleDesc   desc;
                                929                 :     int         i;
 2271 peter_e                   930 GIC         286 :     uint16      nliveatts = 0;
 2271 peter_e                   931 CBC         286 :     Bitmapset  *idattrs = NULL;
                                932                 :     bool        replidentfull;
 2271 peter_e                   933 ECB             : 
 2271 peter_e                   934 GIC         286 :     desc = RelationGetDescr(rel);
 2271 peter_e                   935 ECB             : 
                                936                 :     /* send number of live attributes */
 2271 peter_e                   937 GIC         869 :     for (i = 0; i < desc->natts; i++)
 2271 peter_e                   938 ECB             :     {
  379 tomas.vondra              939 CBC         583 :         Form_pg_attribute att = TupleDescAttr(desc, i);
                                940                 : 
                                941             583 :         if (att->attisdropped || att->attgenerated)
 2271 peter_e                   942 GIC           5 :             continue;
  379 tomas.vondra              943 ECB             : 
  379 tomas.vondra              944 GIC         578 :         if (!column_in_column_list(att->attnum, columns))
                                945              53 :             continue;
  379 tomas.vondra              946 ECB             : 
 2271 peter_e                   947 CBC         525 :         nliveatts++;
 2271 peter_e                   948 ECB             :     }
 2006 andres                    949 GIC         286 :     pq_sendint16(out, nliveatts);
                                950                 : 
 2271 peter_e                   951 ECB             :     /* fetch bitmap of REPLICATION IDENTITY attributes */
 2271 peter_e                   952 GIC         286 :     replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
 2271 peter_e                   953 CBC         286 :     if (!replidentfull)
  712 akapila                   954             253 :         idattrs = RelationGetIdentityKeyBitmap(rel);
                                955                 : 
 2271 peter_e                   956 ECB             :     /* send the attributes */
 2271 peter_e                   957 CBC         869 :     for (i = 0; i < desc->natts; i++)
                                958                 :     {
 2058 andres                    959             583 :         Form_pg_attribute att = TupleDescAttr(desc, i);
 2153 bruce                     960             583 :         uint8       flags = 0;
                                961                 : 
 1471 peter                     962 GIC         583 :         if (att->attisdropped || att->attgenerated)
 2271 peter_e                   963 CBC           5 :             continue;
 2271 peter_e                   964 ECB             : 
  379 tomas.vondra              965 GIC         578 :         if (!column_in_column_list(att->attnum, columns))
  379 tomas.vondra              966 CBC          53 :             continue;
                                967                 : 
 2253 heikki.linnakangas        968 ECB             :         /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
 2271 peter_e                   969 GIC        1004 :         if (replidentfull ||
                                970             479 :             bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
 2271 peter_e                   971 ECB             :                           idattrs))
 2271 peter_e                   972 GIC         241 :             flags |= LOGICALREP_IS_REPLICA_IDENTITY;
                                973                 : 
 2271 peter_e                   974 CBC         525 :         pq_sendbyte(out, flags);
                                975                 : 
                                976                 :         /* attribute name */
                                977             525 :         pq_sendstring(out, NameStr(att->attname));
                                978                 : 
                                979                 :         /* attribute type id */
 2006 andres                    980             525 :         pq_sendint32(out, (int) att->atttypid);
 2271 peter_e                   981 ECB             : 
                                982                 :         /* attribute mode */
 2006 andres                    983 GIC         525 :         pq_sendint32(out, att->atttypmod);
                                984                 :     }
                                985                 : 
 2271 peter_e                   986             286 :     bms_free(idattrs);
 2271 peter_e                   987 CBC         286 : }
                                988                 : 
                                989                 : /*
                                990                 :  * Read relation attribute metadata from the stream.
                                991                 :  */
                                992                 : static void
                                993             354 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
                                994                 : {
 2271 peter_e                   995 ECB             :     int         i;
                                996                 :     int         natts;
                                997                 :     char      **attnames;
                                998                 :     Oid        *atttyps;
 2271 peter_e                   999 GIC         354 :     Bitmapset  *attkeys = NULL;
 2271 peter_e                  1000 ECB             : 
 2271 peter_e                  1001 GIC         354 :     natts = pq_getmsgint(in, 2);
                               1002             354 :     attnames = palloc(natts * sizeof(char *));
                               1003             354 :     atttyps = palloc(natts * sizeof(Oid));
                               1004                 : 
 2271 peter_e                  1005 ECB             :     /* read the attributes */
 2271 peter_e                  1006 CBC        1005 :     for (i = 0; i < natts; i++)
 2271 peter_e                  1007 ECB             :     {
                               1008                 :         uint8       flags;
                               1009                 : 
                               1010                 :         /* Check for replica identity column */
 2271 peter_e                  1011 GIC         651 :         flags = pq_getmsgbyte(in);
                               1012             651 :         if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
 2271 peter_e                  1013 CBC         298 :             attkeys = bms_add_member(attkeys, i);
                               1014                 : 
                               1015                 :         /* attribute name */
                               1016             651 :         attnames[i] = pstrdup(pq_getmsgstring(in));
                               1017                 : 
                               1018                 :         /* attribute type id */
                               1019             651 :         atttyps[i] = (Oid) pq_getmsgint(in, 4);
 2271 peter_e                  1020 ECB             : 
                               1021                 :         /* we ignore attribute mode for now */
 2271 peter_e                  1022 CBC         651 :         (void) pq_getmsgint(in, 4);
 2271 peter_e                  1023 ECB             :     }
                               1024                 : 
 2271 peter_e                  1025 GIC         354 :     rel->attnames = attnames;
                               1026             354 :     rel->atttyps = atttyps;
                               1027             354 :     rel->attkeys = attkeys;
                               1028             354 :     rel->natts = natts;
 2271 peter_e                  1029 CBC         354 : }
                               1030                 : 
 2271 peter_e                  1031 ECB             : /*
                               1032                 :  * Write the namespace name or empty string for pg_catalog (to save space).
                               1033                 :  */
                               1034                 : static void
 2271 peter_e                  1035 CBC         304 : logicalrep_write_namespace(StringInfo out, Oid nspid)
                               1036                 : {
                               1037             304 :     if (nspid == PG_CATALOG_NAMESPACE)
 2271 peter_e                  1038 GBC           1 :         pq_sendbyte(out, '\0');
                               1039                 :     else
                               1040                 :     {
 2153 bruce                    1041 CBC         303 :         char       *nspname = get_namespace_name(nspid);
                               1042                 : 
 2271 peter_e                  1043             303 :         if (nspname == NULL)
 2271 peter_e                  1044 UIC           0 :             elog(ERROR, "cache lookup failed for namespace %u",
                               1045                 :                  nspid);
                               1046                 : 
 2271 peter_e                  1047 GIC         303 :         pq_sendstring(out, nspname);
                               1048                 :     }
 2271 peter_e                  1049 CBC         304 : }
                               1050                 : 
 2271 peter_e                  1051 ECB             : /*
                               1052                 :  * Read the namespace name while treating empty string as pg_catalog.
                               1053                 :  */
                               1054                 : static const char *
 2271 peter_e                  1055 GIC         372 : logicalrep_read_namespace(StringInfo in)
 2271 peter_e                  1056 ECB             : {
 2271 peter_e                  1057 GIC         372 :     const char *nspname = pq_getmsgstring(in);
                               1058                 : 
                               1059             372 :     if (nspname[0] == '\0')
                               1060               1 :         nspname = "pg_catalog";
                               1061                 : 
                               1062             372 :     return nspname;
 2271 peter_e                  1063 ECB             : }
                               1064                 : 
                               1065                 : /*
  948 akapila                  1066                 :  * Write the information for the start stream message to the output stream.
                               1067                 :  */
                               1068                 : void
  948 akapila                  1069 GIC         615 : logicalrep_write_stream_start(StringInfo out,
                               1070                 :                               TransactionId xid, bool first_segment)
  948 akapila                  1071 ECB             : {
  888 akapila                  1072 GIC         615 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
                               1073                 : 
  948 akapila                  1074 CBC         615 :     Assert(TransactionIdIsValid(xid));
  948 akapila                  1075 ECB             : 
                               1076                 :     /* transaction ID (we're starting to stream, so must be valid) */
  948 akapila                  1077 GIC         615 :     pq_sendint32(out, xid);
                               1078                 : 
                               1079                 :     /* 1 if this is the first streaming segment for this xid */
                               1080             615 :     pq_sendbyte(out, first_segment ? 1 : 0);
  948 akapila                  1081 CBC         615 : }
                               1082                 : 
                               1083                 : /*
                               1084                 :  * Read the information about the start stream message from output stream.
  948 akapila                  1085 ECB             :  */
                               1086                 : TransactionId
  948 akapila                  1087 CBC         835 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
  948 akapila                  1088 ECB             : {
                               1089                 :     TransactionId xid;
                               1090                 : 
  948 akapila                  1091 GIC         835 :     Assert(first_segment);
                               1092                 : 
                               1093             835 :     xid = pq_getmsgint(in, 4);
                               1094             835 :     *first_segment = (pq_getmsgbyte(in) == 1);
                               1095                 : 
                               1096             835 :     return xid;
  948 akapila                  1097 ECB             : }
                               1098                 : 
                               1099                 : /*
                               1100                 :  * Write the stop stream message to the output stream.
                               1101                 :  */
                               1102                 : void
  948 akapila                  1103 GIC         615 : logicalrep_write_stream_stop(StringInfo out)
                               1104                 : {
  598                          1105             615 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
  948 akapila                  1106 CBC         615 : }
                               1107                 : 
                               1108                 : /*
  948 akapila                  1109 ECB             :  * Write STREAM COMMIT to the output stream.
                               1110                 :  */
                               1111                 : void
  948 akapila                  1112 GIC          45 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
  948 akapila                  1113 ECB             :                                XLogRecPtr commit_lsn)
                               1114                 : {
  948 akapila                  1115 GIC          45 :     uint8       flags = 0;
  948 akapila                  1116 ECB             : 
  888 akapila                  1117 GIC          45 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
                               1118                 : 
  948 akapila                  1119 CBC          45 :     Assert(TransactionIdIsValid(txn->xid));
                               1120                 : 
                               1121                 :     /* transaction ID */
                               1122              45 :     pq_sendint32(out, txn->xid);
  948 akapila                  1123 ECB             : 
                               1124                 :     /* send the flags field (unused for now) */
  948 akapila                  1125 CBC          45 :     pq_sendbyte(out, flags);
                               1126                 : 
                               1127                 :     /* send fields */
  948 akapila                  1128 GIC          45 :     pq_sendint64(out, commit_lsn);
                               1129              45 :     pq_sendint64(out, txn->end_lsn);
  634                          1130              45 :     pq_sendint64(out, txn->xact_time.commit_time);
  948 akapila                  1131 CBC          45 : }
                               1132                 : 
                               1133                 : /*
                               1134                 :  * Read STREAM COMMIT from the output stream.
                               1135                 :  */
  948 akapila                  1136 ECB             : TransactionId
  948 akapila                  1137 GIC          61 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
                               1138                 : {
  948 akapila                  1139 ECB             :     TransactionId xid;
                               1140                 :     uint8       flags;
                               1141                 : 
  948 akapila                  1142 GBC          61 :     xid = pq_getmsgint(in, 4);
                               1143                 : 
                               1144                 :     /* read flags (unused for now) */
  948 akapila                  1145 CBC          61 :     flags = pq_getmsgbyte(in);
  948 akapila                  1146 ECB             : 
  948 akapila                  1147 CBC          61 :     if (flags != 0)
  948 akapila                  1148 UIC           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
  948 akapila                  1149 ECB             : 
                               1150                 :     /* read fields */
  948 akapila                  1151 GIC          61 :     commit_data->commit_lsn = pq_getmsgint64(in);
                               1152              61 :     commit_data->end_lsn = pq_getmsgint64(in);
                               1153              61 :     commit_data->committime = pq_getmsgint64(in);
                               1154                 : 
                               1155              61 :     return xid;
                               1156                 : }
                               1157                 : 
                               1158                 : /*
                               1159                 :  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
  948 akapila                  1160 ECB             :  * same for the top-level transaction abort.
                               1161                 :  *
                               1162                 :  * If write_abort_info is true, send the abort_lsn and abort_time fields,
                               1163                 :  * otherwise don't.
                               1164                 :  */
                               1165                 : void
  948 akapila                  1166 GIC          26 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
                               1167                 :                               TransactionId subxid, XLogRecPtr abort_lsn,
                               1168                 :                               TimestampTz abort_time, bool write_abort_info)
                               1169                 : {
  888 akapila                  1170 CBC          26 :     pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
                               1171                 : 
  948 akapila                  1172 GIC          26 :     Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
  948 akapila                  1173 ECB             : 
                               1174                 :     /* transaction ID */
  948 akapila                  1175 GIC          26 :     pq_sendint32(out, xid);
  948 akapila                  1176 CBC          26 :     pq_sendint32(out, subxid);
                               1177                 : 
   90 akapila                  1178 GNC          26 :     if (write_abort_info)
                               1179                 :     {
                               1180              12 :         pq_sendint64(out, abort_lsn);
                               1181              12 :         pq_sendint64(out, abort_time);
                               1182                 :     }
  948 akapila                  1183 GIC          26 : }
  948 akapila                  1184 ECB             : 
                               1185                 : /*
                               1186                 :  * Read STREAM ABORT from the output stream.
                               1187                 :  *
                               1188                 :  * If read_abort_info is true, read the abort_lsn and abort_time fields,
                               1189                 :  * otherwise don't.
                               1190                 :  */
                               1191                 : void
   90 akapila                  1192 GNC          38 : logicalrep_read_stream_abort(StringInfo in,
                               1193                 :                              LogicalRepStreamAbortData *abort_data,
                               1194                 :                              bool read_abort_info)
                               1195                 : {
                               1196              38 :     Assert(abort_data);
                               1197                 : 
                               1198              38 :     abort_data->xid = pq_getmsgint(in, 4);
                               1199              38 :     abort_data->subxid = pq_getmsgint(in, 4);
                               1200                 : 
                               1201              38 :     if (read_abort_info)
                               1202                 :     {
                               1203              24 :         abort_data->abort_lsn = pq_getmsgint64(in);
                               1204              24 :         abort_data->abort_time = pq_getmsgint64(in);
                               1205                 :     }
                               1206                 :     else
                               1207                 :     {
                               1208              14 :         abort_data->abort_lsn = InvalidXLogRecPtr;
                               1209              14 :         abort_data->abort_time = 0;
                               1210                 :     }
  948 akapila                  1211 CBC          38 : }
                               1212                 : 
                               1213                 : /*
                               1214                 :  * Get string representing LogicalRepMsgType.
  590 akapila                  1215 ECB             :  */
                               1216                 : char *
  590 akapila                  1217 CBC         338 : logicalrep_message_type(LogicalRepMsgType action)
  590 akapila                  1218 ECB             : {
  590 akapila                  1219 GIC         338 :     switch (action)
  590 akapila                  1220 ECB             :     {
  590 akapila                  1221 GIC           1 :         case LOGICAL_REP_MSG_BEGIN:
  590 akapila                  1222 CBC           1 :             return "BEGIN";
                               1223               1 :         case LOGICAL_REP_MSG_COMMIT:
  590 akapila                  1224 GIC           1 :             return "COMMIT";
  590 akapila                  1225 UIC           0 :         case LOGICAL_REP_MSG_ORIGIN:
                               1226               0 :             return "ORIGIN";
  590 akapila                  1227 CBC          36 :         case LOGICAL_REP_MSG_INSERT:
                               1228              36 :             return "INSERT";
  590 akapila                  1229 GIC           9 :         case LOGICAL_REP_MSG_UPDATE:
  590 akapila                  1230 CBC           9 :             return "UPDATE";
  590 akapila                  1231 GIC           5 :         case LOGICAL_REP_MSG_DELETE:
                               1232               5 :             return "DELETE";
  590 akapila                  1233 UIC           0 :         case LOGICAL_REP_MSG_TRUNCATE:
                               1234               0 :             return "TRUNCATE";
  590 akapila                  1235 GIC           2 :         case LOGICAL_REP_MSG_RELATION:
  590 akapila                  1236 CBC           2 :             return "RELATION";
  590 akapila                  1237 UIC           0 :         case LOGICAL_REP_MSG_TYPE:
  590 akapila                  1238 LBC           0 :             return "TYPE";
  590 akapila                  1239 UIC           0 :         case LOGICAL_REP_MSG_MESSAGE:
  590 akapila                  1240 LBC           0 :             return "MESSAGE";
  590 akapila                  1241 CBC           1 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
                               1242               1 :             return "BEGIN PREPARE";
                               1243               1 :         case LOGICAL_REP_MSG_PREPARE:
  590 akapila                  1244 GBC           1 :             return "PREPARE";
  590 akapila                  1245 UBC           0 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
  590 akapila                  1246 LBC           0 :             return "COMMIT PREPARED";
                               1247               0 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
                               1248               0 :             return "ROLLBACK PREPARED";
  590 akapila                  1249 CBC          13 :         case LOGICAL_REP_MSG_STREAM_START:
                               1250              13 :             return "STREAM START";
                               1251             228 :         case LOGICAL_REP_MSG_STREAM_STOP:
  590 akapila                  1252 GBC         228 :             return "STREAM STOP";
                               1253              20 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
  590 akapila                  1254 CBC          20 :             return "STREAM COMMIT";
                               1255              19 :         case LOGICAL_REP_MSG_STREAM_ABORT:
  590 akapila                  1256 GBC          19 :             return "STREAM ABORT";
                               1257               2 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
                               1258               2 :             return "STREAM PREPARE";
  590 akapila                  1259 EUB             :     }
  590 akapila                  1260 ECB             : 
  590 akapila                  1261 LBC           0 :     elog(ERROR, "invalid logical replication message type \"%c\"", action);
  590 akapila                  1262 ECB             : 
                               1263                 :     return NULL;                /* keep compiler quiet */
  590 akapila                  1264 EUB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a