Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * logicalfuncs.c
4 : : *
5 : : * Support functions for using logical decoding and management of
6 : : * logical replication slots via SQL.
7 : : *
8 : : *
9 : : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
10 : : *
11 : : * IDENTIFICATION
12 : : * src/backend/replication/logical/logicalfuncs.c
13 : : *-------------------------------------------------------------------------
14 : : */
15 : :
16 : : #include "postgres.h"
17 : :
18 : : #include <unistd.h>
19 : :
20 : : #include "access/xlogrecovery.h"
21 : : #include "access/xlogutils.h"
22 : : #include "catalog/pg_type.h"
23 : : #include "fmgr.h"
24 : : #include "funcapi.h"
25 : : #include "mb/pg_wchar.h"
26 : : #include "miscadmin.h"
27 : : #include "nodes/makefuncs.h"
28 : : #include "replication/decode.h"
29 : : #include "replication/logical.h"
30 : : #include "replication/message.h"
31 : : #include "utils/array.h"
32 : : #include "utils/builtins.h"
33 : : #include "utils/inval.h"
34 : : #include "utils/memutils.h"
35 : : #include "utils/pg_lsn.h"
36 : : #include "utils/regproc.h"
37 : : #include "utils/resowner.h"
38 : :
39 : : /* Private data for writing out data */
40 : : typedef struct DecodingOutputState
41 : : {
42 : : Tuplestorestate *tupstore;
43 : : TupleDesc tupdesc;
44 : : bool binary_output;
45 : : int64 returned_rows;
46 : : } DecodingOutputState;
47 : :
48 : : /*
49 : : * Prepare for an output plugin write.
50 : : */
51 : : static void
3695 rhaas@postgresql.org 52 :CBC 151173 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
53 : : bool last_write)
54 : : {
55 : 151173 : resetStringInfo(ctx->out);
56 : 151173 : }
57 : :
58 : : /*
59 : : * Perform output plugin write into tuplestore.
60 : : */
61 : : static void
62 : 151173 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
63 : : bool last_write)
64 : : {
65 : : Datum values[3];
66 : : bool nulls[3];
67 : : DecodingOutputState *p;
68 : :
69 : : /* SQL Datums can only be of a limited length... */
70 [ - + ]: 151173 : if (ctx->out->len > MaxAllocSize - VARHDRSZ)
3695 rhaas@postgresql.org 71 [ # # ]:UBC 0 : elog(ERROR, "too much output for sql interface");
72 : :
3695 rhaas@postgresql.org 73 :CBC 151173 : p = (DecodingOutputState *) ctx->output_writer_private;
74 : :
75 : 151173 : memset(nulls, 0, sizeof(nulls));
76 : 151173 : values[0] = LSNGetDatum(lsn);
77 : 151173 : values[1] = TransactionIdGetDatum(xid);
78 : :
79 : : /*
80 : : * Assert ctx->out is in database encoding when we're writing textual
81 : : * output.
82 : : */
83 [ + + ]: 151173 : if (!p->binary_output)
84 [ - + ]: 151145 : Assert(pg_verify_mbstr(GetDatabaseEncoding(),
85 : : ctx->out->data, ctx->out->len,
86 : : false));
87 : :
88 : : /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
1536 alvherre@alvh.no-ip. 89 : 151173 : values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
90 : :
3695 rhaas@postgresql.org 91 : 151173 : tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
92 : 151173 : p->returned_rows++;
93 : 151173 : }
94 : :
95 : : /*
96 : : * Helper function for the various SQL callable logical decoding functions.
97 : : */
98 : : static Datum
99 : 197 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
100 : : {
101 : : Name name;
102 : : XLogRecPtr upto_lsn;
103 : : int32 upto_nchanges;
104 : 197 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
105 : : MemoryContext per_query_ctx;
106 : : MemoryContext oldcontext;
107 : : XLogRecPtr end_of_wal;
108 : : XLogRecPtr wait_for_wal_lsn;
109 : : LogicalDecodingContext *ctx;
110 : 197 : ResourceOwner old_resowner = CurrentResourceOwner;
111 : : ArrayType *arr;
112 : : Size ndim;
113 : 197 : List *options = NIL;
114 : : DecodingOutputState *p;
115 : :
943 michael@paquier.xyz 116 : 197 : CheckSlotPermissions();
117 : :
3018 tgl@sss.pgh.pa.us 118 : 196 : CheckLogicalDecodingRequirements();
119 : :
120 [ - + ]: 196 : if (PG_ARGISNULL(0))
3018 tgl@sss.pgh.pa.us 121 [ # # ]:UBC 0 : ereport(ERROR,
122 : : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
123 : : errmsg("slot name must not be null")));
3018 tgl@sss.pgh.pa.us 124 :CBC 196 : name = PG_GETARG_NAME(0);
125 : :
3695 rhaas@postgresql.org 126 [ + - ]: 196 : if (PG_ARGISNULL(1))
127 : 196 : upto_lsn = InvalidXLogRecPtr;
128 : : else
3695 rhaas@postgresql.org 129 :UBC 0 : upto_lsn = PG_GETARG_LSN(1);
130 : :
3695 rhaas@postgresql.org 131 [ + - ]:CBC 196 : if (PG_ARGISNULL(2))
132 : 196 : upto_nchanges = InvalidXLogRecPtr;
133 : : else
3695 rhaas@postgresql.org 134 :UBC 0 : upto_nchanges = PG_GETARG_INT32(2);
135 : :
3018 tgl@sss.pgh.pa.us 136 [ - + ]:CBC 196 : if (PG_ARGISNULL(3))
3018 tgl@sss.pgh.pa.us 137 [ # # ]:UBC 0 : ereport(ERROR,
138 : : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
139 : : errmsg("options array must not be null")));
3018 tgl@sss.pgh.pa.us 140 :CBC 196 : arr = PG_GETARG_ARRAYTYPE_P(3);
141 : :
142 : : /* state to write output to */
3695 rhaas@postgresql.org 143 : 196 : p = palloc0(sizeof(DecodingOutputState));
144 : :
145 : 196 : p->binary_output = binary;
146 : :
147 : 196 : per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
148 : 196 : oldcontext = MemoryContextSwitchTo(per_query_ctx);
149 : :
150 : : /* Deconstruct options array */
3018 tgl@sss.pgh.pa.us 151 : 196 : ndim = ARR_NDIM(arr);
3695 rhaas@postgresql.org 152 [ - + ]: 196 : if (ndim > 1)
153 : : {
3695 rhaas@postgresql.org 154 [ # # ]:UBC 0 : ereport(ERROR,
155 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
156 : : errmsg("array must be one-dimensional")));
157 : : }
3695 rhaas@postgresql.org 158 [ - + ]:CBC 196 : else if (array_contains_nulls(arr))
159 : : {
3695 rhaas@postgresql.org 160 [ # # ]:UBC 0 : ereport(ERROR,
161 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
162 : : errmsg("array must not contain nulls")));
163 : : }
3695 rhaas@postgresql.org 164 [ + + ]:CBC 196 : else if (ndim == 1)
165 : : {
166 : : int nelems;
167 : : Datum *datum_opts;
168 : : int i;
169 : :
170 [ - + ]: 171 : Assert(ARR_ELEMTYPE(arr) == TEXTOID);
171 : :
653 peter@eisentraut.org 172 : 171 : deconstruct_array_builtin(arr, TEXTOID, &datum_opts, NULL, &nelems);
173 : :
3695 rhaas@postgresql.org 174 [ - + ]: 171 : if (nelems % 2 != 0)
3695 rhaas@postgresql.org 175 [ # # ]:UBC 0 : ereport(ERROR,
176 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
177 : : errmsg("array must have even number of elements")));
178 : :
3695 rhaas@postgresql.org 179 [ + + ]:CBC 515 : for (i = 0; i < nelems; i += 2)
180 : : {
227 michael@paquier.xyz 181 :GNC 344 : char *optname = TextDatumGetCString(datum_opts[i]);
3695 rhaas@postgresql.org 182 :CBC 344 : char *opt = TextDatumGetCString(datum_opts[i + 1]);
183 : :
227 michael@paquier.xyz 184 :GNC 344 : options = lappend(options, makeDefElem(optname, (Node *) makeString(opt), -1));
185 : : }
186 : : }
187 : :
544 michael@paquier.xyz 188 :CBC 196 : InitMaterializedSRF(fcinfo, 0);
769 189 : 196 : p->tupstore = rsinfo->setResult;
190 : 196 : p->tupdesc = rsinfo->setDesc;
191 : :
192 : : /*
193 : : * Compute the current end-of-wal.
194 : : */
2902 alvherre@alvh.no-ip. 195 [ + + ]: 196 : if (!RecoveryInProgress())
891 rhaas@postgresql.org 196 : 190 : end_of_wal = GetFlushRecPtr(NULL);
197 : : else
198 : 6 : end_of_wal = GetXLogReplayRecPtr(NULL);
199 : :
1038 alvherre@alvh.no-ip. 200 : 196 : ReplicationSlotAcquire(NameStr(*name), true);
201 : :
3695 rhaas@postgresql.org 202 [ + + ]: 195 : PG_TRY();
203 : : {
204 : : /* restart at slot's confirmed_flush */
205 : 383 : ctx = CreateDecodingContext(InvalidXLogRecPtr,
206 : : options,
207 : : false,
1070 tmunro@postgresql.or 208 : 195 : XL_ROUTINE(.page_read = read_local_xlog_page,
209 : : .segment_open = wal_segment_open,
210 : : .segment_close = wal_segment_close),
211 : : LogicalOutputPrepareWrite,
212 : : LogicalOutputWrite, NULL);
213 : :
3695 rhaas@postgresql.org 214 : 188 : MemoryContextSwitchTo(oldcontext);
215 : :
216 : : /*
217 : : * Check whether the output plugin writes textual output if that's
218 : : * what we need.
219 : : */
220 [ + + ]: 188 : if (!binary &&
3249 bruce@momjian.us 221 [ + + ]: 178 : ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
3695 rhaas@postgresql.org 222 [ + - ]: 1 : ereport(ERROR,
223 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
224 : : errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
225 : : NameStr(MyReplicationSlot->data.plugin),
226 : : format_procedure(fcinfo->flinfo->fn_oid))));
227 : :
228 : : /*
229 : : * Wait for specified streaming replication standby servers (if any)
230 : : * to confirm receipt of WAL up to wait_for_wal_lsn.
231 : : */
37 akapila@postgresql.o 232 [ + - ]:GNC 187 : if (XLogRecPtrIsInvalid(upto_lsn))
233 : 187 : wait_for_wal_lsn = end_of_wal;
234 : : else
37 akapila@postgresql.o 235 :UNC 0 : wait_for_wal_lsn = Min(upto_lsn, end_of_wal);
236 : :
37 akapila@postgresql.o 237 :GNC 187 : WaitForStandbyConfirmation(wait_for_wal_lsn);
238 : :
3695 rhaas@postgresql.org 239 :CBC 187 : ctx->output_writer_private = p;
240 : :
241 : : /*
242 : : * Decoding of WAL must start at restart_lsn so that the entirety of
243 : : * xacts that committed after the slot's confirmed_flush can be
244 : : * accumulated into reorder buffers.
245 : : */
1540 heikki.linnakangas@i 246 : 187 : XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
247 : :
248 : : /* invalidate non-timetravel entries */
3695 rhaas@postgresql.org 249 : 187 : InvalidateSystemCaches();
250 : :
251 : : /* Decode until we run out of records */
1540 heikki.linnakangas@i 252 [ + + ]: 1642942 : while (ctx->reader->EndRecPtr < end_of_wal)
253 : : {
254 : : XLogRecord *record;
3695 rhaas@postgresql.org 255 : 1642755 : char *errm = NULL;
256 : :
1070 tmunro@postgresql.or 257 : 1642755 : record = XLogReadRecord(ctx->reader, &errm);
3695 rhaas@postgresql.org 258 [ - + ]: 1642755 : if (errm)
886 michael@paquier.xyz 259 [ # # ]:UBC 0 : elog(ERROR, "could not find record for logical decoding: %s", errm);
260 : :
261 : : /*
262 : : * The {begin_txn,change,commit_txn}_wrapper callbacks above will
263 : : * store the description into our tuplestore.
264 : : */
3695 rhaas@postgresql.org 265 [ + - ]:CBC 1642755 : if (record != NULL)
3433 heikki.linnakangas@i 266 : 1642755 : LogicalDecodingProcessRecord(ctx, ctx->reader);
267 : :
268 : : /* check limits */
3695 rhaas@postgresql.org 269 [ - + ]: 1642755 : if (upto_lsn != InvalidXLogRecPtr &&
3695 rhaas@postgresql.org 270 [ # # ]:UBC 0 : upto_lsn <= ctx->reader->EndRecPtr)
271 : 0 : break;
3695 rhaas@postgresql.org 272 [ - + ]:CBC 1642755 : if (upto_nchanges != 0 &&
3695 rhaas@postgresql.org 273 [ # # ]:UBC 0 : upto_nchanges <= p->returned_rows)
274 : 0 : break;
3577 andres@anarazel.de 275 [ - + ]:CBC 1642755 : CHECK_FOR_INTERRUPTS();
276 : : }
277 : :
278 : : /*
279 : : * Logical decoding could have clobbered CurrentResourceOwner during
280 : : * transaction management, so restore the executor's value. (This is
281 : : * a kluge, but it's not worth cleaning up right now.)
282 : : */
3018 tgl@sss.pgh.pa.us 283 : 187 : CurrentResourceOwner = old_resowner;
284 : :
285 : : /*
286 : : * Next time, start where we left off. (Hunting things, the family
287 : : * business..)
288 : : */
289 [ + - + + ]: 187 : if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
290 : : {
291 : 168 : LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
292 : :
293 : : /*
294 : : * If only the confirmed_flush_lsn has changed the slot won't get
295 : : * marked as dirty by the above. Callers on the walsender
296 : : * interface are expected to keep track of their own progress and
297 : : * don't need it written out. But SQL-interface users cannot
298 : : * specify their own start positions and it's harder for them to
299 : : * keep track of their progress, so we should make more of an
300 : : * effort to save it for them.
301 : : *
302 : : * Dirty the slot so it's written out at the next checkpoint.
303 : : * We'll still lose its position on crash, as documented, but it's
304 : : * better than always losing the position even on clean restart.
305 : : */
2778 simon@2ndQuadrant.co 306 : 168 : ReplicationSlotMarkDirty();
307 : : }
308 : :
309 : : /* free context, call shutdown callback */
3018 tgl@sss.pgh.pa.us 310 : 187 : FreeDecodingContext(ctx);
311 : :
312 : 187 : ReplicationSlotRelease();
313 : 187 : InvalidateSystemCaches();
314 : : }
3695 rhaas@postgresql.org 315 : 8 : PG_CATCH();
316 : : {
317 : : /* clear all timetravel entries */
318 : 8 : InvalidateSystemCaches();
319 : :
320 : 8 : PG_RE_THROW();
321 : : }
322 [ - + ]: 187 : PG_END_TRY();
323 : :
324 : 187 : return (Datum) 0;
325 : : }
326 : :
327 : : /*
328 : : * SQL function returning the changestream as text, consuming the data.
329 : : */
330 : : Datum
331 : 171 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
332 : : {
3018 tgl@sss.pgh.pa.us 333 : 171 : return pg_logical_slot_get_changes_guts(fcinfo, true, false);
334 : : }
335 : :
336 : : /*
337 : : * SQL function returning the changestream as text, only peeking ahead.
338 : : */
339 : : Datum
3695 rhaas@postgresql.org 340 : 16 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
341 : : {
3018 tgl@sss.pgh.pa.us 342 : 16 : return pg_logical_slot_get_changes_guts(fcinfo, false, false);
343 : : }
344 : :
345 : : /*
346 : : * SQL function returning the changestream in binary, consuming the data.
347 : : */
348 : : Datum
3695 rhaas@postgresql.org 349 : 4 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
350 : : {
3018 tgl@sss.pgh.pa.us 351 : 4 : return pg_logical_slot_get_changes_guts(fcinfo, true, true);
352 : : }
353 : :
354 : : /*
355 : : * SQL function returning the changestream in binary, only peeking ahead.
356 : : */
357 : : Datum
3695 rhaas@postgresql.org 358 : 6 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
359 : : {
3018 tgl@sss.pgh.pa.us 360 : 6 : return pg_logical_slot_get_changes_guts(fcinfo, false, true);
361 : : }
362 : :
363 : :
364 : : /*
365 : : * SQL function for writing logical decoding message into WAL.
366 : : */
367 : : Datum
2930 simon@2ndQuadrant.co 368 : 109 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
369 : : {
370 : 109 : bool transactional = PG_GETARG_BOOL(0);
371 : 109 : char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
372 : 109 : bytea *data = PG_GETARG_BYTEA_PP(2);
179 michael@paquier.xyz 373 :GNC 109 : bool flush = PG_GETARG_BOOL(3);
374 : : XLogRecPtr lsn;
375 : :
2930 simon@2ndQuadrant.co 376 [ - + - - :CBC 109 : lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
- - - - -
+ - + ]
377 : : transactional, flush);
378 : 109 : PG_RETURN_LSN(lsn);
379 : : }
380 : :
381 : : Datum
382 : 109 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
383 : : {
384 : : /* bytea and text are compatible */
385 : 109 : return pg_logical_emit_message_bytea(fcinfo);
386 : : }
|