Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * message.c
4 : : * Generic logical messages.
5 : : *
6 : : * Copyright (c) 2013-2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/logical/message.c
10 : : *
11 : : * NOTES
12 : : *
13 : : * Generic logical messages allow XLOG logging of arbitrary binary blobs that
14 : : * get passed to the logical decoding plugin. In normal XLOG processing they
15 : : * are same as NOOP.
16 : : *
17 : : * These messages can be either transactional or non-transactional.
18 : : * Transactional messages are part of current transaction and will be sent to
19 : : * decoding plugin using in a same way as DML operations.
20 : : * Non-transactional messages are sent to the plugin at the time when the
21 : : * logical decoding reads them from XLOG. This also means that transactional
22 : : * messages won't be delivered if the transaction was rolled back but the
23 : : * non-transactional one will always be delivered.
24 : : *
25 : : * Every message carries prefix to avoid conflicts between different decoding
26 : : * plugins. The plugin authors must take extra care to use unique prefix,
27 : : * good options seems to be for example to use the name of the extension.
28 : : *
29 : : * ---------------------------------------------------------------------------
30 : : */
31 : :
32 : : #include "postgres.h"
33 : :
34 : : #include "access/xact.h"
35 : : #include "access/xloginsert.h"
36 : : #include "miscadmin.h"
37 : : #include "replication/message.h"
38 : :
39 : : /*
40 : : * Write logical decoding message into XLog.
41 : : */
42 : : XLogRecPtr
2930 simon@2ndQuadrant.co 43 :CBC 109 : LogLogicalMessage(const char *prefix, const char *message, size_t size,
44 : : bool transactional, bool flush)
45 : : {
46 : : xl_logical_message xlrec;
47 : : XLogRecPtr lsn;
48 : :
49 : : /*
50 : : * Force xid to be allocated if we're emitting a transactional message.
51 : : */
52 [ + + ]: 109 : if (transactional)
53 : : {
54 [ - + ]: 65 : Assert(IsTransactionState());
55 : 65 : GetCurrentTransactionId();
56 : : }
57 : :
2923 andres@anarazel.de 58 : 109 : xlrec.dbId = MyDatabaseId;
2930 simon@2ndQuadrant.co 59 : 109 : xlrec.transactional = transactional;
60 : : /* trailing zero is critical; see logicalmsg_desc */
61 : 109 : xlrec.prefix_size = strlen(prefix) + 1;
62 : 109 : xlrec.message_size = size;
63 : :
64 : 109 : XLogBeginInsert();
65 : 109 : XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
1902 peter@eisentraut.org 66 : 109 : XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
67 : 109 : XLogRegisterData(unconstify(char *, message), size);
68 : :
69 : : /* allow origin filtering */
2670 andres@anarazel.de 70 : 109 : XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
71 : :
179 michael@paquier.xyz 72 :GNC 109 : lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
73 : :
74 : : /*
75 : : * Make sure that the message hits disk before leaving if emitting a
76 : : * non-transactional message when flush is requested.
77 : : */
78 [ + + + + ]: 109 : if (!transactional && flush)
79 : 1 : XLogFlush(lsn);
80 : 109 : return lsn;
81 : : }
82 : :
83 : : /*
84 : : * Redo is basically just noop for logical decoding messages.
85 : : */
86 : : void
2930 simon@2ndQuadrant.co 87 :CBC 84 : logicalmsg_redo(XLogReaderState *record)
88 : : {
89 : 84 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
90 : :
91 [ - + ]: 84 : if (info != XLOG_LOGICAL_MESSAGE)
2866 rhaas@postgresql.org 92 [ # # ]:UBC 0 : elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
93 : :
94 : : /* This is only interesting for logical decoding, see decode.c. */
2930 simon@2ndQuadrant.co 95 :CBC 84 : }
|