Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * message.c
4 : * Generic logical messages.
5 : *
6 : * Copyright (c) 2013-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/message.c
10 : *
11 : * NOTES
12 : *
13 : * Generic logical messages allow XLOG logging of arbitrary binary blobs that
14 : * get passed to the logical decoding plugin. In normal XLOG processing they
15 : * are the same as NOOP.
16 : *
17 : * These messages can be either transactional or non-transactional.
18 : * Transactional messages are part of current transaction and will be sent to
19 : * decoding plugin in the same way as DML operations.
20 : * Non-transactional messages are sent to the plugin at the time when the
21 : * logical decoding reads them from XLOG. This also means that transactional
22 : * messages won't be delivered if the transaction was rolled back but the
23 : * non-transactional one will always be delivered.
24 : *
25 : * Every message carries a prefix to avoid conflicts between different decoding
26 : * plugins. Plugin authors must take extra care to use a unique prefix;
27 : * a good option seems to be, for example, to use the name of the extension.
28 : *
29 : * ---------------------------------------------------------------------------
30 : */
31 :
32 : #include "postgres.h"
33 :
34 : #include "access/xact.h"
35 : #include "access/xloginsert.h"
36 : #include "miscadmin.h"
37 : #include "nodes/execnodes.h"
38 : #include "replication/logical.h"
39 : #include "replication/message.h"
40 : #include "utils/memutils.h"
41 :
42 : /*
43 : * Write logical decoding message into XLog.
44 : */
45 : XLogRecPtr
2559 simon 46 CBC 22 : LogLogicalMessage(const char *prefix, const char *message, size_t size,
47 : bool transactional)
48 : {
49 : xl_logical_message xlrec;
50 :
51 : /*
52 : * Force xid to be allocated if we're emitting a transactional message.
53 : */
54 22 : if (transactional)
55 : {
56 14 : Assert(IsTransactionState());
57 14 : GetCurrentTransactionId();
58 : }
59 :
2552 andres 60 22 : xlrec.dbId = MyDatabaseId;
2559 simon 61 22 : xlrec.transactional = transactional;
62 : /* trailing zero is critical; see logicalmsg_desc */
63 22 : xlrec.prefix_size = strlen(prefix) + 1;
64 22 : xlrec.message_size = size;
65 :
66 22 : XLogBeginInsert();
67 22 : XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
1531 peter 68 22 : XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
69 22 : XLogRegisterData(unconstify(char *, message), size);
70 :
71 : /* allow origin filtering */
2299 andres 72 22 : XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
73 :
2559 simon 74 22 : return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
75 : }
76 :
77 : /*
78 : * Redo is basically just noop for logical decoding messages.
79 : */
80 : void
81 1 : logicalmsg_redo(XLogReaderState *record)
82 : {
83 1 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
84 :
85 1 : if (info != XLOG_LOGICAL_MESSAGE)
2495 rhaas 86 UBC 0 : elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
87 :
88 : /* This is only interesting for logical decoding, see decode.c. */
2559 simon 89 CBC 1 : }
|