Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * proto.c
4 : : * logical replication protocol functions
5 : : *
6 : : * Copyright (c) 2015-2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/logical/proto.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "access/sysattr.h"
16 : : #include "catalog/pg_namespace.h"
17 : : #include "catalog/pg_type.h"
18 : : #include "libpq/pqformat.h"
19 : : #include "replication/logicalproto.h"
20 : : #include "utils/lsyscache.h"
21 : : #include "utils/syscache.h"
22 : :
23 : : /*
24 : : * Protocol message flags.
25 : : */
26 : : #define LOGICALREP_IS_REPLICA_IDENTITY 1
27 : :
28 : : #define MESSAGE_TRANSACTIONAL (1<<0)
29 : : #define TRUNCATE_CASCADE (1<<0)
30 : : #define TRUNCATE_RESTART_SEQS (1<<1)
31 : :
32 : : static void logicalrep_write_attrs(StringInfo out, Relation rel,
33 : : Bitmapset *columns);
34 : : static void logicalrep_write_tuple(StringInfo out, Relation rel,
35 : : TupleTableSlot *slot,
36 : : bool binary, Bitmapset *columns);
37 : : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
38 : : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
39 : :
40 : : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
41 : : static const char *logicalrep_read_namespace(StringInfo in);
42 : :
43 : : /*
44 : : * Check if a column is covered by a column list.
45 : : *
46 : : * Need to be careful about NULL, which is treated as a column list covering
47 : : * all columns.
48 : : */
49 : : static bool
750 tomas.vondra@postgre 50 :CBC 739644 : column_in_column_list(int attnum, Bitmapset *columns)
51 : : {
52 [ + + + + ]: 739644 : return (columns == NULL || bms_is_member(attnum, columns));
53 : : }
54 : :
55 : :
56 : : /*
57 : : * Write BEGIN to the output stream.
58 : : */
59 : : void
2642 peter_e@gmx.net 60 : 423 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
61 : : {
1259 akapila@postgresql.o 62 : 423 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
63 : :
64 : : /* fixed fields */
2642 peter_e@gmx.net 65 : 423 : pq_sendint64(out, txn->final_lsn);
1005 akapila@postgresql.o 66 : 423 : pq_sendint64(out, txn->xact_time.commit_time);
2377 andres@anarazel.de 67 : 423 : pq_sendint32(out, txn->xid);
2642 peter_e@gmx.net 68 : 423 : }
69 : :
70 : : /*
71 : : * Read transaction BEGIN from the stream.
72 : : */
73 : : void
74 : 412 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
75 : : {
76 : : /* read fields */
77 : 412 : begin_data->final_lsn = pq_getmsgint64(in);
78 [ - + ]: 412 : if (begin_data->final_lsn == InvalidXLogRecPtr)
2642 peter_e@gmx.net 79 [ # # ]:UBC 0 : elog(ERROR, "final_lsn not set in begin message");
2642 peter_e@gmx.net 80 :CBC 412 : begin_data->committime = pq_getmsgint64(in);
81 : 412 : begin_data->xid = pq_getmsgint(in, 4);
82 : 412 : }
83 : :
84 : :
85 : : /*
86 : : * Write COMMIT to the output stream.
87 : : */
88 : : void
89 : 423 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
90 : : XLogRecPtr commit_lsn)
91 : : {
2524 bruce@momjian.us 92 : 423 : uint8 flags = 0;
93 : :
1259 akapila@postgresql.o 94 : 423 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
95 : :
96 : : /* send the flags field (unused for now) */
2642 peter_e@gmx.net 97 : 423 : pq_sendbyte(out, flags);
98 : :
99 : : /* send fields */
100 : 423 : pq_sendint64(out, commit_lsn);
101 : 423 : pq_sendint64(out, txn->end_lsn);
1005 akapila@postgresql.o 102 : 423 : pq_sendint64(out, txn->xact_time.commit_time);
2642 peter_e@gmx.net 103 : 423 : }
104 : :
105 : : /*
106 : : * Read transaction COMMIT from the stream.
107 : : */
108 : : void
109 : 382 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
110 : : {
111 : : /* read flags (unused for now) */
2524 bruce@momjian.us 112 : 382 : uint8 flags = pq_getmsgbyte(in);
113 : :
2642 peter_e@gmx.net 114 [ - + ]: 382 : if (flags != 0)
2536 tgl@sss.pgh.pa.us 115 [ # # ]:UBC 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
116 : :
117 : : /* read fields */
2642 peter_e@gmx.net 118 :CBC 382 : commit_data->commit_lsn = pq_getmsgint64(in);
119 : 382 : commit_data->end_lsn = pq_getmsgint64(in);
120 : 382 : commit_data->committime = pq_getmsgint64(in);
121 : 382 : }
122 : :
123 : : /*
124 : : * Write BEGIN PREPARE to the output stream.
125 : : */
126 : : void
1005 akapila@postgresql.o 127 : 16 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
128 : : {
129 : 16 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
130 : :
131 : : /* fixed fields */
132 : 16 : pq_sendint64(out, txn->final_lsn);
133 : 16 : pq_sendint64(out, txn->end_lsn);
134 : 16 : pq_sendint64(out, txn->xact_time.prepare_time);
135 : 16 : pq_sendint32(out, txn->xid);
136 : :
137 : : /* send gid */
138 : 16 : pq_sendstring(out, txn->gid);
139 : 16 : }
140 : :
141 : : /*
142 : : * Read transaction BEGIN PREPARE from the stream.
143 : : */
144 : : void
145 : 19 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
146 : : {
147 : : /* read fields */
148 : 19 : begin_data->prepare_lsn = pq_getmsgint64(in);
149 [ - + ]: 19 : if (begin_data->prepare_lsn == InvalidXLogRecPtr)
1005 akapila@postgresql.o 150 [ # # ]:UBC 0 : elog(ERROR, "prepare_lsn not set in begin prepare message");
1005 akapila@postgresql.o 151 :CBC 19 : begin_data->end_lsn = pq_getmsgint64(in);
152 [ - + ]: 19 : if (begin_data->end_lsn == InvalidXLogRecPtr)
1005 akapila@postgresql.o 153 [ # # ]:UBC 0 : elog(ERROR, "end_lsn not set in begin prepare message");
1005 akapila@postgresql.o 154 :CBC 19 : begin_data->prepare_time = pq_getmsgint64(in);
155 : 19 : begin_data->xid = pq_getmsgint(in, 4);
156 : :
157 : : /* read gid (copy it into a pre-allocated buffer) */
999 158 : 19 : strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
1005 159 : 19 : }
160 : :
161 : : /*
162 : : * The core functionality for logicalrep_write_prepare and
163 : : * logicalrep_write_stream_prepare.
164 : : */
165 : : static void
990 166 : 31 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
167 : : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
168 : : {
1005 169 : 31 : uint8 flags = 0;
170 : :
990 171 : 31 : pq_sendbyte(out, type);
172 : :
173 : : /*
174 : : * This should only ever happen for two-phase commit transactions, in
175 : : * which case we expect to have a valid GID.
176 : : */
1005 177 [ - + ]: 31 : Assert(txn->gid != NULL);
178 [ - + ]: 31 : Assert(rbtxn_prepared(txn));
990 179 [ - + ]: 31 : Assert(TransactionIdIsValid(txn->xid));
180 : :
181 : : /* send the flags field */
1005 182 : 31 : pq_sendbyte(out, flags);
183 : :
184 : : /* send fields */
185 : 31 : pq_sendint64(out, prepare_lsn);
186 : 31 : pq_sendint64(out, txn->end_lsn);
187 : 31 : pq_sendint64(out, txn->xact_time.prepare_time);
188 : 31 : pq_sendint32(out, txn->xid);
189 : :
190 : : /* send gid */
191 : 31 : pq_sendstring(out, txn->gid);
192 : 31 : }
193 : :
194 : : /*
195 : : * Write PREPARE to the output stream.
196 : : */
197 : : void
990 198 : 16 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
199 : : XLogRecPtr prepare_lsn)
200 : : {
201 : 16 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
202 : : txn, prepare_lsn);
203 : 16 : }
204 : :
205 : : /*
206 : : * The core functionality for logicalrep_read_prepare and
207 : : * logicalrep_read_stream_prepare.
208 : : */
209 : : static void
210 : 38 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
211 : : LogicalRepPreparedTxnData *prepare_data)
212 : : {
213 : : /* read flags */
1005 214 : 38 : uint8 flags = pq_getmsgbyte(in);
215 : :
216 [ - + ]: 38 : if (flags != 0)
990 akapila@postgresql.o 217 [ # # ]:UBC 0 : elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
218 : :
219 : : /* read fields */
1005 akapila@postgresql.o 220 :CBC 38 : prepare_data->prepare_lsn = pq_getmsgint64(in);
221 [ - + ]: 38 : if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
990 akapila@postgresql.o 222 [ # # ]:UBC 0 : elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
1005 akapila@postgresql.o 223 :CBC 38 : prepare_data->end_lsn = pq_getmsgint64(in);
224 [ - + ]: 38 : if (prepare_data->end_lsn == InvalidXLogRecPtr)
990 akapila@postgresql.o 225 [ # # ]:UBC 0 : elog(ERROR, "end_lsn is not set in %s message", msgtype);
1005 akapila@postgresql.o 226 :CBC 38 : prepare_data->prepare_time = pq_getmsgint64(in);
227 : 38 : prepare_data->xid = pq_getmsgint(in, 4);
984 228 [ - + ]: 38 : if (prepare_data->xid == InvalidTransactionId)
984 akapila@postgresql.o 229 [ # # ]:UBC 0 : elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
230 : :
231 : : /* read gid (copy it into a pre-allocated buffer) */
999 akapila@postgresql.o 232 :CBC 38 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
1005 233 : 38 : }
234 : :
235 : : /*
236 : : * Read transaction PREPARE from the stream.
237 : : */
238 : : void
990 239 : 18 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
240 : : {
241 : 18 : logicalrep_read_prepare_common(in, "prepare", prepare_data);
242 : 18 : }
243 : :
244 : : /*
245 : : * Write COMMIT PREPARED to the output stream.
246 : : */
247 : : void
1005 248 : 23 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
249 : : XLogRecPtr commit_lsn)
250 : : {
251 : 23 : uint8 flags = 0;
252 : :
253 : 23 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
254 : :
255 : : /*
256 : : * This should only ever happen for two-phase commit transactions, in
257 : : * which case we expect to have a valid GID.
258 : : */
259 [ - + ]: 23 : Assert(txn->gid != NULL);
260 : :
261 : : /* send the flags field */
262 : 23 : pq_sendbyte(out, flags);
263 : :
264 : : /* send fields */
265 : 23 : pq_sendint64(out, commit_lsn);
266 : 23 : pq_sendint64(out, txn->end_lsn);
267 : 23 : pq_sendint64(out, txn->xact_time.commit_time);
268 : 23 : pq_sendint32(out, txn->xid);
269 : :
270 : : /* send gid */
271 : 23 : pq_sendstring(out, txn->gid);
272 : 23 : }
273 : :
274 : : /*
275 : : * Read transaction COMMIT PREPARED from the stream.
276 : : */
277 : : void
278 : 23 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
279 : : {
280 : : /* read flags */
281 : 23 : uint8 flags = pq_getmsgbyte(in);
282 : :
283 [ - + ]: 23 : if (flags != 0)
1005 akapila@postgresql.o 284 [ # # ]:UBC 0 : elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
285 : :
286 : : /* read fields */
1005 akapila@postgresql.o 287 :CBC 23 : prepare_data->commit_lsn = pq_getmsgint64(in);
288 [ - + ]: 23 : if (prepare_data->commit_lsn == InvalidXLogRecPtr)
1005 akapila@postgresql.o 289 [ # # ]:UBC 0 : elog(ERROR, "commit_lsn is not set in commit prepared message");
1005 akapila@postgresql.o 290 :CBC 23 : prepare_data->end_lsn = pq_getmsgint64(in);
291 [ - + ]: 23 : if (prepare_data->end_lsn == InvalidXLogRecPtr)
1005 akapila@postgresql.o 292 [ # # ]:UBC 0 : elog(ERROR, "end_lsn is not set in commit prepared message");
1005 akapila@postgresql.o 293 :CBC 23 : prepare_data->commit_time = pq_getmsgint64(in);
294 : 23 : prepare_data->xid = pq_getmsgint(in, 4);
295 : :
296 : : /* read gid (copy it into a pre-allocated buffer) */
999 297 : 23 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
1005 298 : 23 : }
299 : :
300 : : /*
301 : : * Write ROLLBACK PREPARED to the output stream.
302 : : */
303 : : void
304 : 8 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
305 : : XLogRecPtr prepare_end_lsn,
306 : : TimestampTz prepare_time)
307 : : {
308 : 8 : uint8 flags = 0;
309 : :
310 : 8 : pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
311 : :
312 : : /*
313 : : * This should only ever happen for two-phase commit transactions, in
314 : : * which case we expect to have a valid GID.
315 : : */
316 [ - + ]: 8 : Assert(txn->gid != NULL);
317 : :
318 : : /* send the flags field */
319 : 8 : pq_sendbyte(out, flags);
320 : :
321 : : /* send fields */
322 : 8 : pq_sendint64(out, prepare_end_lsn);
323 : 8 : pq_sendint64(out, txn->end_lsn);
324 : 8 : pq_sendint64(out, prepare_time);
325 : 8 : pq_sendint64(out, txn->xact_time.commit_time);
326 : 8 : pq_sendint32(out, txn->xid);
327 : :
328 : : /* send gid */
329 : 8 : pq_sendstring(out, txn->gid);
330 : 8 : }
331 : :
332 : : /*
333 : : * Read transaction ROLLBACK PREPARED from the stream.
334 : : */
335 : : void
336 : 9 : logicalrep_read_rollback_prepared(StringInfo in,
337 : : LogicalRepRollbackPreparedTxnData *rollback_data)
338 : : {
339 : : /* read flags */
340 : 9 : uint8 flags = pq_getmsgbyte(in);
341 : :
342 [ - + ]: 9 : if (flags != 0)
1005 akapila@postgresql.o 343 [ # # ]:UBC 0 : elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
344 : :
345 : : /* read fields */
1005 akapila@postgresql.o 346 :CBC 9 : rollback_data->prepare_end_lsn = pq_getmsgint64(in);
347 [ - + ]: 9 : if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
1005 akapila@postgresql.o 348 [ # # ]:UBC 0 : elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
1005 akapila@postgresql.o 349 :CBC 9 : rollback_data->rollback_end_lsn = pq_getmsgint64(in);
350 [ - + ]: 9 : if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
1005 akapila@postgresql.o 351 [ # # ]:UBC 0 : elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
1005 akapila@postgresql.o 352 :CBC 9 : rollback_data->prepare_time = pq_getmsgint64(in);
353 : 9 : rollback_data->rollback_time = pq_getmsgint64(in);
354 : 9 : rollback_data->xid = pq_getmsgint(in, 4);
355 : :
356 : : /* read gid (copy it into a pre-allocated buffer) */
999 357 : 9 : strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
1005 358 : 9 : }
359 : :
360 : : /*
361 : : * Write STREAM PREPARE to the output stream.
362 : : */
363 : : void
984 364 : 15 : logicalrep_write_stream_prepare(StringInfo out,
365 : : ReorderBufferTXN *txn,
366 : : XLogRecPtr prepare_lsn)
367 : : {
368 : 15 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
369 : : txn, prepare_lsn);
370 : 15 : }
371 : :
372 : : /*
373 : : * Read STREAM PREPARE from the stream.
374 : : */
375 : : void
376 : 20 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
377 : : {
378 : 20 : logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
379 : 20 : }
380 : :
381 : : /*
382 : : * Write ORIGIN to the output stream.
383 : : */
384 : : void
2642 peter_e@gmx.net 385 : 9 : logicalrep_write_origin(StringInfo out, const char *origin,
386 : : XLogRecPtr origin_lsn)
387 : : {
1259 akapila@postgresql.o 388 : 9 : pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
389 : :
390 : : /* fixed fields */
2642 peter_e@gmx.net 391 : 9 : pq_sendint64(out, origin_lsn);
392 : :
393 : : /* origin string */
394 : 9 : pq_sendstring(out, origin);
395 : 9 : }
396 : :
397 : : /*
398 : : * Read ORIGIN from the output stream.
399 : : */
400 : : char *
2642 peter_e@gmx.net 401 :UBC 0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
402 : : {
403 : : /* fixed fields */
404 : 0 : *origin_lsn = pq_getmsgint64(in);
405 : :
406 : : /* return origin */
407 : 0 : return pstrdup(pq_getmsgstring(in));
408 : : }
409 : :
410 : : /*
411 : : * Write INSERT to the output stream.
412 : : */
413 : : void
1319 akapila@postgresql.o 414 :CBC 108904 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
415 : : TupleTableSlot *newslot, bool binary, Bitmapset *columns)
416 : : {
1259 417 : 108904 : pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
418 : :
419 : : /* transaction ID (if not valid, we're not streaming) */
1319 420 [ + + ]: 108904 : if (TransactionIdIsValid(xid))
421 : 100087 : pq_sendint32(out, xid);
422 : :
423 : : /* use Oid as relation identifier */
2377 andres@anarazel.de 424 : 108904 : pq_sendint32(out, RelationGetRelid(rel));
425 : :
2642 peter_e@gmx.net 426 : 108904 : pq_sendbyte(out, 'N'); /* new tuple follows */
750 tomas.vondra@postgre 427 : 108904 : logicalrep_write_tuple(out, rel, newslot, binary, columns);
2642 peter_e@gmx.net 428 : 108904 : }
429 : :
430 : : /*
431 : : * Read INSERT from stream.
432 : : *
433 : : * Fills the new tuple.
434 : : */
435 : : LogicalRepRelId
436 : 78817 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
437 : : {
438 : : char action;
439 : : LogicalRepRelId relid;
440 : :
441 : : /* read the relation id */
442 : 78817 : relid = pq_getmsgint(in, 4);
443 : :
444 : 78817 : action = pq_getmsgbyte(in);
445 [ - + ]: 78817 : if (action != 'N')
2642 peter_e@gmx.net 446 [ # # ]:UBC 0 : elog(ERROR, "expected new tuple but got %d",
447 : : action);
448 : :
2642 peter_e@gmx.net 449 :CBC 78817 : logicalrep_read_tuple(in, newtup);
450 : :
451 : 78817 : return relid;
452 : : }
453 : :
454 : : /*
455 : : * Write UPDATE to the output stream.
456 : : */
457 : : void
1319 akapila@postgresql.o 458 : 34439 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
459 : : TupleTableSlot *oldslot, TupleTableSlot *newslot,
460 : : bool binary, Bitmapset *columns)
461 : : {
1259 462 : 34439 : pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
463 : :
2642 peter_e@gmx.net 464 [ + + + + : 34439 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- + ]
465 : : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
466 : : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
467 : :
468 : : /* transaction ID (if not valid, we're not streaming) */
1319 akapila@postgresql.o 469 [ + + ]: 34439 : if (TransactionIdIsValid(xid))
470 : 34234 : pq_sendint32(out, xid);
471 : :
472 : : /* use Oid as relation identifier */
2377 andres@anarazel.de 473 : 34439 : pq_sendint32(out, RelationGetRelid(rel));
474 : :
782 akapila@postgresql.o 475 [ + + ]: 34439 : if (oldslot != NULL)
476 : : {
2642 peter_e@gmx.net 477 [ + + ]: 126 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
2489 tgl@sss.pgh.pa.us 478 : 54 : pq_sendbyte(out, 'O'); /* old tuple follows */
479 : : else
480 : 72 : pq_sendbyte(out, 'K'); /* old key follows */
499 akapila@postgresql.o 481 : 126 : logicalrep_write_tuple(out, rel, oldslot, binary, columns);
482 : : }
483 : :
2642 peter_e@gmx.net 484 : 34439 : pq_sendbyte(out, 'N'); /* new tuple follows */
750 tomas.vondra@postgre 485 : 34439 : logicalrep_write_tuple(out, rel, newslot, binary, columns);
2642 peter_e@gmx.net 486 : 34439 : }
487 : :
488 : : /*
489 : : * Read UPDATE from stream.
490 : : */
491 : : LogicalRepRelId
492 : 31935 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
493 : : LogicalRepTupleData *oldtup,
494 : : LogicalRepTupleData *newtup)
495 : : {
496 : : char action;
497 : : LogicalRepRelId relid;
498 : :
499 : : /* read the relation id */
500 : 31935 : relid = pq_getmsgint(in, 4);
501 : :
502 : : /* read and verify action */
503 : 31935 : action = pq_getmsgbyte(in);
504 [ + + + + : 31935 : if (action != 'K' && action != 'O' && action != 'N')
- + ]
2642 peter_e@gmx.net 505 [ # # ]:UBC 0 : elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
506 : : action);
507 : :
508 : : /* check for old tuple */
2642 peter_e@gmx.net 509 [ + + + + ]:CBC 31935 : if (action == 'K' || action == 'O')
510 : : {
511 : 124 : logicalrep_read_tuple(in, oldtup);
512 : 124 : *has_oldtuple = true;
513 : :
514 : 124 : action = pq_getmsgbyte(in);
515 : : }
516 : : else
517 : 31811 : *has_oldtuple = false;
518 : :
519 : : /* check for new tuple */
520 [ - + ]: 31935 : if (action != 'N')
2642 peter_e@gmx.net 521 [ # # ]:UBC 0 : elog(ERROR, "expected action 'N', got %c",
522 : : action);
523 : :
2642 peter_e@gmx.net 524 :CBC 31935 : logicalrep_read_tuple(in, newtup);
525 : :
526 : 31935 : return relid;
527 : : }
528 : :
529 : : /*
530 : : * Write DELETE to the output stream.
531 : : */
532 : : void
1319 akapila@postgresql.o 533 : 41888 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
534 : : TupleTableSlot *oldslot, bool binary,
535 : : Bitmapset *columns)
536 : : {
2642 peter_e@gmx.net 537 [ + + + + : 41888 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
- + ]
538 : : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
539 : : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
540 : :
1259 akapila@postgresql.o 541 : 41888 : pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
542 : :
543 : : /* transaction ID (if not valid, we're not streaming) */
1319 544 [ + + ]: 41888 : if (TransactionIdIsValid(xid))
545 : 41626 : pq_sendint32(out, xid);
546 : :
547 : : /* use Oid as relation identifier */
2377 andres@anarazel.de 548 : 41888 : pq_sendint32(out, RelationGetRelid(rel));
549 : :
2642 peter_e@gmx.net 550 [ + + ]: 41888 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
551 : 122 : pq_sendbyte(out, 'O'); /* old tuple follows */
552 : : else
553 : 41766 : pq_sendbyte(out, 'K'); /* old key follows */
554 : :
499 akapila@postgresql.o 555 : 41888 : logicalrep_write_tuple(out, rel, oldslot, binary, columns);
2642 peter_e@gmx.net 556 : 41888 : }
557 : :
558 : : /*
559 : : * Read DELETE from stream.
560 : : *
561 : : * Fills the old tuple.
562 : : */
563 : : LogicalRepRelId
564 : 40315 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
565 : : {
566 : : char action;
567 : : LogicalRepRelId relid;
568 : :
569 : : /* read the relation id */
570 : 40315 : relid = pq_getmsgint(in, 4);
571 : :
572 : : /* read and verify action */
573 : 40315 : action = pq_getmsgbyte(in);
574 [ + + - + ]: 40315 : if (action != 'K' && action != 'O')
2642 peter_e@gmx.net 575 [ # # ]:UBC 0 : elog(ERROR, "expected action 'O' or 'K', got %c", action);
576 : :
2642 peter_e@gmx.net 577 :CBC 40315 : logicalrep_read_tuple(in, oldtup);
578 : :
579 : 40315 : return relid;
580 : : }
581 : :
582 : : /*
583 : : * Write TRUNCATE to the output stream.
584 : : */
585 : : void
2199 586 : 19 : logicalrep_write_truncate(StringInfo out,
587 : : TransactionId xid,
588 : : int nrelids,
589 : : Oid relids[],
590 : : bool cascade, bool restart_seqs)
591 : : {
592 : : int i;
2180 tgl@sss.pgh.pa.us 593 : 19 : uint8 flags = 0;
594 : :
1259 akapila@postgresql.o 595 : 19 : pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
596 : :
597 : : /* transaction ID (if not valid, we're not streaming) */
1319 598 [ - + ]: 19 : if (TransactionIdIsValid(xid))
1319 akapila@postgresql.o 599 :UBC 0 : pq_sendint32(out, xid);
600 : :
2199 peter_e@gmx.net 601 :CBC 19 : pq_sendint32(out, nrelids);
602 : :
603 : : /* encode and send truncate flags */
604 [ - + ]: 19 : if (cascade)
2199 peter_e@gmx.net 605 :UBC 0 : flags |= TRUNCATE_CASCADE;
2199 peter_e@gmx.net 606 [ + + ]:CBC 19 : if (restart_seqs)
607 : 1 : flags |= TRUNCATE_RESTART_SEQS;
608 : 19 : pq_sendint8(out, flags);
609 : :
610 [ + + ]: 47 : for (i = 0; i < nrelids; i++)
611 : 28 : pq_sendint32(out, relids[i]);
612 : 19 : }
613 : :
614 : : /*
615 : : * Read TRUNCATE from stream.
616 : : */
617 : : List *
618 : 19 : logicalrep_read_truncate(StringInfo in,
619 : : bool *cascade, bool *restart_seqs)
620 : : {
621 : : int i;
622 : : int nrelids;
623 : 19 : List *relids = NIL;
624 : : uint8 flags;
625 : :
626 : 19 : nrelids = pq_getmsgint(in, 4);
627 : :
628 : : /* read and decode truncate flags */
629 : 19 : flags = pq_getmsgint(in, 1);
630 : 19 : *cascade = (flags & TRUNCATE_CASCADE) > 0;
631 : 19 : *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
632 : :
633 [ + + ]: 47 : for (i = 0; i < nrelids; i++)
634 : 28 : relids = lappend_oid(relids, pq_getmsgint(in, 4));
635 : :
636 : 19 : return relids;
637 : : }
638 : :
639 : : /*
640 : : * Write MESSAGE to stream
641 : : */
642 : : void
1104 akapila@postgresql.o 643 : 5 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
644 : : bool transactional, const char *prefix, Size sz,
645 : : const char *message)
646 : : {
647 : 5 : uint8 flags = 0;
648 : :
649 : 5 : pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
650 : :
651 : : /* encode and send message flags */
652 [ + + ]: 5 : if (transactional)
653 : 2 : flags |= MESSAGE_TRANSACTIONAL;
654 : :
655 : : /* transaction ID (if not valid, we're not streaming) */
656 [ - + ]: 5 : if (TransactionIdIsValid(xid))
1104 akapila@postgresql.o 657 :UBC 0 : pq_sendint32(out, xid);
658 : :
1104 akapila@postgresql.o 659 :CBC 5 : pq_sendint8(out, flags);
660 : 5 : pq_sendint64(out, lsn);
661 : 5 : pq_sendstring(out, prefix);
662 : 5 : pq_sendint32(out, sz);
663 : 5 : pq_sendbytes(out, message, sz);
664 : 5 : }
665 : :
666 : : /*
667 : : * Write relation description to the output stream.
668 : : */
669 : : void
750 tomas.vondra@postgre 670 : 376 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
671 : : Bitmapset *columns)
672 : : {
673 : : char *relname;
674 : :
1259 akapila@postgresql.o 675 : 376 : pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
676 : :
677 : : /* transaction ID (if not valid, we're not streaming) */
1319 678 [ + + ]: 376 : if (TransactionIdIsValid(xid))
679 : 74 : pq_sendint32(out, xid);
680 : :
681 : : /* use Oid as relation identifier */
2377 andres@anarazel.de 682 : 376 : pq_sendint32(out, RelationGetRelid(rel));
683 : :
684 : : /* send qualified relation name */
2642 peter_e@gmx.net 685 : 376 : logicalrep_write_namespace(out, RelationGetNamespace(rel));
686 : 376 : relname = RelationGetRelationName(rel);
687 : 376 : pq_sendstring(out, relname);
688 : :
689 : : /* send replica identity */
690 : 376 : pq_sendbyte(out, rel->rd_rel->relreplident);
691 : :
692 : : /* send the attribute info */
750 tomas.vondra@postgre 693 : 376 : logicalrep_write_attrs(out, rel, columns);
2642 peter_e@gmx.net 694 : 376 : }
695 : :
696 : : /*
697 : : * Read the relation info from stream and return as LogicalRepRelation.
698 : : */
699 : : LogicalRepRelation *
700 : 399 : logicalrep_read_rel(StringInfo in)
701 : : {
2524 bruce@momjian.us 702 : 399 : LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
703 : :
2642 peter_e@gmx.net 704 : 399 : rel->remoteid = pq_getmsgint(in, 4);
705 : :
706 : : /* Read relation name from stream */
707 : 399 : rel->nspname = pstrdup(logicalrep_read_namespace(in));
708 : 399 : rel->relname = pstrdup(pq_getmsgstring(in));
709 : :
710 : : /* Read the replica identity. */
711 : 399 : rel->replident = pq_getmsgbyte(in);
712 : :
713 : : /* Get attribute description */
714 : 399 : logicalrep_read_attrs(in, rel);
715 : :
716 : 399 : return rel;
717 : : }
718 : :
719 : : /*
720 : : * Write type info to the output stream.
721 : : *
722 : : * This function will always write base type info.
723 : : */
724 : : void
1319 akapila@postgresql.o 725 : 18 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
726 : : {
2642 peter_e@gmx.net 727 : 18 : Oid basetypoid = getBaseType(typoid);
728 : : HeapTuple tup;
729 : : Form_pg_type typtup;
730 : :
1259 akapila@postgresql.o 731 : 18 : pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
732 : :
733 : : /* transaction ID (if not valid, we're not streaming) */
1319 734 [ - + ]: 18 : if (TransactionIdIsValid(xid))
1319 akapila@postgresql.o 735 :UBC 0 : pq_sendint32(out, xid);
736 : :
2642 peter_e@gmx.net 737 :CBC 18 : tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
738 [ - + ]: 18 : if (!HeapTupleIsValid(tup))
2642 peter_e@gmx.net 739 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for type %u", basetypoid);
2642 peter_e@gmx.net 740 :CBC 18 : typtup = (Form_pg_type) GETSTRUCT(tup);
741 : :
742 : : /* use Oid as relation identifier */
2377 andres@anarazel.de 743 : 18 : pq_sendint32(out, typoid);
744 : :
745 : : /* send qualified type name */
2642 peter_e@gmx.net 746 : 18 : logicalrep_write_namespace(out, typtup->typnamespace);
747 : 18 : pq_sendstring(out, NameStr(typtup->typname));
748 : :
749 : 18 : ReleaseSysCache(tup);
750 : 18 : }
751 : :
752 : : /*
753 : : * Read type info from the output stream.
754 : : */
755 : : void
756 : 18 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
757 : : {
758 : 18 : ltyp->remoteid = pq_getmsgint(in, 4);
759 : :
760 : : /* Read type name from stream */
761 : 18 : ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
762 : 18 : ltyp->typname = pstrdup(pq_getmsgstring(in));
763 : 18 : }
764 : :
765 : : /*
766 : : * Write a tuple to the outputstream, in the most efficient format possible.
767 : : */
768 : : static void
782 akapila@postgresql.o 769 : 185357 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
770 : : bool binary, Bitmapset *columns)
771 : : {
772 : : TupleDesc desc;
773 : : Datum *values;
774 : : bool *isnull;
775 : : int i;
2642 peter_e@gmx.net 776 : 185357 : uint16 nliveatts = 0;
777 : :
778 : 185357 : desc = RelationGetDescr(rel);
779 : :
780 [ + + ]: 554485 : for (i = 0; i < desc->natts; i++)
781 : : {
750 tomas.vondra@postgre 782 : 369128 : Form_pg_attribute att = TupleDescAttr(desc, i);
783 : :
784 [ + + + + ]: 369128 : if (att->attisdropped || att->attgenerated)
785 : 31 : continue;
786 : :
787 [ + + ]: 369097 : if (!column_in_column_list(att->attnum, columns))
2642 peter_e@gmx.net 788 : 102 : continue;
789 : :
790 : 368995 : nliveatts++;
791 : : }
2377 andres@anarazel.de 792 : 185357 : pq_sendint16(out, nliveatts);
793 : :
782 akapila@postgresql.o 794 : 185357 : slot_getallattrs(slot);
795 : 185357 : values = slot->tts_values;
796 : 185357 : isnull = slot->tts_isnull;
797 : :
798 : : /* Write the values */
2642 peter_e@gmx.net 799 [ + + ]: 554485 : for (i = 0; i < desc->natts; i++)
800 : : {
801 : : HeapTuple typtup;
802 : : Form_pg_type typclass;
2429 andres@anarazel.de 803 : 369128 : Form_pg_attribute att = TupleDescAttr(desc, i);
804 : :
1842 peter@eisentraut.org 805 [ + + + + ]: 369128 : if (att->attisdropped || att->attgenerated)
2642 peter_e@gmx.net 806 : 31 : continue;
807 : :
750 tomas.vondra@postgre 808 [ + + ]: 369097 : if (!column_in_column_list(att->attnum, columns))
809 : 102 : continue;
810 : :
2642 peter_e@gmx.net 811 [ + + ]: 368995 : if (isnull[i])
812 : : {
1366 tgl@sss.pgh.pa.us 813 : 51940 : pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
2642 peter_e@gmx.net 814 : 51940 : continue;
815 : : }
816 : :
1366 tgl@sss.pgh.pa.us 817 [ + + + + : 317055 : if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
+ + ]
818 : : {
819 : : /*
820 : : * Unchanged toasted datum. (Note that we don't promise to detect
821 : : * unchanged data in general; this is just a cheap check to avoid
822 : : * sending large values unnecessarily.)
823 : : */
824 : 3 : pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
2642 peter_e@gmx.net 825 : 3 : continue;
826 : : }
827 : :
828 : 317052 : typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
829 [ - + ]: 317052 : if (!HeapTupleIsValid(typtup))
2642 peter_e@gmx.net 830 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for type %u", att->atttypid);
2642 peter_e@gmx.net 831 :CBC 317052 : typclass = (Form_pg_type) GETSTRUCT(typtup);
832 : :
833 : : /*
834 : : * Send in binary if requested and type has suitable send function.
835 : : */
1363 tgl@sss.pgh.pa.us 836 [ + + + + ]: 317052 : if (binary && OidIsValid(typclass->typsend))
1366 837 : 115045 : {
838 : : bytea *outputbytes;
839 : : int len;
840 : :
841 : 115045 : pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
842 : 115045 : outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
843 : 115045 : len = VARSIZE(outputbytes) - VARHDRSZ;
844 : 115045 : pq_sendint(out, len, 4); /* length */
845 : 115045 : pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
846 : 115045 : pfree(outputbytes);
847 : : }
848 : : else
849 : : {
850 : : char *outputstr;
851 : :
852 : 202007 : pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
853 : 202007 : outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
41 heikki.linnakangas@i 854 :GNC 202007 : pq_sendcountedtext(out, outputstr, strlen(outputstr));
1366 tgl@sss.pgh.pa.us 855 :CBC 202007 : pfree(outputstr);
856 : : }
857 : :
2642 peter_e@gmx.net 858 : 317052 : ReleaseSysCache(typtup);
859 : : }
860 : 185357 : }
861 : :
862 : : /*
863 : : * Read tuple in logical replication format from stream.
864 : : */
865 : : static void
866 : 151191 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
867 : : {
868 : : int i;
869 : : int natts;
870 : :
871 : : /* Get number of attributes */
872 : 151191 : natts = pq_getmsgint(in, 2);
873 : :
874 : : /* Allocate space for per-column values; zero out unused StringInfoDatas */
1366 tgl@sss.pgh.pa.us 875 : 151191 : tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
876 : 151191 : tuple->colstatus = (char *) palloc(natts * sizeof(char));
1364 877 : 151191 : tuple->ncols = natts;
878 : :
879 : : /* Read the data */
2642 peter_e@gmx.net 880 [ + + ]: 456930 : for (i = 0; i < natts; i++)
881 : : {
882 : : char *buff;
883 : : char kind;
884 : : int len;
1366 tgl@sss.pgh.pa.us 885 : 305739 : StringInfo value = &tuple->colvalues[i];
886 : :
2642 peter_e@gmx.net 887 : 305739 : kind = pq_getmsgbyte(in);
1366 tgl@sss.pgh.pa.us 888 : 305739 : tuple->colstatus[i] = kind;
889 : :
2642 peter_e@gmx.net 890 [ + + + - ]: 305739 : switch (kind)
891 : : {
1366 tgl@sss.pgh.pa.us 892 : 50366 : case LOGICALREP_COLUMN_NULL:
893 : : /* nothing more to do */
2642 peter_e@gmx.net 894 : 50366 : break;
1366 tgl@sss.pgh.pa.us 895 : 3 : case LOGICALREP_COLUMN_UNCHANGED:
896 : : /* we don't receive the value of an unchanged column */
2642 peter_e@gmx.net 897 : 3 : break;
1366 tgl@sss.pgh.pa.us 898 : 255370 : case LOGICALREP_COLUMN_TEXT:
899 : : case LOGICALREP_COLUMN_BINARY:
900 : 255370 : len = pq_getmsgint(in, 4); /* read length */
901 : :
902 : : /* and data */
171 drowley@postgresql.o 903 :GNC 255370 : buff = palloc(len + 1);
904 : 255370 : pq_copymsgbytes(in, buff, len);
905 : :
906 : : /*
907 : : * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
908 : : * as input functions require that. For
909 : : * LOGICALREP_COLUMN_BINARY it's not technically required, but
910 : : * it's harmless.
911 : : */
912 : 255370 : buff[len] = '\0';
913 : :
914 : 255370 : initStringInfoFromString(value, buff, len);
2642 peter_e@gmx.net 915 :CBC 255370 : break;
2642 peter_e@gmx.net 916 :UBC 0 : default:
2536 tgl@sss.pgh.pa.us 917 [ # # ]: 0 : elog(ERROR, "unrecognized data representation type '%c'", kind);
918 : : }
919 : : }
2642 peter_e@gmx.net 920 :CBC 151191 : }
921 : :
922 : : /*
923 : : * Write relation attribute metadata to the stream.
924 : : */
925 : : static void
750 tomas.vondra@postgre 926 : 376 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
927 : : {
928 : : TupleDesc desc;
929 : : int i;
2642 peter_e@gmx.net 930 : 376 : uint16 nliveatts = 0;
931 : 376 : Bitmapset *idattrs = NULL;
932 : : bool replidentfull;
933 : :
934 : 376 : desc = RelationGetDescr(rel);
935 : :
936 : : /* send number of live attributes */
937 [ + + ]: 1116 : for (i = 0; i < desc->natts; i++)
938 : : {
750 tomas.vondra@postgre 939 : 740 : Form_pg_attribute att = TupleDescAttr(desc, i);
940 : :
941 [ + + + + ]: 740 : if (att->attisdropped || att->attgenerated)
2642 peter_e@gmx.net 942 : 15 : continue;
943 : :
750 tomas.vondra@postgre 944 [ + + ]: 725 : if (!column_in_column_list(att->attnum, columns))
945 : 53 : continue;
946 : :
2642 peter_e@gmx.net 947 : 672 : nliveatts++;
948 : : }
2377 andres@anarazel.de 949 : 376 : pq_sendint16(out, nliveatts);
950 : :
951 : : /* fetch bitmap of REPLICATION IDENTITY attributes */
2642 peter_e@gmx.net 952 : 376 : replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
953 [ + + ]: 376 : if (!replidentfull)
1083 akapila@postgresql.o 954 : 334 : idattrs = RelationGetIdentityKeyBitmap(rel);
955 : :
956 : : /* send the attributes */
2642 peter_e@gmx.net 957 [ + + ]: 1116 : for (i = 0; i < desc->natts; i++)
958 : : {
2429 andres@anarazel.de 959 : 740 : Form_pg_attribute att = TupleDescAttr(desc, i);
2524 bruce@momjian.us 960 : 740 : uint8 flags = 0;
961 : :
1842 peter@eisentraut.org 962 [ + + + + ]: 740 : if (att->attisdropped || att->attgenerated)
2642 peter_e@gmx.net 963 : 15 : continue;
964 : :
750 tomas.vondra@postgre 965 [ + + ]: 725 : if (!column_in_column_list(att->attnum, columns))
966 : 53 : continue;
967 : :
968 : : /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
2642 peter_e@gmx.net 969 [ + + + + ]: 1286 : if (replidentfull ||
970 : 614 : bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
971 : : idattrs))
972 : 321 : flags |= LOGICALREP_IS_REPLICA_IDENTITY;
973 : :
974 : 672 : pq_sendbyte(out, flags);
975 : :
976 : : /* attribute name */
977 : 672 : pq_sendstring(out, NameStr(att->attname));
978 : :
979 : : /* attribute type id */
2377 andres@anarazel.de 980 : 672 : pq_sendint32(out, (int) att->atttypid);
981 : :
982 : : /* attribute mode */
983 : 672 : pq_sendint32(out, att->atttypmod);
984 : : }
985 : :
2642 peter_e@gmx.net 986 : 376 : bms_free(idattrs);
987 : 376 : }
988 : :
989 : : /*
990 : : * Read relation attribute metadata from the stream.
991 : : */
992 : : static void
993 : 399 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
994 : : {
995 : : int i;
996 : : int natts;
997 : : char **attnames;
998 : : Oid *atttyps;
999 : 399 : Bitmapset *attkeys = NULL;
1000 : :
1001 : 399 : natts = pq_getmsgint(in, 2);
1002 : 399 : attnames = palloc(natts * sizeof(char *));
1003 : 399 : atttyps = palloc(natts * sizeof(Oid));
1004 : :
1005 : : /* read the attributes */
1006 [ + + ]: 1113 : for (i = 0; i < natts; i++)
1007 : : {
1008 : : uint8 flags;
1009 : :
1010 : : /* Check for replica identity column */
1011 : 714 : flags = pq_getmsgbyte(in);
1012 [ + + ]: 714 : if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
1013 : 336 : attkeys = bms_add_member(attkeys, i);
1014 : :
1015 : : /* attribute name */
1016 : 714 : attnames[i] = pstrdup(pq_getmsgstring(in));
1017 : :
1018 : : /* attribute type id */
1019 : 714 : atttyps[i] = (Oid) pq_getmsgint(in, 4);
1020 : :
1021 : : /* we ignore attribute mode for now */
1022 : 714 : (void) pq_getmsgint(in, 4);
1023 : : }
1024 : :
1025 : 399 : rel->attnames = attnames;
1026 : 399 : rel->atttyps = atttyps;
1027 : 399 : rel->attkeys = attkeys;
1028 : 399 : rel->natts = natts;
1029 : 399 : }
1030 : :
1031 : : /*
1032 : : * Write the namespace name or empty string for pg_catalog (to save space).
1033 : : */
1034 : : static void
1035 : 394 : logicalrep_write_namespace(StringInfo out, Oid nspid)
1036 : : {
1037 [ + + ]: 394 : if (nspid == PG_CATALOG_NAMESPACE)
1038 : 1 : pq_sendbyte(out, '\0');
1039 : : else
1040 : : {
2524 bruce@momjian.us 1041 : 393 : char *nspname = get_namespace_name(nspid);
1042 : :
2642 peter_e@gmx.net 1043 [ - + ]: 393 : if (nspname == NULL)
2642 peter_e@gmx.net 1044 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for namespace %u",
1045 : : nspid);
1046 : :
2642 peter_e@gmx.net 1047 :CBC 393 : pq_sendstring(out, nspname);
1048 : : }
1049 : 394 : }
1050 : :
1051 : : /*
1052 : : * Read the namespace name while treating empty string as pg_catalog.
1053 : : */
1054 : : static const char *
1055 : 417 : logicalrep_read_namespace(StringInfo in)
1056 : : {
1057 : 417 : const char *nspname = pq_getmsgstring(in);
1058 : :
1059 [ + + ]: 417 : if (nspname[0] == '\0')
1060 : 1 : nspname = "pg_catalog";
1061 : :
1062 : 417 : return nspname;
1063 : : }
1064 : :
1065 : : /*
1066 : : * Write the information for the start stream message to the output stream.
1067 : : */
1068 : : void
1319 akapila@postgresql.o 1069 : 652 : logicalrep_write_stream_start(StringInfo out,
1070 : : TransactionId xid, bool first_segment)
1071 : : {
1259 1072 : 652 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1073 : :
1319 1074 [ - + ]: 652 : Assert(TransactionIdIsValid(xid));
1075 : :
1076 : : /* transaction ID (we're starting to stream, so must be valid) */
1077 : 652 : pq_sendint32(out, xid);
1078 : :
1079 : : /* 1 if this is the first streaming segment for this xid */
1080 : 652 : pq_sendbyte(out, first_segment ? 1 : 0);
1081 : 652 : }
1082 : :
1083 : : /*
1084 : : * Read the information about the start stream message from output stream.
1085 : : */
1086 : : TransactionId
1087 : 916 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
1088 : : {
1089 : : TransactionId xid;
1090 : :
1091 [ - + ]: 916 : Assert(first_segment);
1092 : :
1093 : 916 : xid = pq_getmsgint(in, 4);
1094 : 916 : *first_segment = (pq_getmsgbyte(in) == 1);
1095 : :
1096 : 916 : return xid;
1097 : : }
1098 : :
1099 : : /*
1100 : : * Write the stop stream message to the output stream.
1101 : : */
1102 : : void
1103 : 652 : logicalrep_write_stream_stop(StringInfo out)
1104 : : {
969 1105 : 652 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
1319 1106 : 652 : }
1107 : :
1108 : : /*
1109 : : * Write STREAM COMMIT to the output stream.
1110 : : */
1111 : : void
1112 : 46 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
1113 : : XLogRecPtr commit_lsn)
1114 : : {
1115 : 46 : uint8 flags = 0;
1116 : :
1259 1117 : 46 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1118 : :
1319 1119 [ - + ]: 46 : Assert(TransactionIdIsValid(txn->xid));
1120 : :
1121 : : /* transaction ID */
1122 : 46 : pq_sendint32(out, txn->xid);
1123 : :
1124 : : /* send the flags field (unused for now) */
1125 : 46 : pq_sendbyte(out, flags);
1126 : :
1127 : : /* send fields */
1128 : 46 : pq_sendint64(out, commit_lsn);
1129 : 46 : pq_sendint64(out, txn->end_lsn);
1005 1130 : 46 : pq_sendint64(out, txn->xact_time.commit_time);
1319 1131 : 46 : }
1132 : :
1133 : : /*
1134 : : * Read STREAM COMMIT from the output stream.
1135 : : */
1136 : : TransactionId
1137 : 64 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
1138 : : {
1139 : : TransactionId xid;
1140 : : uint8 flags;
1141 : :
1142 : 64 : xid = pq_getmsgint(in, 4);
1143 : :
1144 : : /* read flags (unused for now) */
1145 : 64 : flags = pq_getmsgbyte(in);
1146 : :
1147 [ - + ]: 64 : if (flags != 0)
1319 akapila@postgresql.o 1148 [ # # ]:UBC 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
1149 : :
1150 : : /* read fields */
1319 akapila@postgresql.o 1151 :CBC 64 : commit_data->commit_lsn = pq_getmsgint64(in);
1152 : 64 : commit_data->end_lsn = pq_getmsgint64(in);
1153 : 64 : commit_data->committime = pq_getmsgint64(in);
1154 : :
1155 : 64 : return xid;
1156 : : }
1157 : :
1158 : : /*
1159 : : * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1160 : : * same for the top-level transaction abort.
1161 : : *
1162 : : * If write_abort_info is true, send the abort_lsn and abort_time fields,
1163 : : * otherwise don't.
1164 : : */
1165 : : void
1166 : 26 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
1167 : : TransactionId subxid, XLogRecPtr abort_lsn,
1168 : : TimestampTz abort_time, bool write_abort_info)
1169 : : {
1259 1170 : 26 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1171 : :
1319 1172 [ + - - + ]: 26 : Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
1173 : :
1174 : : /* transaction ID */
1175 : 26 : pq_sendint32(out, xid);
1176 : 26 : pq_sendint32(out, subxid);
1177 : :
461 1178 [ + + ]: 26 : if (write_abort_info)
1179 : : {
1180 : 12 : pq_sendint64(out, abort_lsn);
1181 : 12 : pq_sendint64(out, abort_time);
1182 : : }
1319 1183 : 26 : }
1184 : :
1185 : : /*
1186 : : * Read STREAM ABORT from the output stream.
1187 : : *
1188 : : * If read_abort_info is true, read the abort_lsn and abort_time fields,
1189 : : * otherwise don't.
1190 : : */
1191 : : void
461 1192 : 38 : logicalrep_read_stream_abort(StringInfo in,
1193 : : LogicalRepStreamAbortData *abort_data,
1194 : : bool read_abort_info)
1195 : : {
1196 [ - + ]: 38 : Assert(abort_data);
1197 : :
1198 : 38 : abort_data->xid = pq_getmsgint(in, 4);
1199 : 38 : abort_data->subxid = pq_getmsgint(in, 4);
1200 : :
1201 [ + + ]: 38 : if (read_abort_info)
1202 : : {
1203 : 24 : abort_data->abort_lsn = pq_getmsgint64(in);
1204 : 24 : abort_data->abort_time = pq_getmsgint64(in);
1205 : : }
1206 : : else
1207 : : {
1208 : 14 : abort_data->abort_lsn = InvalidXLogRecPtr;
1209 : 14 : abort_data->abort_time = 0;
1210 : : }
1319 1211 : 38 : }
1212 : :
1213 : : /*
1214 : : * Get string representing LogicalRepMsgType.
1215 : : */
1216 : : const char *
961 1217 : 376 : logicalrep_message_type(LogicalRepMsgType action)
1218 : : {
1219 : : static char err_unknown[20];
1220 : :
1221 [ + + - + : 376 : switch (action)
+ + - + -
- + + - -
+ + + + +
- ]
1222 : : {
1223 : 1 : case LOGICAL_REP_MSG_BEGIN:
1224 : 1 : return "BEGIN";
1225 : 1 : case LOGICAL_REP_MSG_COMMIT:
1226 : 1 : return "COMMIT";
961 akapila@postgresql.o 1227 :UBC 0 : case LOGICAL_REP_MSG_ORIGIN:
1228 : 0 : return "ORIGIN";
961 akapila@postgresql.o 1229 :CBC 39 : case LOGICAL_REP_MSG_INSERT:
1230 : 39 : return "INSERT";
1231 : 10 : case LOGICAL_REP_MSG_UPDATE:
1232 : 10 : return "UPDATE";
1233 : 5 : case LOGICAL_REP_MSG_DELETE:
1234 : 5 : return "DELETE";
961 akapila@postgresql.o 1235 :UBC 0 : case LOGICAL_REP_MSG_TRUNCATE:
1236 : 0 : return "TRUNCATE";
961 akapila@postgresql.o 1237 :CBC 2 : case LOGICAL_REP_MSG_RELATION:
1238 : 2 : return "RELATION";
961 akapila@postgresql.o 1239 :UBC 0 : case LOGICAL_REP_MSG_TYPE:
1240 : 0 : return "TYPE";
1241 : 0 : case LOGICAL_REP_MSG_MESSAGE:
1242 : 0 : return "MESSAGE";
961 akapila@postgresql.o 1243 :CBC 1 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
1244 : 1 : return "BEGIN PREPARE";
1245 : 1 : case LOGICAL_REP_MSG_PREPARE:
1246 : 1 : return "PREPARE";
961 akapila@postgresql.o 1247 :UBC 0 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
1248 : 0 : return "COMMIT PREPARED";
1249 : 0 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1250 : 0 : return "ROLLBACK PREPARED";
961 akapila@postgresql.o 1251 :CBC 14 : case LOGICAL_REP_MSG_STREAM_START:
1252 : 14 : return "STREAM START";
1253 : 257 : case LOGICAL_REP_MSG_STREAM_STOP:
1254 : 257 : return "STREAM STOP";
1255 : 21 : case LOGICAL_REP_MSG_STREAM_COMMIT:
1256 : 21 : return "STREAM COMMIT";
1257 : 19 : case LOGICAL_REP_MSG_STREAM_ABORT:
1258 : 19 : return "STREAM ABORT";
1259 : 5 : case LOGICAL_REP_MSG_STREAM_PREPARE:
1260 : 5 : return "STREAM PREPARE";
1261 : : }
1262 : :
1263 : : /*
1264 : : * This message provides context in the error raised when applying a
1265 : : * logical message. So we can't throw an error here. Return an unknown
1266 : : * indicator value so that the original error is still reported.
1267 : : */
264 akapila@postgresql.o 1268 :UBC 0 : snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1269 : :
1270 : 0 : return err_unknown;
1271 : : }
|