Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * proto.c
4 : * logical replication protocol functions
5 : *
6 : * Copyright (c) 2015-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/proto.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/sysattr.h"
16 : #include "catalog/pg_namespace.h"
17 : #include "catalog/pg_type.h"
18 : #include "libpq/pqformat.h"
19 : #include "replication/logicalproto.h"
20 : #include "utils/lsyscache.h"
21 : #include "utils/syscache.h"
22 :
23 : /*
24 : * Protocol message flags.
25 : */
26 : #define LOGICALREP_IS_REPLICA_IDENTITY 1
27 :
28 : #define MESSAGE_TRANSACTIONAL (1<<0)
29 : #define TRUNCATE_CASCADE (1<<0)
30 : #define TRUNCATE_RESTART_SEQS (1<<1)
31 :
32 : static void logicalrep_write_attrs(StringInfo out, Relation rel,
33 : Bitmapset *columns);
34 : static void logicalrep_write_tuple(StringInfo out, Relation rel,
35 : TupleTableSlot *slot,
36 : bool binary, Bitmapset *columns);
37 : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
38 : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
39 :
40 : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
41 : static const char *logicalrep_read_namespace(StringInfo in);
42 :
43 : /*
44 : * Check if a column is covered by a column list.
45 : *
46 : * Need to be careful about NULL, which is treated as a column list covering
47 : * all columns.
48 : */
49 : static bool
379 tomas.vondra 50 CBC 732504 : column_in_column_list(int attnum, Bitmapset *columns)
51 : {
52 732504 : return (columns == NULL || bms_is_member(attnum, columns));
53 : }
54 :
55 :
56 : /*
57 : * Write BEGIN to the output stream.
58 : */
59 : void
2271 peter_e 60 335 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
61 : {
888 akapila 62 335 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
63 :
64 : /* fixed fields */
2271 peter_e 65 335 : pq_sendint64(out, txn->final_lsn);
634 akapila 66 335 : pq_sendint64(out, txn->xact_time.commit_time);
2006 andres 67 335 : pq_sendint32(out, txn->xid);
2271 peter_e 68 335 : }
69 :
70 : /*
71 : * Read transaction BEGIN from the stream.
72 : */
73 : void
74 380 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
75 : {
76 : /* read fields */
77 380 : begin_data->final_lsn = pq_getmsgint64(in);
78 380 : if (begin_data->final_lsn == InvalidXLogRecPtr)
2271 peter_e 79 UBC 0 : elog(ERROR, "final_lsn not set in begin message");
2271 peter_e 80 CBC 380 : begin_data->committime = pq_getmsgint64(in);
81 380 : begin_data->xid = pq_getmsgint(in, 4);
82 380 : }
83 :
84 :
85 : /*
86 : * Write COMMIT to the output stream.
87 : */
88 : void
89 334 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
90 : XLogRecPtr commit_lsn)
91 : {
2153 bruce 92 334 : uint8 flags = 0;
93 :
888 akapila 94 334 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
95 :
96 : /* send the flags field (unused for now) */
2271 peter_e 97 334 : pq_sendbyte(out, flags);
98 :
99 : /* send fields */
100 334 : pq_sendint64(out, commit_lsn);
101 334 : pq_sendint64(out, txn->end_lsn);
634 akapila 102 334 : pq_sendint64(out, txn->xact_time.commit_time);
2271 peter_e 103 334 : }
104 :
105 : /*
106 : * Read transaction COMMIT from the stream.
107 : */
108 : void
109 355 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
110 : {
111 : /* read flags (unused for now) */
2153 bruce 112 355 : uint8 flags = pq_getmsgbyte(in);
113 :
2271 peter_e 114 355 : if (flags != 0)
2165 tgl 115 UBC 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
116 :
117 : /* read fields */
2271 peter_e 118 CBC 355 : commit_data->commit_lsn = pq_getmsgint64(in);
119 355 : commit_data->end_lsn = pq_getmsgint64(in);
120 355 : commit_data->committime = pq_getmsgint64(in);
121 355 : }
122 :
123 : /*
124 : * Write BEGIN PREPARE to the output stream.
125 : */
126 : void
634 akapila 127 18 : logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
128 : {
129 18 : pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
130 :
131 : /* fixed fields */
132 18 : pq_sendint64(out, txn->final_lsn);
133 18 : pq_sendint64(out, txn->end_lsn);
134 18 : pq_sendint64(out, txn->xact_time.prepare_time);
135 18 : pq_sendint32(out, txn->xid);
136 :
137 : /* send gid */
138 18 : pq_sendstring(out, txn->gid);
139 18 : }
140 :
141 : /*
142 : * Read transaction BEGIN PREPARE from the stream.
143 : */
144 : void
145 14 : logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
146 : {
147 : /* read fields */
148 14 : begin_data->prepare_lsn = pq_getmsgint64(in);
149 14 : if (begin_data->prepare_lsn == InvalidXLogRecPtr)
634 akapila 150 UBC 0 : elog(ERROR, "prepare_lsn not set in begin prepare message");
634 akapila 151 CBC 14 : begin_data->end_lsn = pq_getmsgint64(in);
152 14 : if (begin_data->end_lsn == InvalidXLogRecPtr)
634 akapila 153 UBC 0 : elog(ERROR, "end_lsn not set in begin prepare message");
634 akapila 154 CBC 14 : begin_data->prepare_time = pq_getmsgint64(in);
155 14 : begin_data->xid = pq_getmsgint(in, 4);
156 :
157 : /* read gid (copy it into a pre-allocated buffer) */
628 158 14 : strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
634 159 14 : }
160 :
161 : /*
162 : * The core functionality for logicalrep_write_prepare and
163 : * logicalrep_write_stream_prepare.
164 : */
165 : static void
619 166 29 : logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
167 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
168 : {
634 169 29 : uint8 flags = 0;
170 :
619 171 29 : pq_sendbyte(out, type);
172 :
173 : /*
174 : * This should only ever happen for two-phase commit transactions, in
175 : * which case we expect to have a valid GID.
176 : */
634 177 29 : Assert(txn->gid != NULL);
178 29 : Assert(rbtxn_prepared(txn));
619 179 29 : Assert(TransactionIdIsValid(txn->xid));
180 :
181 : /* send the flags field */
634 182 29 : pq_sendbyte(out, flags);
183 :
184 : /* send fields */
185 29 : pq_sendint64(out, prepare_lsn);
186 29 : pq_sendint64(out, txn->end_lsn);
187 29 : pq_sendint64(out, txn->xact_time.prepare_time);
188 29 : pq_sendint32(out, txn->xid);
189 :
190 : /* send gid */
191 29 : pq_sendstring(out, txn->gid);
192 29 : }
193 :
194 : /*
195 : * Write PREPARE to the output stream.
196 : */
197 : void
619 198 18 : logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
199 : XLogRecPtr prepare_lsn)
200 : {
201 18 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
202 : txn, prepare_lsn);
203 18 : }
204 :
205 : /*
206 : * The core functionality for logicalrep_read_prepare and
207 : * logicalrep_read_stream_prepare.
208 : */
209 : static void
210 24 : logicalrep_read_prepare_common(StringInfo in, char *msgtype,
211 : LogicalRepPreparedTxnData *prepare_data)
212 : {
213 : /* read flags */
634 214 24 : uint8 flags = pq_getmsgbyte(in);
215 :
216 24 : if (flags != 0)
619 akapila 217 UBC 0 : elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
218 :
219 : /* read fields */
634 akapila 220 CBC 24 : prepare_data->prepare_lsn = pq_getmsgint64(in);
221 24 : if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
619 akapila 222 UBC 0 : elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
634 akapila 223 CBC 24 : prepare_data->end_lsn = pq_getmsgint64(in);
224 24 : if (prepare_data->end_lsn == InvalidXLogRecPtr)
619 akapila 225 UBC 0 : elog(ERROR, "end_lsn is not set in %s message", msgtype);
634 akapila 226 CBC 24 : prepare_data->prepare_time = pq_getmsgint64(in);
227 24 : prepare_data->xid = pq_getmsgint(in, 4);
613 228 24 : if (prepare_data->xid == InvalidTransactionId)
613 akapila 229 UBC 0 : elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
230 :
231 : /* read gid (copy it into a pre-allocated buffer) */
628 akapila 232 CBC 24 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
634 233 24 : }
234 :
235 : /*
236 : * Read transaction PREPARE from the stream.
237 : */
238 : void
619 239 13 : logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
240 : {
241 13 : logicalrep_read_prepare_common(in, "prepare", prepare_data);
242 13 : }
243 :
244 : /*
245 : * Write COMMIT PREPARED to the output stream.
246 : */
247 : void
634 248 22 : logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
249 : XLogRecPtr commit_lsn)
250 : {
251 22 : uint8 flags = 0;
252 :
253 22 : pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
254 :
255 : /*
256 : * This should only ever happen for two-phase commit transactions, in
257 : * which case we expect to have a valid GID.
258 : */
259 22 : Assert(txn->gid != NULL);
260 :
261 : /* send the flags field */
262 22 : pq_sendbyte(out, flags);
263 :
264 : /* send fields */
265 22 : pq_sendint64(out, commit_lsn);
266 22 : pq_sendint64(out, txn->end_lsn);
267 22 : pq_sendint64(out, txn->xact_time.commit_time);
268 22 : pq_sendint32(out, txn->xid);
269 :
270 : /* send gid */
271 22 : pq_sendstring(out, txn->gid);
272 22 : }
273 :
274 : /*
275 : * Read transaction COMMIT PREPARED from the stream.
276 : */
277 : void
278 19 : logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
279 : {
280 : /* read flags */
281 19 : uint8 flags = pq_getmsgbyte(in);
282 :
283 19 : if (flags != 0)
634 akapila 284 UBC 0 : elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
285 :
286 : /* read fields */
634 akapila 287 CBC 19 : prepare_data->commit_lsn = pq_getmsgint64(in);
288 19 : if (prepare_data->commit_lsn == InvalidXLogRecPtr)
634 akapila 289 UBC 0 : elog(ERROR, "commit_lsn is not set in commit prepared message");
634 akapila 290 CBC 19 : prepare_data->end_lsn = pq_getmsgint64(in);
291 19 : if (prepare_data->end_lsn == InvalidXLogRecPtr)
634 akapila 292 UBC 0 : elog(ERROR, "end_lsn is not set in commit prepared message");
634 akapila 293 CBC 19 : prepare_data->commit_time = pq_getmsgint64(in);
294 19 : prepare_data->xid = pq_getmsgint(in, 4);
295 :
296 : /* read gid (copy it into a pre-allocated buffer) */
628 297 19 : strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
634 298 19 : }
299 :
300 : /*
301 : * Write ROLLBACK PREPARED to the output stream.
302 : */
303 : void
304 8 : logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
305 : XLogRecPtr prepare_end_lsn,
306 : TimestampTz prepare_time)
307 : {
308 8 : uint8 flags = 0;
309 :
310 8 : pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
311 :
312 : /*
313 : * This should only ever happen for two-phase commit transactions, in
314 : * which case we expect to have a valid GID.
315 : */
316 8 : Assert(txn->gid != NULL);
317 :
318 : /* send the flags field */
319 8 : pq_sendbyte(out, flags);
320 :
321 : /* send fields */
322 8 : pq_sendint64(out, prepare_end_lsn);
323 8 : pq_sendint64(out, txn->end_lsn);
324 8 : pq_sendint64(out, prepare_time);
325 8 : pq_sendint64(out, txn->xact_time.commit_time);
326 8 : pq_sendint32(out, txn->xid);
327 :
328 : /* send gid */
329 8 : pq_sendstring(out, txn->gid);
330 8 : }
331 :
332 : /*
333 : * Read transaction ROLLBACK PREPARED from the stream.
334 : */
335 : void
336 5 : logicalrep_read_rollback_prepared(StringInfo in,
337 : LogicalRepRollbackPreparedTxnData *rollback_data)
338 : {
339 : /* read flags */
340 5 : uint8 flags = pq_getmsgbyte(in);
341 :
342 5 : if (flags != 0)
634 akapila 343 UBC 0 : elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
344 :
345 : /* read fields */
634 akapila 346 CBC 5 : rollback_data->prepare_end_lsn = pq_getmsgint64(in);
347 5 : if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
634 akapila 348 UBC 0 : elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
634 akapila 349 CBC 5 : rollback_data->rollback_end_lsn = pq_getmsgint64(in);
350 5 : if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
634 akapila 351 UBC 0 : elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
634 akapila 352 CBC 5 : rollback_data->prepare_time = pq_getmsgint64(in);
353 5 : rollback_data->rollback_time = pq_getmsgint64(in);
354 5 : rollback_data->xid = pq_getmsgint(in, 4);
355 :
356 : /* read gid (copy it into a pre-allocated buffer) */
628 357 5 : strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
634 358 5 : }
359 :
360 : /*
361 : * Write STREAM PREPARE to the output stream.
362 : */
363 : void
613 364 11 : logicalrep_write_stream_prepare(StringInfo out,
365 : ReorderBufferTXN *txn,
366 : XLogRecPtr prepare_lsn)
367 : {
368 11 : logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
369 : txn, prepare_lsn);
370 11 : }
371 :
372 : /*
373 : * Read STREAM PREPARE from the stream.
374 : */
375 : void
376 11 : logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
377 : {
378 11 : logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
379 11 : }
380 :
381 : /*
382 : * Write ORIGIN to the output stream.
383 : */
384 : void
2271 peter_e 385 7 : logicalrep_write_origin(StringInfo out, const char *origin,
386 : XLogRecPtr origin_lsn)
387 : {
888 akapila 388 7 : pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
389 :
390 : /* fixed fields */
2271 peter_e 391 7 : pq_sendint64(out, origin_lsn);
392 :
393 : /* origin string */
394 7 : pq_sendstring(out, origin);
395 7 : }
396 :
397 : /*
398 : * Read ORIGIN from the output stream.
399 : */
400 : char *
2271 peter_e 401 UBC 0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
402 : {
403 : /* fixed fields */
404 0 : *origin_lsn = pq_getmsgint64(in);
405 :
406 : /* return origin */
407 0 : return pstrdup(pq_getmsgstring(in));
408 : }
409 :
410 : /*
411 : * Write INSERT to the output stream.
412 : */
413 : void
948 akapila 414 CBC 105697 : logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
415 : TupleTableSlot *newslot, bool binary, Bitmapset *columns)
416 : {
888 417 105697 : pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
418 :
419 : /* transaction ID (if not valid, we're not streaming) */
948 420 105697 : if (TransactionIdIsValid(xid))
421 100075 : pq_sendint32(out, xid);
422 :
423 : /* use Oid as relation identifier */
2006 andres 424 105697 : pq_sendint32(out, RelationGetRelid(rel));
425 :
2271 peter_e 426 105697 : pq_sendbyte(out, 'N'); /* new tuple follows */
379 tomas.vondra 427 105697 : logicalrep_write_tuple(out, rel, newslot, binary, columns);
2271 peter_e 428 105697 : }
429 :
430 : /*
431 : * Read INSERT from stream.
432 : *
433 : * Fills the new tuple.
434 : */
435 : LogicalRepRelId
436 75638 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
437 : {
438 : char action;
439 : LogicalRepRelId relid;
440 :
441 : /* read the relation id */
442 75638 : relid = pq_getmsgint(in, 4);
443 :
444 75638 : action = pq_getmsgbyte(in);
445 75638 : if (action != 'N')
2271 peter_e 446 UBC 0 : elog(ERROR, "expected new tuple but got %d",
447 : action);
448 :
2271 peter_e 449 CBC 75638 : logicalrep_read_tuple(in, newtup);
450 :
451 75638 : return relid;
452 : }
453 :
454 : /*
455 : * Write UPDATE to the output stream.
456 : */
457 : void
948 akapila 458 34408 : logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
459 : TupleTableSlot *oldslot, TupleTableSlot *newslot,
460 : bool binary, Bitmapset *columns)
461 : {
888 462 34408 : pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
463 :
2271 peter_e 464 34408 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
465 : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
466 : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
467 :
468 : /* transaction ID (if not valid, we're not streaming) */
948 akapila 469 34408 : if (TransactionIdIsValid(xid))
470 34226 : pq_sendint32(out, xid);
471 :
472 : /* use Oid as relation identifier */
2006 andres 473 34408 : pq_sendint32(out, RelationGetRelid(rel));
474 :
411 akapila 475 34408 : if (oldslot != NULL)
476 : {
2271 peter_e 477 107 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
2118 tgl 478 47 : pq_sendbyte(out, 'O'); /* old tuple follows */
479 : else
480 60 : pq_sendbyte(out, 'K'); /* old key follows */
128 akapila 481 107 : logicalrep_write_tuple(out, rel, oldslot, binary, columns);
482 : }
483 :
2271 peter_e 484 34408 : pq_sendbyte(out, 'N'); /* new tuple follows */
379 tomas.vondra 485 34408 : logicalrep_write_tuple(out, rel, newslot, binary, columns);
2271 peter_e 486 34408 : }
487 :
488 : /*
489 : * Read UPDATE from stream.
490 : */
491 : LogicalRepRelId
492 31917 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
493 : LogicalRepTupleData *oldtup,
494 : LogicalRepTupleData *newtup)
495 : {
496 : char action;
497 : LogicalRepRelId relid;
498 :
499 : /* read the relation id */
500 31917 : relid = pq_getmsgint(in, 4);
501 :
502 : /* read and verify action */
503 31917 : action = pq_getmsgbyte(in);
504 31917 : if (action != 'K' && action != 'O' && action != 'N')
2271 peter_e 505 UBC 0 : elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
506 : action);
507 :
508 : /* check for old tuple */
2271 peter_e 509 CBC 31917 : if (action == 'K' || action == 'O')
510 : {
511 118 : logicalrep_read_tuple(in, oldtup);
512 118 : *has_oldtuple = true;
513 :
514 118 : action = pq_getmsgbyte(in);
515 : }
516 : else
517 31799 : *has_oldtuple = false;
518 :
519 : /* check for new tuple */
520 31917 : if (action != 'N')
2271 peter_e 521 UBC 0 : elog(ERROR, "expected action 'N', got %c",
522 : action);
523 :
2271 peter_e 524 CBC 31917 : logicalrep_read_tuple(in, newtup);
525 :
526 31917 : return relid;
527 : }
528 :
529 : /*
530 : * Write DELETE to the output stream.
531 : */
532 : void
948 akapila 533 41853 : logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
534 : TupleTableSlot *oldslot, bool binary,
535 : Bitmapset *columns)
536 : {
2271 peter_e 537 41853 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
538 : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
539 : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
540 :
888 akapila 541 41853 : pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
542 :
543 : /* transaction ID (if not valid, we're not streaming) */
948 544 41853 : if (TransactionIdIsValid(xid))
545 41620 : pq_sendint32(out, xid);
546 :
547 : /* use Oid as relation identifier */
2006 andres 548 41853 : pq_sendint32(out, RelationGetRelid(rel));
549 :
2271 peter_e 550 41853 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
551 118 : pq_sendbyte(out, 'O'); /* old tuple follows */
552 : else
553 41735 : pq_sendbyte(out, 'K'); /* old key follows */
554 :
128 akapila 555 41853 : logicalrep_write_tuple(out, rel, oldslot, binary, columns);
2271 peter_e 556 41853 : }
557 :
558 : /*
559 : * Read DELETE from stream.
560 : *
561 : * Fills the old tuple.
562 : */
563 : LogicalRepRelId
564 40303 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
565 : {
566 : char action;
567 : LogicalRepRelId relid;
568 :
569 : /* read the relation id */
570 40303 : relid = pq_getmsgint(in, 4);
571 :
572 : /* read and verify action */
573 40303 : action = pq_getmsgbyte(in);
574 40303 : if (action != 'K' && action != 'O')
2271 peter_e 575 UBC 0 : elog(ERROR, "expected action 'O' or 'K', got %c", action);
576 :
2271 peter_e 577 CBC 40303 : logicalrep_read_tuple(in, oldtup);
578 :
579 40303 : return relid;
580 : }
581 :
582 : /*
583 : * Write TRUNCATE to the output stream.
584 : */
585 : void
1828 586 7 : logicalrep_write_truncate(StringInfo out,
587 : TransactionId xid,
588 : int nrelids,
589 : Oid relids[],
590 : bool cascade, bool restart_seqs)
591 : {
592 : int i;
1809 tgl 593 7 : uint8 flags = 0;
594 :
888 akapila 595 7 : pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
596 :
597 : /* transaction ID (if not valid, we're not streaming) */
948 598 7 : if (TransactionIdIsValid(xid))
948 akapila 599 UBC 0 : pq_sendint32(out, xid);
600 :
1828 peter_e 601 CBC 7 : pq_sendint32(out, nrelids);
602 :
603 : /* encode and send truncate flags */
604 7 : if (cascade)
1828 peter_e 605 UBC 0 : flags |= TRUNCATE_CASCADE;
1828 peter_e 606 CBC 7 : if (restart_seqs)
1828 peter_e 607 UBC 0 : flags |= TRUNCATE_RESTART_SEQS;
1828 peter_e 608 CBC 7 : pq_sendint8(out, flags);
609 :
610 19 : for (i = 0; i < nrelids; i++)
611 12 : pq_sendint32(out, relids[i]);
612 7 : }
613 :
614 : /*
615 : * Read TRUNCATE from stream.
616 : */
617 : List *
618 17 : logicalrep_read_truncate(StringInfo in,
619 : bool *cascade, bool *restart_seqs)
620 : {
621 : int i;
622 : int nrelids;
623 17 : List *relids = NIL;
624 : uint8 flags;
625 :
626 17 : nrelids = pq_getmsgint(in, 4);
627 :
628 : /* read and decode truncate flags */
629 17 : flags = pq_getmsgint(in, 1);
630 17 : *cascade = (flags & TRUNCATE_CASCADE) > 0;
631 17 : *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
632 :
633 43 : for (i = 0; i < nrelids; i++)
634 26 : relids = lappend_oid(relids, pq_getmsgint(in, 4));
635 :
636 17 : return relids;
637 : }
638 :
639 : /*
640 : * Write MESSAGE to stream
641 : */
642 : void
733 akapila 643 5 : logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
644 : bool transactional, const char *prefix, Size sz,
645 : const char *message)
646 : {
647 5 : uint8 flags = 0;
648 :
649 5 : pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
650 :
651 : /* encode and send message flags */
652 5 : if (transactional)
653 2 : flags |= MESSAGE_TRANSACTIONAL;
654 :
655 : /* transaction ID (if not valid, we're not streaming) */
656 5 : if (TransactionIdIsValid(xid))
733 akapila 657 UBC 0 : pq_sendint32(out, xid);
658 :
733 akapila 659 CBC 5 : pq_sendint8(out, flags);
660 5 : pq_sendint64(out, lsn);
661 5 : pq_sendstring(out, prefix);
662 5 : pq_sendint32(out, sz);
663 5 : pq_sendbytes(out, message, sz);
664 5 : }
665 :
666 : /*
667 : * Write relation description to the output stream.
668 : */
669 : void
379 tomas.vondra 670 286 : logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
671 : Bitmapset *columns)
672 : {
673 : char *relname;
674 :
888 akapila 675 286 : pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
676 :
677 : /* transaction ID (if not valid, we're not streaming) */
948 678 286 : if (TransactionIdIsValid(xid))
679 68 : pq_sendint32(out, xid);
680 :
681 : /* use Oid as relation identifier */
2006 andres 682 286 : pq_sendint32(out, RelationGetRelid(rel));
683 :
684 : /* send qualified relation name */
2271 peter_e 685 286 : logicalrep_write_namespace(out, RelationGetNamespace(rel));
686 286 : relname = RelationGetRelationName(rel);
687 286 : pq_sendstring(out, relname);
688 :
689 : /* send replica identity */
690 286 : pq_sendbyte(out, rel->rd_rel->relreplident);
691 :
692 : /* send the attribute info */
379 tomas.vondra 693 286 : logicalrep_write_attrs(out, rel, columns);
2271 peter_e 694 286 : }
695 :
696 : /*
697 : * Read the relation info from stream and return as LogicalRepRelation.
698 : */
699 : LogicalRepRelation *
700 354 : logicalrep_read_rel(StringInfo in)
701 : {
2153 bruce 702 354 : LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
703 :
2271 peter_e 704 354 : rel->remoteid = pq_getmsgint(in, 4);
705 :
706 : /* Read relation name from stream */
707 354 : rel->nspname = pstrdup(logicalrep_read_namespace(in));
708 354 : rel->relname = pstrdup(pq_getmsgstring(in));
709 :
710 : /* Read the replica identity. */
711 354 : rel->replident = pq_getmsgbyte(in);
712 :
713 : /* Get attribute description */
714 354 : logicalrep_read_attrs(in, rel);
715 :
716 354 : return rel;
717 : }
718 :
719 : /*
720 : * Write type info to the output stream.
721 : *
722 : * This function will always write base type info.
723 : */
724 : void
948 akapila 725 18 : logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
726 : {
2271 peter_e 727 18 : Oid basetypoid = getBaseType(typoid);
728 : HeapTuple tup;
729 : Form_pg_type typtup;
730 :
888 akapila 731 18 : pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
732 :
733 : /* transaction ID (if not valid, we're not streaming) */
948 734 18 : if (TransactionIdIsValid(xid))
948 akapila 735 UBC 0 : pq_sendint32(out, xid);
736 :
2271 peter_e 737 CBC 18 : tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
738 18 : if (!HeapTupleIsValid(tup))
2271 peter_e 739 UBC 0 : elog(ERROR, "cache lookup failed for type %u", basetypoid);
2271 peter_e 740 CBC 18 : typtup = (Form_pg_type) GETSTRUCT(tup);
741 :
742 : /* use Oid as relation identifier */
2006 andres 743 18 : pq_sendint32(out, typoid);
744 :
745 : /* send qualified type name */
2271 peter_e 746 18 : logicalrep_write_namespace(out, typtup->typnamespace);
747 18 : pq_sendstring(out, NameStr(typtup->typname));
748 :
749 18 : ReleaseSysCache(tup);
750 18 : }
751 :
752 : /*
753 : * Read type info from the output stream.
754 : */
755 : void
756 18 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
757 : {
758 18 : ltyp->remoteid = pq_getmsgint(in, 4);
759 :
760 : /* Read type name from stream */
761 18 : ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
762 18 : ltyp->typname = pstrdup(pq_getmsgstring(in));
763 18 : }
764 :
765 : /*
766 : * Write a tuple to the outputstream, in the most efficient format possible.
767 : */
768 : static void
411 akapila 769 182065 : logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
770 : bool binary, Bitmapset *columns)
771 : {
772 : TupleDesc desc;
773 : Datum *values;
774 : bool *isnull;
775 : int i;
2271 peter_e 776 182065 : uint16 nliveatts = 0;
777 :
778 182065 : desc = RelationGetDescr(rel);
779 :
780 547747 : for (i = 0; i < desc->natts; i++)
781 : {
379 tomas.vondra 782 365682 : Form_pg_attribute att = TupleDescAttr(desc, i);
783 :
784 365682 : if (att->attisdropped || att->attgenerated)
785 8 : continue;
786 :
787 365674 : if (!column_in_column_list(att->attnum, columns))
2271 peter_e 788 102 : continue;
789 :
790 365572 : nliveatts++;
791 : }
2006 andres 792 182065 : pq_sendint16(out, nliveatts);
793 :
411 akapila 794 182065 : slot_getallattrs(slot);
795 182065 : values = slot->tts_values;
796 182065 : isnull = slot->tts_isnull;
797 :
798 : /* Write the values */
2271 peter_e 799 547747 : for (i = 0; i < desc->natts; i++)
800 : {
801 : HeapTuple typtup;
802 : Form_pg_type typclass;
2058 andres 803 365682 : Form_pg_attribute att = TupleDescAttr(desc, i);
804 :
1471 peter 805 365682 : if (att->attisdropped || att->attgenerated)
2271 peter_e 806 8 : continue;
807 :
379 tomas.vondra 808 365674 : if (!column_in_column_list(att->attnum, columns))
809 102 : continue;
810 :
2271 peter_e 811 365572 : if (isnull[i])
812 : {
995 tgl 813 51867 : pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
2271 peter_e 814 51867 : continue;
815 : }
816 :
995 tgl 817 313705 : if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
818 : {
819 : /*
820 : * Unchanged toasted datum. (Note that we don't promise to detect
821 : * unchanged data in general; this is just a cheap check to avoid
822 : * sending large values unnecessarily.)
823 : */
824 3 : pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
2271 peter_e 825 3 : continue;
826 : }
827 :
828 313702 : typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
829 313702 : if (!HeapTupleIsValid(typtup))
2271 peter_e 830 UBC 0 : elog(ERROR, "cache lookup failed for type %u", att->atttypid);
2271 peter_e 831 CBC 313702 : typclass = (Form_pg_type) GETSTRUCT(typtup);
832 :
833 : /*
834 : * Send in binary if requested and type has suitable send function.
835 : */
992 tgl 836 313702 : if (binary && OidIsValid(typclass->typsend))
995 837 115045 : {
838 : bytea *outputbytes;
839 : int len;
840 :
841 115045 : pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
842 115045 : outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
843 115045 : len = VARSIZE(outputbytes) - VARHDRSZ;
844 115045 : pq_sendint(out, len, 4); /* length */
845 115045 : pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
846 115045 : pfree(outputbytes);
847 : }
848 : else
849 : {
850 : char *outputstr;
851 :
852 198657 : pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
853 198657 : outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
854 198657 : pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
855 198657 : pfree(outputstr);
856 : }
857 :
2271 peter_e 858 313702 : ReleaseSysCache(typtup);
859 : }
860 182065 : }
861 :
862 : /*
863 : * Read tuple in logical replication format from stream.
864 : */
865 : static void
866 147976 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
867 : {
868 : int i;
869 : int natts;
870 :
871 : /* Get number of attributes */
872 147976 : natts = pq_getmsgint(in, 2);
873 :
874 : /* Allocate space for per-column values; zero out unused StringInfoDatas */
995 tgl 875 147976 : tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
876 147976 : tuple->colstatus = (char *) palloc(natts * sizeof(char));
993 877 147976 : tuple->ncols = natts;
878 :
879 : /* Read the data */
2271 peter_e 880 450450 : for (i = 0; i < natts; i++)
881 : {
882 : char kind;
883 : int len;
995 tgl 884 302474 : StringInfo value = &tuple->colvalues[i];
885 :
2271 peter_e 886 302474 : kind = pq_getmsgbyte(in);
995 tgl 887 302474 : tuple->colstatus[i] = kind;
888 :
2271 peter_e 889 302474 : switch (kind)
890 : {
995 tgl 891 50355 : case LOGICALREP_COLUMN_NULL:
892 : /* nothing more to do */
2271 peter_e 893 50355 : break;
995 tgl 894 3 : case LOGICALREP_COLUMN_UNCHANGED:
895 : /* we don't receive the value of an unchanged column */
2271 peter_e 896 3 : break;
995 tgl 897 252116 : case LOGICALREP_COLUMN_TEXT:
995 tgl 898 ECB : case LOGICALREP_COLUMN_BINARY:
995 tgl 899 GIC 252116 : len = pq_getmsgint(in, 4); /* read length */
900 :
995 tgl 901 ECB : /* and data */
995 tgl 902 CBC 252116 : value->data = palloc(len + 1);
903 252116 : pq_copymsgbytes(in, value->data, len);
904 :
905 : /*
906 : * Not strictly necessary for LOGICALREP_COLUMN_BINARY, but
907 : * per StringInfo practice.
908 : */
995 tgl 909 GBC 252116 : value->data[len] = '\0';
910 :
995 tgl 911 EUB : /* make StringInfo fully valid */
995 tgl 912 GIC 252116 : value->len = len;
913 252116 : value->cursor = 0;
995 tgl 914 CBC 252116 : value->maxlen = len;
2271 peter_e 915 GIC 252116 : break;
2271 peter_e 916 UIC 0 : default:
2165 tgl 917 0 : elog(ERROR, "unrecognized data representation type '%c'", kind);
918 : }
919 : }
2271 peter_e 920 CBC 147976 : }
921 :
922 : /*
923 : * Write relation attribute metadata to the stream.
2271 peter_e 924 ECB : */
925 : static void
379 tomas.vondra 926 GIC 286 : logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
927 : {
2271 peter_e 928 ECB : TupleDesc desc;
929 : int i;
2271 peter_e 930 GIC 286 : uint16 nliveatts = 0;
2271 peter_e 931 CBC 286 : Bitmapset *idattrs = NULL;
932 : bool replidentfull;
2271 peter_e 933 ECB :
2271 peter_e 934 GIC 286 : desc = RelationGetDescr(rel);
2271 peter_e 935 ECB :
936 : /* send number of live attributes */
2271 peter_e 937 GIC 869 : for (i = 0; i < desc->natts; i++)
2271 peter_e 938 ECB : {
379 tomas.vondra 939 CBC 583 : Form_pg_attribute att = TupleDescAttr(desc, i);
940 :
941 583 : if (att->attisdropped || att->attgenerated)
2271 peter_e 942 GIC 5 : continue;
379 tomas.vondra 943 ECB :
379 tomas.vondra 944 GIC 578 : if (!column_in_column_list(att->attnum, columns))
945 53 : continue;
379 tomas.vondra 946 ECB :
2271 peter_e 947 CBC 525 : nliveatts++;
2271 peter_e 948 ECB : }
2006 andres 949 GIC 286 : pq_sendint16(out, nliveatts);
950 :
2271 peter_e 951 ECB : /* fetch bitmap of REPLICATION IDENTITY attributes */
2271 peter_e 952 GIC 286 : replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
2271 peter_e 953 CBC 286 : if (!replidentfull)
712 akapila 954 253 : idattrs = RelationGetIdentityKeyBitmap(rel);
955 :
2271 peter_e 956 ECB : /* send the attributes */
2271 peter_e 957 CBC 869 : for (i = 0; i < desc->natts; i++)
958 : {
2058 andres 959 583 : Form_pg_attribute att = TupleDescAttr(desc, i);
2153 bruce 960 583 : uint8 flags = 0;
961 :
1471 peter 962 GIC 583 : if (att->attisdropped || att->attgenerated)
2271 peter_e 963 CBC 5 : continue;
2271 peter_e 964 ECB :
379 tomas.vondra 965 GIC 578 : if (!column_in_column_list(att->attnum, columns))
379 tomas.vondra 966 CBC 53 : continue;
967 :
2253 heikki.linnakangas 968 ECB : /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
2271 peter_e 969 GIC 1004 : if (replidentfull ||
970 479 : bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
2271 peter_e 971 ECB : idattrs))
2271 peter_e 972 GIC 241 : flags |= LOGICALREP_IS_REPLICA_IDENTITY;
973 :
2271 peter_e 974 CBC 525 : pq_sendbyte(out, flags);
975 :
976 : /* attribute name */
977 525 : pq_sendstring(out, NameStr(att->attname));
978 :
979 : /* attribute type id */
2006 andres 980 525 : pq_sendint32(out, (int) att->atttypid);
2271 peter_e 981 ECB :
982 : /* attribute mode */
2006 andres 983 GIC 525 : pq_sendint32(out, att->atttypmod);
984 : }
985 :
2271 peter_e 986 286 : bms_free(idattrs);
2271 peter_e 987 CBC 286 : }
988 :
989 : /*
990 : * Read relation attribute metadata from the stream.
991 : */
992 : static void
993 354 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
994 : {
2271 peter_e 995 ECB : int i;
996 : int natts;
997 : char **attnames;
998 : Oid *atttyps;
2271 peter_e 999 GIC 354 : Bitmapset *attkeys = NULL;
2271 peter_e 1000 ECB :
2271 peter_e 1001 GIC 354 : natts = pq_getmsgint(in, 2);
1002 354 : attnames = palloc(natts * sizeof(char *));
1003 354 : atttyps = palloc(natts * sizeof(Oid));
1004 :
2271 peter_e 1005 ECB : /* read the attributes */
2271 peter_e 1006 CBC 1005 : for (i = 0; i < natts; i++)
2271 peter_e 1007 ECB : {
1008 : uint8 flags;
1009 :
1010 : /* Check for replica identity column */
2271 peter_e 1011 GIC 651 : flags = pq_getmsgbyte(in);
1012 651 : if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
2271 peter_e 1013 CBC 298 : attkeys = bms_add_member(attkeys, i);
1014 :
1015 : /* attribute name */
1016 651 : attnames[i] = pstrdup(pq_getmsgstring(in));
1017 :
1018 : /* attribute type id */
1019 651 : atttyps[i] = (Oid) pq_getmsgint(in, 4);
2271 peter_e 1020 ECB :
1021 : /* we ignore attribute mode for now */
2271 peter_e 1022 CBC 651 : (void) pq_getmsgint(in, 4);
2271 peter_e 1023 ECB : }
1024 :
2271 peter_e 1025 GIC 354 : rel->attnames = attnames;
1026 354 : rel->atttyps = atttyps;
1027 354 : rel->attkeys = attkeys;
1028 354 : rel->natts = natts;
2271 peter_e 1029 CBC 354 : }
1030 :
2271 peter_e 1031 ECB : /*
1032 : * Write the namespace name or empty string for pg_catalog (to save space).
1033 : */
1034 : static void
2271 peter_e 1035 CBC 304 : logicalrep_write_namespace(StringInfo out, Oid nspid)
1036 : {
1037 304 : if (nspid == PG_CATALOG_NAMESPACE)
2271 peter_e 1038 GBC 1 : pq_sendbyte(out, '\0');
1039 : else
1040 : {
2153 bruce 1041 CBC 303 : char *nspname = get_namespace_name(nspid);
1042 :
2271 peter_e 1043 303 : if (nspname == NULL)
2271 peter_e 1044 UIC 0 : elog(ERROR, "cache lookup failed for namespace %u",
1045 : nspid);
1046 :
2271 peter_e 1047 GIC 303 : pq_sendstring(out, nspname);
1048 : }
2271 peter_e 1049 CBC 304 : }
1050 :
2271 peter_e 1051 ECB : /*
1052 : * Read the namespace name while treating empty string as pg_catalog.
1053 : */
1054 : static const char *
2271 peter_e 1055 GIC 372 : logicalrep_read_namespace(StringInfo in)
2271 peter_e 1056 ECB : {
2271 peter_e 1057 GIC 372 : const char *nspname = pq_getmsgstring(in);
1058 :
1059 372 : if (nspname[0] == '\0')
1060 1 : nspname = "pg_catalog";
1061 :
1062 372 : return nspname;
2271 peter_e 1063 ECB : }
1064 :
1065 : /*
948 akapila 1066 : * Write the information for the start stream message to the output stream.
1067 : */
1068 : void
948 akapila 1069 GIC 615 : logicalrep_write_stream_start(StringInfo out,
1070 : TransactionId xid, bool first_segment)
948 akapila 1071 ECB : {
888 akapila 1072 GIC 615 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
1073 :
948 akapila 1074 CBC 615 : Assert(TransactionIdIsValid(xid));
948 akapila 1075 ECB :
1076 : /* transaction ID (we're starting to stream, so must be valid) */
948 akapila 1077 GIC 615 : pq_sendint32(out, xid);
1078 :
1079 : /* 1 if this is the first streaming segment for this xid */
1080 615 : pq_sendbyte(out, first_segment ? 1 : 0);
948 akapila 1081 CBC 615 : }
1082 :
1083 : /*
1084 : * Read the information about the start stream message from output stream.
948 akapila 1085 ECB : */
1086 : TransactionId
948 akapila 1087 CBC 835 : logicalrep_read_stream_start(StringInfo in, bool *first_segment)
948 akapila 1088 ECB : {
1089 : TransactionId xid;
1090 :
948 akapila 1091 GIC 835 : Assert(first_segment);
1092 :
1093 835 : xid = pq_getmsgint(in, 4);
1094 835 : *first_segment = (pq_getmsgbyte(in) == 1);
1095 :
1096 835 : return xid;
948 akapila 1097 ECB : }
1098 :
1099 : /*
1100 : * Write the stop stream message to the output stream.
1101 : */
1102 : void
948 akapila 1103 GIC 615 : logicalrep_write_stream_stop(StringInfo out)
1104 : {
598 1105 615 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_STOP);
948 akapila 1106 CBC 615 : }
1107 :
1108 : /*
948 akapila 1109 ECB : * Write STREAM COMMIT to the output stream.
1110 : */
1111 : void
948 akapila 1112 GIC 45 : logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
948 akapila 1113 ECB : XLogRecPtr commit_lsn)
1114 : {
948 akapila 1115 GIC 45 : uint8 flags = 0;
948 akapila 1116 ECB :
888 akapila 1117 GIC 45 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
1118 :
948 akapila 1119 CBC 45 : Assert(TransactionIdIsValid(txn->xid));
1120 :
1121 : /* transaction ID */
1122 45 : pq_sendint32(out, txn->xid);
948 akapila 1123 ECB :
1124 : /* send the flags field (unused for now) */
948 akapila 1125 CBC 45 : pq_sendbyte(out, flags);
1126 :
1127 : /* send fields */
948 akapila 1128 GIC 45 : pq_sendint64(out, commit_lsn);
1129 45 : pq_sendint64(out, txn->end_lsn);
634 1130 45 : pq_sendint64(out, txn->xact_time.commit_time);
948 akapila 1131 CBC 45 : }
1132 :
1133 : /*
1134 : * Read STREAM COMMIT from the output stream.
1135 : */
948 akapila 1136 ECB : TransactionId
948 akapila 1137 GIC 61 : logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
1138 : {
948 akapila 1139 ECB : TransactionId xid;
1140 : uint8 flags;
1141 :
948 akapila 1142 GBC 61 : xid = pq_getmsgint(in, 4);
1143 :
1144 : /* read flags (unused for now) */
948 akapila 1145 CBC 61 : flags = pq_getmsgbyte(in);
948 akapila 1146 ECB :
948 akapila 1147 CBC 61 : if (flags != 0)
948 akapila 1148 UIC 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
948 akapila 1149 ECB :
1150 : /* read fields */
948 akapila 1151 GIC 61 : commit_data->commit_lsn = pq_getmsgint64(in);
1152 61 : commit_data->end_lsn = pq_getmsgint64(in);
1153 61 : commit_data->committime = pq_getmsgint64(in);
1154 :
1155 61 : return xid;
1156 : }
1157 :
1158 : /*
1159 : * Write STREAM ABORT to the output stream. Note that xid and subxid will be
948 akapila 1160 ECB : * same for the top-level transaction abort.
1161 : *
1162 : * If write_abort_info is true, send the abort_lsn and abort_time fields,
1163 : * otherwise don't.
1164 : */
1165 : void
948 akapila 1166 GIC 26 : logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
1167 : TransactionId subxid, XLogRecPtr abort_lsn,
1168 : TimestampTz abort_time, bool write_abort_info)
1169 : {
888 akapila 1170 CBC 26 : pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
1171 :
948 akapila 1172 GIC 26 : Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
948 akapila 1173 ECB :
1174 : /* transaction ID */
948 akapila 1175 GIC 26 : pq_sendint32(out, xid);
948 akapila 1176 CBC 26 : pq_sendint32(out, subxid);
1177 :
90 akapila 1178 GNC 26 : if (write_abort_info)
1179 : {
1180 12 : pq_sendint64(out, abort_lsn);
1181 12 : pq_sendint64(out, abort_time);
1182 : }
948 akapila 1183 GIC 26 : }
948 akapila 1184 ECB :
1185 : /*
1186 : * Read STREAM ABORT from the output stream.
1187 : *
1188 : * If read_abort_info is true, read the abort_lsn and abort_time fields,
1189 : * otherwise don't.
1190 : */
1191 : void
90 akapila 1192 GNC 38 : logicalrep_read_stream_abort(StringInfo in,
1193 : LogicalRepStreamAbortData *abort_data,
1194 : bool read_abort_info)
1195 : {
1196 38 : Assert(abort_data);
1197 :
1198 38 : abort_data->xid = pq_getmsgint(in, 4);
1199 38 : abort_data->subxid = pq_getmsgint(in, 4);
1200 :
1201 38 : if (read_abort_info)
1202 : {
1203 24 : abort_data->abort_lsn = pq_getmsgint64(in);
1204 24 : abort_data->abort_time = pq_getmsgint64(in);
1205 : }
1206 : else
1207 : {
1208 14 : abort_data->abort_lsn = InvalidXLogRecPtr;
1209 14 : abort_data->abort_time = 0;
1210 : }
948 akapila 1211 CBC 38 : }
1212 :
1213 : /*
1214 : * Get string representing LogicalRepMsgType.
590 akapila 1215 ECB : */
1216 : char *
590 akapila 1217 CBC 338 : logicalrep_message_type(LogicalRepMsgType action)
590 akapila 1218 ECB : {
590 akapila 1219 GIC 338 : switch (action)
590 akapila 1220 ECB : {
590 akapila 1221 GIC 1 : case LOGICAL_REP_MSG_BEGIN:
590 akapila 1222 CBC 1 : return "BEGIN";
1223 1 : case LOGICAL_REP_MSG_COMMIT:
590 akapila 1224 GIC 1 : return "COMMIT";
590 akapila 1225 UIC 0 : case LOGICAL_REP_MSG_ORIGIN:
1226 0 : return "ORIGIN";
590 akapila 1227 CBC 36 : case LOGICAL_REP_MSG_INSERT:
1228 36 : return "INSERT";
590 akapila 1229 GIC 9 : case LOGICAL_REP_MSG_UPDATE:
590 akapila 1230 CBC 9 : return "UPDATE";
590 akapila 1231 GIC 5 : case LOGICAL_REP_MSG_DELETE:
1232 5 : return "DELETE";
590 akapila 1233 UIC 0 : case LOGICAL_REP_MSG_TRUNCATE:
1234 0 : return "TRUNCATE";
590 akapila 1235 GIC 2 : case LOGICAL_REP_MSG_RELATION:
590 akapila 1236 CBC 2 : return "RELATION";
590 akapila 1237 UIC 0 : case LOGICAL_REP_MSG_TYPE:
590 akapila 1238 LBC 0 : return "TYPE";
590 akapila 1239 UIC 0 : case LOGICAL_REP_MSG_MESSAGE:
590 akapila 1240 LBC 0 : return "MESSAGE";
590 akapila 1241 CBC 1 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
1242 1 : return "BEGIN PREPARE";
1243 1 : case LOGICAL_REP_MSG_PREPARE:
590 akapila 1244 GBC 1 : return "PREPARE";
590 akapila 1245 UBC 0 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
590 akapila 1246 LBC 0 : return "COMMIT PREPARED";
1247 0 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
1248 0 : return "ROLLBACK PREPARED";
590 akapila 1249 CBC 13 : case LOGICAL_REP_MSG_STREAM_START:
1250 13 : return "STREAM START";
1251 228 : case LOGICAL_REP_MSG_STREAM_STOP:
590 akapila 1252 GBC 228 : return "STREAM STOP";
1253 20 : case LOGICAL_REP_MSG_STREAM_COMMIT:
590 akapila 1254 CBC 20 : return "STREAM COMMIT";
1255 19 : case LOGICAL_REP_MSG_STREAM_ABORT:
590 akapila 1256 GBC 19 : return "STREAM ABORT";
1257 2 : case LOGICAL_REP_MSG_STREAM_PREPARE:
1258 2 : return "STREAM PREPARE";
590 akapila 1259 EUB : }
590 akapila 1260 ECB :
590 akapila 1261 LBC 0 : elog(ERROR, "invalid logical replication message type \"%c\"", action);
590 akapila 1262 ECB :
1263 : return NULL; /* keep compiler quiet */
590 akapila 1264 EUB : }
|