Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * logicalfuncs.c
4 : *
5 : * Support functions for using logical decoding and management of
6 : * logical replication slots via SQL.
7 : *
8 : *
9 : * Copyright (c) 2012-2023, PostgreSQL Global Development Group
10 : *
11 : * IDENTIFICATION
12 : * src/backend/replication/logical/logicalfuncs.c
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include <unistd.h>
19 :
20 : #include "access/xact.h"
21 : #include "access/xlog_internal.h"
22 : #include "access/xlogrecovery.h"
23 : #include "access/xlogutils.h"
24 : #include "catalog/pg_type.h"
25 : #include "fmgr.h"
26 : #include "funcapi.h"
27 : #include "mb/pg_wchar.h"
28 : #include "miscadmin.h"
29 : #include "nodes/makefuncs.h"
30 : #include "replication/decode.h"
31 : #include "replication/logical.h"
32 : #include "replication/message.h"
33 : #include "storage/fd.h"
34 : #include "utils/array.h"
35 : #include "utils/builtins.h"
36 : #include "utils/inval.h"
37 : #include "utils/lsyscache.h"
38 : #include "utils/memutils.h"
39 : #include "utils/pg_lsn.h"
40 : #include "utils/regproc.h"
41 : #include "utils/resowner.h"
42 :
43 : /* Private data for writing out data */
44 : typedef struct DecodingOutputState
45 : {
46 : Tuplestorestate *tupstore;
47 : TupleDesc tupdesc;
48 : bool binary_output;
49 : int64 returned_rows;
50 : } DecodingOutputState;
51 :
52 : /*
53 : * Prepare for an output plugin write.
54 : */
55 : static void
3324 rhaas 56 CBC 151095 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
57 : bool last_write)
58 : {
59 151095 : resetStringInfo(ctx->out);
60 151095 : }
61 :
62 : /*
63 : * Perform output plugin write into tuplestore.
64 : */
65 : static void
66 151095 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
67 : bool last_write)
68 : {
69 : Datum values[3];
70 : bool nulls[3];
71 : DecodingOutputState *p;
72 :
73 : /* SQL Datums can only be of a limited length... */
74 151095 : if (ctx->out->len > MaxAllocSize - VARHDRSZ)
3324 rhaas 75 UBC 0 : elog(ERROR, "too much output for sql interface");
76 :
3324 rhaas 77 CBC 151095 : p = (DecodingOutputState *) ctx->output_writer_private;
78 :
79 151095 : memset(nulls, 0, sizeof(nulls));
80 151095 : values[0] = LSNGetDatum(lsn);
81 151095 : values[1] = TransactionIdGetDatum(xid);
82 :
83 : /*
84 : * Assert ctx->out is in database encoding when we're writing textual
85 : * output.
86 : */
87 151095 : if (!p->binary_output)
88 151072 : Assert(pg_verify_mbstr(GetDatabaseEncoding(),
89 : ctx->out->data, ctx->out->len,
90 : false));
91 :
92 : /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
1165 alvherre 93 151095 : values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
94 :
3324 rhaas 95 151095 : tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
96 151095 : p->returned_rows++;
97 151095 : }
98 :
99 : /*
100 : * Helper function for the various SQL callable logical decoding functions.
101 : */
102 : static Datum
103 192 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
104 : {
105 : Name name;
106 : XLogRecPtr upto_lsn;
107 : int32 upto_nchanges;
108 192 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
109 : MemoryContext per_query_ctx;
110 : MemoryContext oldcontext;
111 : XLogRecPtr end_of_wal;
112 : LogicalDecodingContext *ctx;
113 192 : ResourceOwner old_resowner = CurrentResourceOwner;
114 : ArrayType *arr;
115 : Size ndim;
116 192 : List *options = NIL;
117 : DecodingOutputState *p;
118 :
572 michael 119 192 : CheckSlotPermissions();
120 :
2647 tgl 121 191 : CheckLogicalDecodingRequirements();
122 :
123 191 : if (PG_ARGISNULL(0))
2647 tgl 124 UBC 0 : ereport(ERROR,
125 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
126 : errmsg("slot name must not be null")));
2647 tgl 127 CBC 191 : name = PG_GETARG_NAME(0);
128 :
3324 rhaas 129 191 : if (PG_ARGISNULL(1))
130 191 : upto_lsn = InvalidXLogRecPtr;
131 : else
3324 rhaas 132 UBC 0 : upto_lsn = PG_GETARG_LSN(1);
133 :
3324 rhaas 134 CBC 191 : if (PG_ARGISNULL(2))
135 191 : upto_nchanges = InvalidXLogRecPtr;
136 : else
3324 rhaas 137 UBC 0 : upto_nchanges = PG_GETARG_INT32(2);
138 :
2647 tgl 139 CBC 191 : if (PG_ARGISNULL(3))
2647 tgl 140 UBC 0 : ereport(ERROR,
141 : (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
142 : errmsg("options array must not be null")));
2647 tgl 143 CBC 191 : arr = PG_GETARG_ARRAYTYPE_P(3);
144 :
145 : /* state to write output to */
3324 rhaas 146 191 : p = palloc0(sizeof(DecodingOutputState));
147 :
148 191 : p->binary_output = binary;
149 :
150 191 : per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
151 191 : oldcontext = MemoryContextSwitchTo(per_query_ctx);
152 :
153 : /* Deconstruct options array */
2647 tgl 154 191 : ndim = ARR_NDIM(arr);
3324 rhaas 155 191 : if (ndim > 1)
156 : {
3324 rhaas 157 UBC 0 : ereport(ERROR,
158 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
159 : errmsg("array must be one-dimensional")));
160 : }
3324 rhaas 161 CBC 191 : else if (array_contains_nulls(arr))
162 : {
3324 rhaas 163 UBC 0 : ereport(ERROR,
164 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
165 : errmsg("array must not contain nulls")));
166 : }
3324 rhaas 167 CBC 191 : else if (ndim == 1)
168 : {
169 : int nelems;
170 : Datum *datum_opts;
171 : int i;
172 :
173 169 : Assert(ARR_ELEMTYPE(arr) == TEXTOID);
174 :
282 peter 175 GNC 169 : deconstruct_array_builtin(arr, TEXTOID, &datum_opts, NULL, &nelems);
3324 rhaas 176 ECB :
3324 rhaas 177 GBC 169 : if (nelems % 2 != 0)
3324 rhaas 178 UIC 0 : ereport(ERROR,
179 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
180 : errmsg("array must have even number of elements")));
3324 rhaas 181 ECB :
3324 rhaas 182 GIC 507 : for (i = 0; i < nelems; i += 2)
3324 rhaas 183 ECB : {
3324 rhaas 184 CBC 338 : char *name = TextDatumGetCString(datum_opts[i]);
3324 rhaas 185 GIC 338 : char *opt = TextDatumGetCString(datum_opts[i + 1]);
3324 rhaas 186 ECB :
2406 peter_e 187 GIC 338 : options = lappend(options, makeDefElem(name, (Node *) makeString(opt), -1));
188 : }
189 : }
3324 rhaas 190 ECB :
173 michael 191 CBC 191 : InitMaterializedSRF(fcinfo, 0);
398 192 191 : p->tupstore = rsinfo->setResult;
398 michael 193 GIC 191 : p->tupdesc = rsinfo->setDesc;
194 :
195 : /*
196 : * Compute the current end-of-wal.
2209 simon 197 ECB : */
2531 alvherre 198 CBC 191 : if (!RecoveryInProgress())
520 rhaas 199 GIC 187 : end_of_wal = GetFlushRecPtr(NULL);
2531 alvherre 200 ECB : else
520 rhaas 201 GIC 4 : end_of_wal = GetXLogReplayRecPtr(NULL);
2531 alvherre 202 ECB :
667 alvherre 203 GIC 191 : ReplicationSlotAcquire(NameStr(*name), true);
3324 rhaas 204 ECB :
3324 rhaas 205 GIC 189 : PG_TRY();
206 : {
2533 alvherre 207 ECB : /* restart at slot's confirmed_flush */
3324 rhaas 208 GIC 373 : ctx = CreateDecodingContext(InvalidXLogRecPtr,
209 : options,
1908 simon 210 ECB : false,
699 tmunro 211 GIC 189 : XL_ROUTINE(.page_read = read_local_xlog_page,
212 : .segment_open = wal_segment_open,
213 : .segment_close = wal_segment_close),
214 : LogicalOutputPrepareWrite,
215 : LogicalOutputWrite, NULL);
3324 rhaas 216 ECB :
3324 rhaas 217 CBC 184 : MemoryContextSwitchTo(oldcontext);
218 :
219 : /*
220 : * Check whether the output plugin writes textual output if that's
221 : * what we need.
222 : */
3324 rhaas 223 GIC 184 : if (!binary &&
2878 bruce 224 CBC 176 : ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
3324 rhaas 225 GIC 1 : ereport(ERROR,
226 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2720 peter_e 227 ECB : errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
228 : NameStr(MyReplicationSlot->data.plugin),
229 : format_procedure(fcinfo->flinfo->fn_oid))));
3324 rhaas 230 :
3324 rhaas 231 GIC 183 : ctx->output_writer_private = p;
232 :
2566 alvherre 233 ECB : /*
234 : * Decoding of WAL must start at restart_lsn so that the entirety of
2533 235 : * xacts that committed after the slot's confirmed_flush can be
236 : * accumulated into reorder buffers.
2566 alvherre 237 EUB : */
1169 heikki.linnakangas 238 GIC 183 : XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
239 :
240 : /* invalidate non-timetravel entries */
3324 rhaas 241 183 : InvalidateSystemCaches();
242 :
2209 simon 243 ECB : /* Decode until we run out of records */
1169 heikki.linnakangas 244 CBC 1642232 : while (ctx->reader->EndRecPtr < end_of_wal)
245 : {
246 : XLogRecord *record;
3324 rhaas 247 1642049 : char *errm = NULL;
3324 rhaas 248 EUB :
699 tmunro 249 GBC 1642049 : record = XLogReadRecord(ctx->reader, &errm);
3324 rhaas 250 CBC 1642049 : if (errm)
515 michael 251 UBC 0 : elog(ERROR, "could not find record for logical decoding: %s", errm);
3324 rhaas 252 EUB :
3324 rhaas 253 ECB : /*
254 : * The {begin_txn,change,commit_txn}_wrapper callbacks above will
255 : * store the description into our tuplestore.
256 : */
3324 rhaas 257 GIC 1642049 : if (record != NULL)
3062 heikki.linnakangas 258 1642049 : LogicalDecodingProcessRecord(ctx, ctx->reader);
259 :
260 : /* check limits */
3324 rhaas 261 CBC 1642049 : if (upto_lsn != InvalidXLogRecPtr &&
3324 rhaas 262 UIC 0 : upto_lsn <= ctx->reader->EndRecPtr)
263 0 : break;
3324 rhaas 264 GIC 1642049 : if (upto_nchanges != 0 &&
3324 rhaas 265 UIC 0 : upto_nchanges <= p->returned_rows)
266 0 : break;
3206 andres 267 CBC 1642049 : CHECK_FOR_INTERRUPTS();
268 : }
2647 tgl 269 ECB :
270 : /*
271 : * Logical decoding could have clobbered CurrentResourceOwner during
272 : * transaction management, so restore the executor's value. (This is
273 : * a kluge, but it's not worth cleaning up right now.)
274 : */
2647 tgl 275 GIC 183 : CurrentResourceOwner = old_resowner;
276 :
277 : /*
278 : * Next time, start where we left off. (Hunting things, the family
279 : * business..)
280 : */
281 183 : if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
282 : {
283 166 : LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
2153 bruce 284 ECB :
285 : /*
286 : * If only the confirmed_flush_lsn has changed the slot won't get
287 : * marked as dirty by the above. Callers on the walsender
288 : * interface are expected to keep track of their own progress and
289 : * don't need it written out. But SQL-interface users cannot
290 : * specify their own start positions and it's harder for them to
291 : * keep track of their progress, so we should make more of an
292 : * effort to save it for them.
2407 simon 293 : *
294 : * Dirty the slot so it's written out at the next checkpoint.
295 : * We'll still lose its position on crash, as documented, but it's
2153 bruce 296 : * better than always losing the position even on clean restart.
297 : */
2407 simon 298 CBC 166 : ReplicationSlotMarkDirty();
299 : }
2647 tgl 300 ECB :
301 : /* free context, call shutdown callback */
2647 tgl 302 CBC 183 : FreeDecodingContext(ctx);
303 :
2647 tgl 304 GIC 183 : ReplicationSlotRelease();
305 183 : InvalidateSystemCaches();
306 : }
3324 rhaas 307 6 : PG_CATCH();
308 : {
3324 rhaas 309 ECB : /* clear all timetravel entries */
3324 rhaas 310 GIC 6 : InvalidateSystemCaches();
3324 rhaas 311 ECB :
3324 rhaas 312 GIC 6 : PG_RE_THROW();
313 : }
314 183 : PG_END_TRY();
315 :
316 183 : return (Datum) 0;
317 : }
3324 rhaas 318 ECB :
319 : /*
320 : * SQL function returning the changestream as text, consuming the data.
321 : */
322 : Datum
3324 rhaas 323 GIC 168 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
324 : {
2647 tgl 325 168 : return pg_logical_slot_get_changes_guts(fcinfo, true, false);
326 : }
3324 rhaas 327 ECB :
328 : /*
329 : * SQL function returning the changestream as text, only peeking ahead.
330 : */
331 : Datum
3324 rhaas 332 GIC 16 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
333 : {
2647 tgl 334 16 : return pg_logical_slot_get_changes_guts(fcinfo, false, false);
335 : }
3324 rhaas 336 ECB :
337 : /*
338 : * SQL function returning the changestream in binary, consuming the data.
339 : */
340 : Datum
3324 rhaas 341 GIC 4 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
342 : {
2647 tgl 343 4 : return pg_logical_slot_get_changes_guts(fcinfo, true, true);
344 : }
345 :
3324 rhaas 346 ECB : /*
347 : * SQL function returning the changestream in binary, only peeking ahead.
348 : */
349 : Datum
3324 rhaas 350 CBC 4 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
351 : {
2647 tgl 352 GIC 4 : return pg_logical_slot_get_changes_guts(fcinfo, false, true);
3324 rhaas 353 ECB : }
354 :
2559 simon 355 :
356 : /*
357 : * SQL function for writing logical decoding message into WAL.
358 : */
359 : Datum
2559 simon 360 GIC 22 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
361 : {
2559 simon 362 CBC 22 : bool transactional = PG_GETARG_BOOL(0);
2559 simon 363 GIC 22 : char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
364 22 : bytea *data = PG_GETARG_BYTEA_PP(2);
365 : XLogRecPtr lsn;
366 :
367 22 : lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
368 : transactional);
369 22 : PG_RETURN_LSN(lsn);
370 : }
371 :
372 : Datum
373 22 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
374 : {
375 : /* bytea and text are compatible */
376 22 : return pg_logical_emit_message_bytea(fcinfo);
377 : }
|