TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * subscriptioncmds.c
4 : * subscription catalog manipulation functions
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/subscriptioncmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/dependency.h"
22 : #include "catalog/indexing.h"
23 : #include "catalog/namespace.h"
24 : #include "catalog/objectaccess.h"
25 : #include "catalog/objectaddress.h"
26 : #include "catalog/pg_authid_d.h"
27 : #include "catalog/pg_database_d.h"
28 : #include "catalog/pg_subscription.h"
29 : #include "catalog/pg_subscription_rel.h"
30 : #include "catalog/pg_type.h"
31 : #include "commands/dbcommands.h"
32 : #include "commands/defrem.h"
33 : #include "commands/event_trigger.h"
34 : #include "commands/subscriptioncmds.h"
35 : #include "executor/executor.h"
36 : #include "miscadmin.h"
37 : #include "nodes/makefuncs.h"
38 : #include "pgstat.h"
39 : #include "replication/logicallauncher.h"
40 : #include "replication/logicalworker.h"
41 : #include "replication/origin.h"
42 : #include "replication/slot.h"
43 : #include "replication/walreceiver.h"
44 : #include "replication/walsender.h"
45 : #include "replication/worker_internal.h"
46 : #include "storage/lmgr.h"
47 : #include "utils/acl.h"
48 : #include "utils/builtins.h"
49 : #include "utils/guc.h"
50 : #include "utils/lsyscache.h"
51 : #include "utils/memutils.h"
52 : #include "utils/pg_lsn.h"
53 : #include "utils/syscache.h"
54 :
/*
 * Bit flags naming each option a user may give in a CREATE/ALTER
 * SUBSCRIPTION command.  Every flag owns a distinct bit, so any set of
 * options can be carried around as a single bitmask.
 */
#define SUBOPT_CONNECT				(1 << 0)
#define SUBOPT_ENABLED				(1 << 1)
#define SUBOPT_CREATE_SLOT			(1 << 2)
#define SUBOPT_SLOT_NAME			(1 << 3)
#define SUBOPT_COPY_DATA			(1 << 4)
#define SUBOPT_SYNCHRONOUS_COMMIT	(1 << 5)
#define SUBOPT_REFRESH				(1 << 6)
#define SUBOPT_BINARY				(1 << 7)
#define SUBOPT_STREAMING			(1 << 8)
#define SUBOPT_TWOPHASE_COMMIT		(1 << 9)
#define SUBOPT_DISABLE_ON_ERR		(1 << 10)
#define SUBOPT_PASSWORD_REQUIRED	(1 << 11)
#define SUBOPT_RUN_AS_OWNER			(1 << 12)
#define SUBOPT_LSN					(1 << 13)
#define SUBOPT_ORIGIN				(1 << 14)

/*
 * True iff every bit of 'bits' is also set in 'val'.  Note that 'bits' is
 * evaluated twice, so it must be free of side effects.
 */
#define IsSet(val, bits)  (((val) & (bits)) == (bits))
77 :
78 : /*
79 : * Structure to hold a bitmap representing the user-provided CREATE/ALTER
80 : * SUBSCRIPTION command options and the parsed/default values of each of them.
81 : */
82 : typedef struct SubOpts
83 : {
84 : bits32 specified_opts;
85 : char *slot_name;
86 : char *synchronous_commit;
87 : bool connect;
88 : bool enabled;
89 : bool create_slot;
90 : bool copy_data;
91 : bool refresh;
92 : bool binary;
93 : char streaming;
94 : bool twophase;
95 : bool disableonerr;
96 : bool passwordrequired;
97 : bool runasowner;
98 : char *origin;
99 : XLogRecPtr lsn;
100 : } SubOpts;
101 :
102 : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
103 : static void check_publications_origin(WalReceiverConn *wrconn,
104 : List *publications, bool copydata,
105 : char *origin, Oid *subrel_local_oids,
106 : int subrel_count, char *subname);
107 : static void check_duplicates_in_publist(List *publist, Datum *datums);
108 : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
109 : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
110 :
111 :
112 : /*
113 : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
114 : *
115 : * Since not all options can be specified in both commands, this function
116 : * will report an error if mutually exclusive options are specified.
117 : */
118 : static void
119 GIC 362 : parse_subscription_options(ParseState *pstate, List *stmt_options,
120 : bits32 supported_opts, SubOpts *opts)
121 : {
122 : ListCell *lc;
123 :
124 : /* Start out with cleared opts. */
125 362 : memset(opts, 0, sizeof(SubOpts));
126 :
127 : /* caller must expect some option */
128 362 : Assert(supported_opts != 0);
129 :
130 : /* If connect option is supported, these others also need to be. */
131 362 : Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
132 : IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
133 ECB : SUBOPT_COPY_DATA));
134 :
135 : /* Set default values for the supported options. */
136 GIC 362 : if (IsSet(supported_opts, SUBOPT_CONNECT))
137 177 : opts->connect = true;
138 362 : if (IsSet(supported_opts, SUBOPT_ENABLED))
139 CBC 198 : opts->enabled = true;
140 GIC 362 : if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
141 177 : opts->create_slot = true;
142 CBC 362 : if (IsSet(supported_opts, SUBOPT_COPY_DATA))
143 GIC 251 : opts->copy_data = true;
144 362 : if (IsSet(supported_opts, SUBOPT_REFRESH))
145 CBC 51 : opts->refresh = true;
146 GIC 362 : if (IsSet(supported_opts, SUBOPT_BINARY))
147 255 : opts->binary = false;
148 362 : if (IsSet(supported_opts, SUBOPT_STREAMING))
149 GNC 255 : opts->streaming = LOGICALREP_STREAM_OFF;
150 CBC 362 : if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
151 177 : opts->twophase = false;
152 362 : if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
153 255 : opts->disableonerr = false;
154 GNC 362 : if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
155 255 : opts->passwordrequired = true;
156 362 : if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
157 255 : opts->runasowner = false;
158 362 : if (IsSet(supported_opts, SUBOPT_ORIGIN))
159 255 : opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
160 ECB :
161 : /* Parse options */
162 CBC 693 : foreach(lc, stmt_options)
163 ECB : {
164 CBC 361 : DefElem *defel = (DefElem *) lfirst(lc);
165 ECB :
166 CBC 361 : if (IsSet(supported_opts, SUBOPT_CONNECT) &&
167 213 : strcmp(defel->defname, "connect") == 0)
168 ECB : {
169 CBC 79 : if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
170 LBC 0 : errorConflictingDefElem(defel, pstate);
171 ECB :
172 CBC 79 : opts->specified_opts |= SUBOPT_CONNECT;
173 79 : opts->connect = defGetBoolean(defel);
174 ECB : }
175 CBC 282 : else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
176 155 : strcmp(defel->defname, "enabled") == 0)
177 ECB : {
178 CBC 33 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
179 LBC 0 : errorConflictingDefElem(defel, pstate);
180 :
181 GIC 33 : opts->specified_opts |= SUBOPT_ENABLED;
182 CBC 33 : opts->enabled = defGetBoolean(defel);
183 : }
184 249 : else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
185 GIC 122 : strcmp(defel->defname, "create_slot") == 0)
186 ECB : {
187 CBC 16 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
188 UIC 0 : errorConflictingDefElem(defel, pstate);
189 ECB :
190 GBC 16 : opts->specified_opts |= SUBOPT_CREATE_SLOT;
191 GIC 16 : opts->create_slot = defGetBoolean(defel);
192 ECB : }
193 CBC 233 : else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
194 GIC 185 : strcmp(defel->defname, "slot_name") == 0)
195 ECB : {
196 CBC 62 : if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
197 UIC 0 : errorConflictingDefElem(defel, pstate);
198 ECB :
199 GBC 62 : opts->specified_opts |= SUBOPT_SLOT_NAME;
200 GIC 62 : opts->slot_name = defGetString(defel);
201 ECB :
202 : /* Setting slot_name = NONE is treated as no slot name. */
203 GIC 62 : if (strcmp(opts->slot_name, "none") == 0)
204 CBC 53 : opts->slot_name = NULL;
205 ECB : else
206 GIC 9 : ReplicationSlotValidateName(opts->slot_name, ERROR);
207 ECB : }
208 GBC 171 : else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
209 GIC 111 : strcmp(defel->defname, "copy_data") == 0)
210 ECB : {
211 CBC 15 : if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
212 UIC 0 : errorConflictingDefElem(defel, pstate);
213 ECB :
214 CBC 15 : opts->specified_opts |= SUBOPT_COPY_DATA;
215 GIC 15 : opts->copy_data = defGetBoolean(defel);
216 ECB : }
217 GBC 156 : else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
218 GIC 111 : strcmp(defel->defname, "synchronous_commit") == 0)
219 ECB : {
220 CBC 6 : if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
221 UIC 0 : errorConflictingDefElem(defel, pstate);
222 :
223 CBC 6 : opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
224 6 : opts->synchronous_commit = defGetString(defel);
225 :
226 ECB : /* Test if the given value is valid for synchronous_commit GUC. */
227 GIC 6 : (void) set_config_option("synchronous_commit", opts->synchronous_commit,
228 ECB : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
229 : false, 0, false);
230 : }
231 CBC 150 : else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
232 GBC 33 : strcmp(defel->defname, "refresh") == 0)
233 : {
234 CBC 33 : if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
235 LBC 0 : errorConflictingDefElem(defel, pstate);
236 :
237 CBC 33 : opts->specified_opts |= SUBOPT_REFRESH;
238 33 : opts->refresh = defGetBoolean(defel);
239 : }
240 117 : else if (IsSet(supported_opts, SUBOPT_BINARY) &&
241 GBC 105 : strcmp(defel->defname, "binary") == 0)
242 : {
243 CBC 16 : if (IsSet(opts->specified_opts, SUBOPT_BINARY))
244 LBC 0 : errorConflictingDefElem(defel, pstate);
245 :
246 GIC 16 : opts->specified_opts |= SUBOPT_BINARY;
247 CBC 16 : opts->binary = defGetBoolean(defel);
248 : }
249 GIC 101 : else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
250 89 : strcmp(defel->defname, "streaming") == 0)
251 ECB : {
252 CBC 31 : if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
253 UIC 0 : errorConflictingDefElem(defel, pstate);
254 ECB :
255 GBC 31 : opts->specified_opts |= SUBOPT_STREAMING;
256 GNC 31 : opts->streaming = defGetStreamingMode(defel);
257 ECB : }
258 CBC 70 : else if (strcmp(defel->defname, "two_phase") == 0)
259 : {
260 ECB : /*
261 : * Do not allow toggling of two_phase option. Doing so could cause
262 : * missing of transactions and lead to an inconsistent replica.
263 : * See comments atop worker.c
264 EUB : *
265 : * Note: Unsupported twophase indicates that this call originated
266 ECB : * from AlterSubscription.
267 : */
268 GIC 18 : if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
269 CBC 3 : ereport(ERROR,
270 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
271 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
272 :
273 GBC 15 : if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
274 UIC 0 : errorConflictingDefElem(defel, pstate);
275 ECB :
276 CBC 15 : opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
277 GIC 15 : opts->twophase = defGetBoolean(defel);
278 ECB : }
279 GIC 52 : else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
280 40 : strcmp(defel->defname, "disable_on_error") == 0)
281 : {
282 10 : if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
283 UIC 0 : errorConflictingDefElem(defel, pstate);
284 :
285 GIC 10 : opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
286 10 : opts->disableonerr = defGetBoolean(defel);
287 : }
288 GNC 42 : else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
289 30 : strcmp(defel->defname, "password_required") == 0)
290 : {
291 11 : if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
292 UNC 0 : errorConflictingDefElem(defel, pstate);
293 :
294 GNC 11 : opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
295 11 : opts->passwordrequired = defGetBoolean(defel);
296 : }
297 31 : else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
298 19 : strcmp(defel->defname, "run_as_owner") == 0)
299 : {
300 1 : if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
301 UNC 0 : errorConflictingDefElem(defel, pstate);
302 :
303 GNC 1 : opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
304 1 : opts->runasowner = defGetBoolean(defel);
305 : }
306 30 : else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
307 18 : strcmp(defel->defname, "origin") == 0)
308 : {
309 15 : if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
310 UNC 0 : errorConflictingDefElem(defel, pstate);
311 :
312 GNC 15 : opts->specified_opts |= SUBOPT_ORIGIN;
313 15 : pfree(opts->origin);
314 :
315 : /*
316 : * Even though the "origin" parameter allows only "none" and "any"
317 : * values, it is implemented as a string type so that the
318 : * parameter can be extended in future versions to support
319 : * filtering using origin names specified by the user.
320 : */
321 15 : opts->origin = defGetString(defel);
322 :
323 22 : if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
324 7 : (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
325 3 : ereport(ERROR,
326 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
327 : errmsg("unrecognized origin value: \"%s\"", opts->origin));
328 : }
329 CBC 15 : else if (IsSet(supported_opts, SUBOPT_LSN) &&
330 12 : strcmp(defel->defname, "lsn") == 0)
331 GIC 9 : {
332 12 : char *lsn_str = defGetString(defel);
333 : XLogRecPtr lsn;
334 ECB :
335 GBC 12 : if (IsSet(opts->specified_opts, SUBOPT_LSN))
336 UIC 0 : errorConflictingDefElem(defel, pstate);
337 ECB :
338 : /* Setting lsn = NONE is treated as resetting LSN */
339 GIC 12 : if (strcmp(lsn_str, "none") == 0)
340 CBC 3 : lsn = InvalidXLogRecPtr;
341 ECB : else
342 : {
343 : /* Parse the argument as LSN */
344 GBC 9 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
345 : CStringGetDatum(lsn_str)));
346 ECB :
347 CBC 9 : if (XLogRecPtrIsInvalid(lsn))
348 GIC 3 : ereport(ERROR,
349 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
350 : errmsg("invalid WAL location (LSN): %s", lsn_str)));
351 : }
352 :
353 GBC 9 : opts->specified_opts |= SUBOPT_LSN;
354 GIC 9 : opts->lsn = lsn;
355 ECB : }
356 : else
357 GIC 3 : ereport(ERROR,
358 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
359 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
360 : }
361 :
362 EUB : /*
363 : * We've been explicitly asked to not connect, that requires some
364 ECB : * additional processing.
365 : */
366 GIC 332 : if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
367 ECB : {
368 : /* Check for incompatible options from the user. */
369 GIC 64 : if (opts->enabled &&
370 CBC 64 : IsSet(opts->specified_opts, SUBOPT_ENABLED))
371 GBC 3 : ereport(ERROR,
372 : (errcode(ERRCODE_SYNTAX_ERROR),
373 ECB : /*- translator: both %s are strings of the form "option = value" */
374 : errmsg("%s and %s are mutually exclusive options",
375 : "connect = false", "enabled = true")));
376 :
377 GIC 61 : if (opts->create_slot &&
378 58 : IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
379 3 : ereport(ERROR,
380 : (errcode(ERRCODE_SYNTAX_ERROR),
381 : errmsg("%s and %s are mutually exclusive options",
382 ECB : "connect = false", "create_slot = true")));
383 :
384 CBC 58 : if (opts->copy_data &&
385 55 : IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
386 3 : ereport(ERROR,
387 : (errcode(ERRCODE_SYNTAX_ERROR),
388 : errmsg("%s and %s are mutually exclusive options",
389 : "connect = false", "copy_data = true")));
390 ECB :
391 : /* Change the defaults of other options. */
392 CBC 55 : opts->enabled = false;
393 55 : opts->create_slot = false;
394 GIC 55 : opts->copy_data = false;
395 : }
396 ECB :
397 EUB : /*
398 : * Do additional checking for disallowed combination when slot_name = NONE
399 : * was used.
400 ECB : */
401 CBC 323 : if (!opts->slot_name &&
402 GIC 317 : IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
403 : {
404 50 : if (opts->enabled)
405 ECB : {
406 GIC 9 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
407 3 : ereport(ERROR,
408 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
409 : /*- translator: both %s are strings of the form "option = value" */
410 : errmsg("%s and %s are mutually exclusive options",
411 : "slot_name = NONE", "enabled = true")));
412 : else
413 GIC 6 : ereport(ERROR,
414 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
415 : /*- translator: both %s are strings of the form "option = value" */
416 : errmsg("subscription with %s must also set %s",
417 : "slot_name = NONE", "enabled = false")));
418 : }
419 :
420 GIC 41 : if (opts->create_slot)
421 : {
422 6 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
423 3 : ereport(ERROR,
424 : (errcode(ERRCODE_SYNTAX_ERROR),
425 : /*- translator: both %s are strings of the form "option = value" */
426 : errmsg("%s and %s are mutually exclusive options",
427 ECB : "slot_name = NONE", "create_slot = true")));
428 : else
429 GIC 3 : ereport(ERROR,
430 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
431 : /*- translator: both %s are strings of the form "option = value" */
432 : errmsg("subscription with %s must also set %s",
433 : "slot_name = NONE", "create_slot = false")));
434 : }
435 : }
436 GIC 308 : }
437 :
438 ECB : /*
439 : * Add publication names from the list to a string.
440 : */
441 : static void
442 GIC 208 : get_publications_str(List *publications, StringInfo dest, bool quote_literal)
443 : {
444 : ListCell *lc;
445 CBC 208 : bool first = true;
446 ECB :
447 GNC 208 : Assert(publications != NIL);
448 :
449 GIC 495 : foreach(lc, publications)
450 : {
451 287 : char *pubname = strVal(lfirst(lc));
452 :
453 CBC 287 : if (first)
454 208 : first = false;
455 ECB : else
456 GIC 79 : appendStringInfoString(dest, ", ");
457 :
458 287 : if (quote_literal)
459 281 : appendStringInfoString(dest, quote_literal_cstr(pubname));
460 : else
461 : {
462 CBC 6 : appendStringInfoChar(dest, '"');
463 6 : appendStringInfoString(dest, pubname);
464 GIC 6 : appendStringInfoChar(dest, '"');
465 ECB : }
466 : }
467 CBC 208 : }
468 ECB :
469 : /*
470 : * Check that the specified publications are present on the publisher.
471 : */
472 : static void
473 GIC 88 : check_publications(WalReceiverConn *wrconn, List *publications)
474 ECB : {
475 : WalRcvExecResult *res;
476 : StringInfo cmd;
477 : TupleTableSlot *slot;
478 GIC 88 : List *publicationsCopy = NIL;
479 88 : Oid tableRow[1] = {TEXTOID};
480 :
481 CBC 88 : cmd = makeStringInfo();
482 GIC 88 : appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
483 ECB : " pg_catalog.pg_publication t WHERE\n"
484 : " t.pubname IN (");
485 GIC 88 : get_publications_str(publications, cmd, true);
486 88 : appendStringInfoChar(cmd, ')');
487 :
488 88 : res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
489 88 : pfree(cmd->data);
490 CBC 88 : pfree(cmd);
491 :
492 GIC 88 : if (res->status != WALRCV_OK_TUPLES)
493 UIC 0 : ereport(ERROR,
494 : errmsg("could not receive list of publications from the publisher: %s",
495 : res->err));
496 :
497 CBC 88 : publicationsCopy = list_copy(publications);
498 :
499 : /* Process publication(s). */
500 GIC 88 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
501 202 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
502 : {
503 ECB : char *pubname;
504 : bool isnull;
505 :
506 CBC 114 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
507 GIC 114 : Assert(!isnull);
508 ECB :
509 : /* Delete the publication present in publisher from the list. */
510 CBC 114 : publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
511 GIC 114 : ExecClearTuple(slot);
512 ECB : }
513 :
514 CBC 88 : ExecDropSingleTupleTableSlot(slot);
515 ECB :
516 GIC 88 : walrcv_clear_result(res);
517 ECB :
518 GIC 88 : if (list_length(publicationsCopy))
519 ECB : {
520 : /* Prepare the list of non-existent publication(s) for error message. */
521 GIC 3 : StringInfo pubnames = makeStringInfo();
522 :
523 CBC 3 : get_publications_str(publicationsCopy, pubnames, false);
524 3 : ereport(WARNING,
525 ECB : errcode(ERRCODE_UNDEFINED_OBJECT),
526 : errmsg_plural("publication %s does not exist on the publisher",
527 : "publications %s do not exist on the publisher",
528 : list_length(publicationsCopy),
529 : pubnames->data));
530 : }
531 GIC 88 : }
532 :
533 : /*
534 ECB : * Auxiliary function to build a text array out of a list of String nodes.
535 : */
536 : static Datum
537 GIC 147 : publicationListToArray(List *publist)
538 : {
539 ECB : ArrayType *arr;
540 : Datum *datums;
541 : MemoryContext memcxt;
542 : MemoryContext oldcxt;
543 :
544 : /* Create memory context for temporary allocations. */
545 GIC 147 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
546 ECB : "publicationListToArray to array",
547 : ALLOCSET_DEFAULT_SIZES);
548 GIC 147 : oldcxt = MemoryContextSwitchTo(memcxt);
549 ECB :
550 CBC 147 : datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
551 ECB :
552 GIC 147 : check_duplicates_in_publist(publist, datums);
553 ECB :
554 GBC 144 : MemoryContextSwitchTo(oldcxt);
555 :
556 GNC 144 : arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
557 ECB :
558 GIC 144 : MemoryContextDelete(memcxt);
559 :
560 CBC 144 : return PointerGetDatum(arr);
561 ECB : }
562 :
563 : /*
564 : * Create new subscription.
565 : */
566 : ObjectAddress
567 CBC 177 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
568 : bool isTopLevel)
569 : {
570 ECB : Relation rel;
571 : ObjectAddress myself;
572 : Oid subid;
573 : bool nulls[Natts_pg_subscription];
574 : Datum values[Natts_pg_subscription];
575 GIC 177 : Oid owner = GetUserId();
576 ECB : HeapTuple tup;
577 : char *conninfo;
578 : char originname[NAMEDATALEN];
579 : List *publications;
580 : bits32 supported_opts;
581 CBC 177 : SubOpts opts = {0};
582 : AclResult aclresult;
583 :
584 ECB : /*
585 : * Parse and check options.
586 : *
587 : * Connection and publication should not be specified here.
588 : */
589 GIC 177 : supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
590 : SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
591 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
592 ECB : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
593 : SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
594 : SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
595 GIC 177 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
596 :
597 : /*
598 : * Since creating a replication slot is not transactional, rolling back
599 ECB : * the transaction leaves the created replication slot. So we cannot run
600 : * CREATE SUBSCRIPTION inside a transaction block if creating a
601 : * replication slot.
602 : */
603 GIC 138 : if (opts.create_slot)
604 82 : PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
605 :
606 : /*
607 : * We don't want to allow unprivileged users to be able to trigger attempts
608 : * to access arbitrary network destinations, so require the user to have
609 : * been specifically authorized to create subscriptions.
610 : */
611 GNC 135 : if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
612 CBC 3 : ereport(ERROR,
613 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
614 : errmsg("must have privileges of pg_create_subscription to create subscriptions")));
615 :
616 : /*
617 : * Since a subscription is a database object, we also check for CREATE
618 : * permission on the database.
619 : */
620 GNC 132 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
621 : owner, ACL_CREATE);
622 132 : if (aclresult != ACLCHECK_OK)
623 6 : aclcheck_error(aclresult, OBJECT_DATABASE,
624 3 : get_database_name(MyDatabaseId));
625 :
626 : /*
627 : * Non-superusers are required to set a password for authentication, and
628 : * that password must be used by the target server, but the superuser can
629 : * exempt a subscription from this requirement.
630 : */
631 129 : if (!opts.passwordrequired && !superuser_arg(owner))
632 3 : ereport(ERROR,
633 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
634 : errmsg("password_required=false is superuser-only"),
635 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
636 ECB :
637 : /*
638 : * If built with appropriate switch, whine when regression-testing
639 : * conventions for subscription names are violated.
640 : */
641 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
642 : if (strncmp(stmt->subname, "regress_", 8) != 0)
643 : elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
644 : #endif
645 :
646 CBC 126 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
647 :
648 ECB : /* Check if name is used */
649 GIC 126 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
650 : MyDatabaseId, CStringGetDatum(stmt->subname));
651 126 : if (OidIsValid(subid))
652 : {
653 3 : ereport(ERROR,
654 : (errcode(ERRCODE_DUPLICATE_OBJECT),
655 ECB : errmsg("subscription \"%s\" already exists",
656 : stmt->subname)));
657 : }
658 :
659 GIC 123 : if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
660 110 : opts.slot_name == NULL)
661 110 : opts.slot_name = stmt->subname;
662 :
663 ECB : /* The default for synchronous_commit of subscriptions is off. */
664 GIC 123 : if (opts.synchronous_commit == NULL)
665 123 : opts.synchronous_commit = "off";
666 :
667 123 : conninfo = stmt->conninfo;
668 123 : publications = stmt->publication;
669 ECB :
670 : /* Load the library providing us libpq calls. */
671 GIC 123 : load_file("libpqwalreceiver", false);
672 :
673 : /* Check the connection info string. */
674 GNC 123 : walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
675 :
676 : /* Everything ok, form a new tuple. */
677 CBC 114 : memset(values, 0, sizeof(values));
678 GIC 114 : memset(nulls, false, sizeof(nulls));
679 :
680 114 : subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
681 : Anum_pg_subscription_oid);
682 114 : values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
683 CBC 114 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
684 GIC 114 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
685 114 : values[Anum_pg_subscription_subname - 1] =
686 114 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
687 114 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
688 114 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
689 114 : values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
690 GNC 114 : values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
691 CBC 114 : values[Anum_pg_subscription_subtwophasestate - 1] =
692 114 : CharGetDatum(opts.twophase ?
693 : LOGICALREP_TWOPHASE_STATE_PENDING :
694 : LOGICALREP_TWOPHASE_STATE_DISABLED);
695 GIC 114 : values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
696 GNC 114 : values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
697 114 : values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
698 GIC 114 : values[Anum_pg_subscription_subconninfo - 1] =
699 114 : CStringGetTextDatum(conninfo);
700 114 : if (opts.slot_name)
701 CBC 104 : values[Anum_pg_subscription_subslotname - 1] =
702 104 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
703 : else
704 GIC 10 : nulls[Anum_pg_subscription_subslotname - 1] = true;
705 114 : values[Anum_pg_subscription_subsynccommit - 1] =
706 114 : CStringGetTextDatum(opts.synchronous_commit);
707 111 : values[Anum_pg_subscription_subpublications - 1] =
708 114 : publicationListToArray(publications);
709 GNC 111 : values[Anum_pg_subscription_suborigin - 1] =
710 111 : CStringGetTextDatum(opts.origin);
711 :
712 CBC 111 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
713 :
714 ECB : /* Insert tuple into catalog. */
715 CBC 111 : CatalogTupleInsert(rel, tup);
716 111 : heap_freetuple(tup);
717 :
718 GIC 111 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
719 :
720 GNC 111 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
721 GIC 111 : replorigin_create(originname);
722 :
723 ECB : /*
724 : * Connect to remote side to execute requested commands and fetch table
725 : * info.
726 : */
727 GIC 111 : if (opts.connect)
728 : {
729 : char *err;
730 : WalReceiverConn *wrconn;
731 : List *tables;
732 : ListCell *lc;
733 : char table_state;
734 : bool must_use_password;
735 :
736 : /* Try to connect to the publisher. */
737 GNC 74 : must_use_password = !superuser_arg(owner) && opts.passwordrequired;
738 74 : wrconn = walrcv_connect(conninfo, true, must_use_password,
739 : stmt->subname, &err);
740 GIC 74 : if (!wrconn)
741 CBC 3 : ereport(ERROR,
742 : (errcode(ERRCODE_CONNECTION_FAILURE),
743 : errmsg("could not connect to the publisher: %s", err)));
744 ECB :
745 GIC 71 : PG_TRY();
746 ECB : {
747 GIC 71 : check_publications(wrconn, publications);
748 GNC 71 : check_publications_origin(wrconn, publications, opts.copy_data,
749 : opts.origin, NULL, 0, stmt->subname);
750 ECB :
751 : /*
752 : * Set sync state based on if we were asked to do data copy or
753 : * not.
754 : */
755 GIC 71 : table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
756 ECB :
757 : /*
758 : * Get the table list from publisher and build local table status
759 : * info.
760 : */
761 CBC 71 : tables = fetch_table_list(wrconn, publications);
762 198 : foreach(lc, tables)
763 : {
764 128 : RangeVar *rv = (RangeVar *) lfirst(lc);
765 ECB : Oid relid;
766 :
767 GIC 128 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
768 ECB :
769 : /* Check for supported relkind. */
770 GIC 128 : CheckSubscriptionRelkind(get_rel_relkind(relid),
771 CBC 128 : rv->schemaname, rv->relname);
772 :
773 GIC 128 : AddSubscriptionRelState(subid, relid, table_state,
774 ECB : InvalidXLogRecPtr);
775 : }
776 :
777 : /*
778 : * If requested, create permanent slot for the subscription. We
779 : * won't use the initial snapshot for anything, so no need to
780 : * export it.
781 : */
782 CBC 70 : if (opts.create_slot)
783 ECB : {
784 CBC 69 : bool twophase_enabled = false;
785 ECB :
786 CBC 69 : Assert(opts.slot_name);
787 ECB :
788 : /*
789 : * Even if two_phase is set, don't create the slot with
790 : * two-phase enabled. Will enable it once all the tables are
791 : * synced and ready. This avoids race-conditions like prepared
792 : * transactions being skipped due to changes not being applied
793 : * due to checks in should_apply_changes_for_rel() when
794 : * tablesync for the corresponding tables are in progress. See
795 : * comments atop worker.c.
796 : *
797 : * Note that if tables were specified but copy_data is false
798 : * then it is safe to enable two_phase up-front because those
799 : * tables are already initially in READY state. When the
800 : * subscription has no tables, we leave the twophase state as
801 : * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
802 : * PUBLICATION to work.
803 : */
804 CBC 69 : if (opts.twophase && !opts.copy_data && tables != NIL)
805 1 : twophase_enabled = true;
806 ECB :
807 CBC 69 : walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
808 : CRS_NOEXPORT_SNAPSHOT, NULL);
809 ECB :
810 GIC 69 : if (twophase_enabled)
811 1 : UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
812 ECB :
813 CBC 69 : ereport(NOTICE,
814 : (errmsg("created replication slot \"%s\" on publisher",
815 ECB : opts.slot_name)));
816 : }
817 : }
818 CBC 1 : PG_FINALLY();
819 : {
820 GIC 71 : walrcv_disconnect(wrconn);
821 : }
822 71 : PG_END_TRY();
823 : }
824 ECB : else
825 GIC 37 : ereport(WARNING,
826 : (errmsg("subscription was created, but is not connected"),
827 : errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
828 :
829 107 : table_close(rel, RowExclusiveLock);
830 :
831 107 : pgstat_create_subscription(subid);
832 :
833 CBC 107 : if (opts.enabled)
834 70 : ApplyLauncherWakeupAtCommit();
835 :
836 107 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
837 ECB :
838 GIC 107 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
839 :
840 107 : return myself;
841 ECB : }
842 :
843 : static void
844 CBC 38 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
845 : List *validate_publications)
846 : {
847 : char *err;
848 : List *pubrel_names;
849 : List *subrel_states;
850 : Oid *subrel_local_oids;
851 ECB : Oid *pubrel_local_oids;
852 : ListCell *lc;
853 : int off;
854 : int remove_rel_len;
855 : int subrel_count;
856 GIC 38 : Relation rel = NULL;
857 : typedef struct SubRemoveRels
858 ECB : {
859 : Oid relid;
860 : char state;
861 : } SubRemoveRels;
862 : SubRemoveRels *sub_remove_rels;
863 : WalReceiverConn *wrconn;
864 : bool must_use_password;
865 :
866 : /* Load the library providing us libpq calls. */
867 GIC 38 : load_file("libpqwalreceiver", false);
868 ECB :
869 : /* Try to connect to the publisher. */
870 GNC 38 : must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
871 38 : wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
872 : sub->name, &err);
873 CBC 38 : if (!wrconn)
874 UIC 0 : ereport(ERROR,
875 : (errcode(ERRCODE_CONNECTION_FAILURE),
876 : errmsg("could not connect to the publisher: %s", err)));
877 :
878 GIC 38 : PG_TRY();
879 : {
880 38 : if (validate_publications)
881 17 : check_publications(wrconn, validate_publications);
882 ECB :
883 : /* Get the table list from publisher. */
884 CBC 38 : pubrel_names = fetch_table_list(wrconn, sub->publications);
885 :
886 ECB : /* Get local table list. */
887 GNC 38 : subrel_states = GetSubscriptionRelations(sub->oid, false);
888 38 : subrel_count = list_length(subrel_states);
889 :
890 : /*
891 : * Build qsorted array of local table oids for faster lookup. This can
892 : * potentially contain all tables in the database so speed of lookup
893 : * is important.
894 : */
895 38 : subrel_local_oids = palloc(subrel_count * sizeof(Oid));
896 GIC 38 : off = 0;
897 131 : foreach(lc, subrel_states)
898 : {
899 93 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
900 :
901 93 : subrel_local_oids[off++] = relstate->relid;
902 : }
903 GNC 38 : qsort(subrel_local_oids, subrel_count,
904 : sizeof(Oid), oid_cmp);
905 ECB :
906 GNC 38 : check_publications_origin(wrconn, sub->publications, copy_data,
907 : sub->origin, subrel_local_oids,
908 : subrel_count, sub->name);
909 :
910 ECB : /*
911 : * Rels that we want to remove from subscription and drop any slots
912 : * and origins corresponding to them.
913 : */
914 GNC 38 : sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
915 ECB :
916 : /*
917 : * Walk over the remote tables and try to match them to locally known
918 : * tables. If the table is not known locally create a new state for
919 : * it.
920 : *
921 : * Also builds array of local oids of remote tables for the next step.
922 : */
923 CBC 38 : off = 0;
924 GIC 38 : pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
925 ECB :
926 GIC 130 : foreach(lc, pubrel_names)
927 ECB : {
928 GIC 92 : RangeVar *rv = (RangeVar *) lfirst(lc);
929 : Oid relid;
930 ECB :
931 GIC 92 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
932 :
933 : /* Check for supported relkind. */
934 CBC 92 : CheckSubscriptionRelkind(get_rel_relkind(relid),
935 GIC 92 : rv->schemaname, rv->relname);
936 ECB :
937 GIC 92 : pubrel_local_oids[off++] = relid;
938 ECB :
939 CBC 92 : if (!bsearch(&relid, subrel_local_oids,
940 : subrel_count, sizeof(Oid), oid_cmp))
941 ECB : {
942 GIC 29 : AddSubscriptionRelState(sub->oid, relid,
943 ECB : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
944 : InvalidXLogRecPtr);
945 CBC 29 : ereport(DEBUG1,
946 : (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
947 : rv->schemaname, rv->relname, sub->name)));
948 : }
949 ECB : }
950 :
951 : /*
952 : * Next remove state for tables we should not care about anymore using
953 : * the data we collected above
954 : */
955 GIC 38 : qsort(pubrel_local_oids, list_length(pubrel_names),
956 : sizeof(Oid), oid_cmp);
957 :
958 38 : remove_rel_len = 0;
959 GNC 131 : for (off = 0; off < subrel_count; off++)
960 : {
961 CBC 93 : Oid relid = subrel_local_oids[off];
962 :
963 GIC 93 : if (!bsearch(&relid, pubrel_local_oids,
964 93 : list_length(pubrel_names), sizeof(Oid), oid_cmp))
965 : {
966 : char state;
967 : XLogRecPtr statelsn;
968 :
969 : /*
970 : * Lock pg_subscription_rel with AccessExclusiveLock to
971 : * prevent any race conditions with the apply worker
972 ECB : * re-launching workers at the same time this code is trying
973 : * to remove those tables.
974 : *
975 : * Even if new worker for this particular rel is restarted it
976 : * won't be able to make any progress as we hold exclusive
977 : * lock on subscription_rel till the transaction end. It will
978 : * simply exit as there is no corresponding rel entry.
979 EUB : *
980 : * This locking also ensures that the state of rels won't
981 : * change till we are done with this refresh operation.
982 : */
983 CBC 30 : if (!rel)
984 GIC 15 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
985 ECB :
986 : /* Last known rel state. */
987 GIC 30 : state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
988 :
989 CBC 30 : sub_remove_rels[remove_rel_len].relid = relid;
990 GIC 30 : sub_remove_rels[remove_rel_len++].state = state;
991 :
992 CBC 30 : RemoveSubscriptionRel(sub->oid, relid);
993 ECB :
994 GIC 30 : logicalrep_worker_stop(sub->oid, relid);
995 :
996 : /*
997 : * For READY state, we would have already dropped the
998 : * tablesync origin.
999 : */
1000 CBC 30 : if (state != SUBREL_STATE_READY)
1001 ECB : {
1002 : char originname[NAMEDATALEN];
1003 :
1004 : /*
1005 : * Drop the tablesync's origin tracking if exists.
1006 : *
1007 : * It is possible that the origin is not yet created for
1008 : * tablesync worker, this can happen for the states before
1009 : * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
1010 : * apply worker can also concurrently try to drop the
1011 : * origin and by this time the origin might be already
1012 : * removed. For these reasons, passing missing_ok = true.
1013 : */
1014 UNC 0 : ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1015 : sizeof(originname));
1016 UIC 0 : replorigin_drop_by_name(originname, true, false);
1017 : }
1018 :
1019 CBC 30 : ereport(DEBUG1,
1020 : (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1021 : get_namespace_name(get_rel_namespace(relid)),
1022 : get_rel_name(relid),
1023 : sub->name)));
1024 : }
1025 : }
1026 :
1027 : /*
1028 ECB : * Drop the tablesync slots associated with removed tables. This has
1029 : * to be at the end because otherwise if there is an error while doing
1030 : * the database operations we won't be able to rollback dropped slots.
1031 : */
1032 GIC 68 : for (off = 0; off < remove_rel_len; off++)
1033 ECB : {
1034 GIC 30 : if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
1035 UIC 0 : sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
1036 ECB : {
1037 UIC 0 : char syncslotname[NAMEDATALEN] = {0};
1038 :
1039 ECB : /*
1040 : * For READY/SYNCDONE states we know the tablesync slot has
1041 : * already been dropped by the tablesync worker.
1042 : *
1043 : * For other states, there is no certainty, maybe the slot
1044 : * does not exist yet. Also, if we fail after removing some of
1045 : * the slots, next time, it will again try to drop already
1046 : * dropped slots and fail. For these reasons, we allow
1047 : * missing_ok = true for the drop.
1048 : */
1049 UIC 0 : ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
1050 ECB : syncslotname, sizeof(syncslotname));
1051 UIC 0 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1052 : }
1053 : }
1054 : }
1055 0 : PG_FINALLY();
1056 : {
1057 GIC 38 : walrcv_disconnect(wrconn);
1058 : }
1059 38 : PG_END_TRY();
1060 ECB :
1061 GIC 38 : if (rel)
1062 15 : table_close(rel, NoLock);
1063 CBC 38 : }
1064 ECB :
1065 : /*
1066 : * Alter the existing subscription.
1067 : */
1068 : ObjectAddress
1069 CBC 198 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1070 : bool isTopLevel)
1071 : {
1072 : Relation rel;
1073 : ObjectAddress myself;
1074 : bool nulls[Natts_pg_subscription];
1075 : bool replaces[Natts_pg_subscription];
1076 : Datum values[Natts_pg_subscription];
1077 : HeapTuple tup;
1078 : Oid subid;
1079 GIC 198 : bool update_tuple = false;
1080 : Subscription *sub;
1081 : Form_pg_subscription form;
1082 : bits32 supported_opts;
1083 198 : SubOpts opts = {0};
1084 :
1085 198 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1086 :
1087 : /* Fetch the existing tuple. */
1088 CBC 198 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1089 ECB : CStringGetDatum(stmt->subname));
1090 :
1091 GIC 198 : if (!HeapTupleIsValid(tup))
1092 CBC 3 : ereport(ERROR,
1093 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1094 ECB : errmsg("subscription \"%s\" does not exist",
1095 : stmt->subname)));
1096 :
1097 CBC 195 : form = (Form_pg_subscription) GETSTRUCT(tup);
1098 GIC 195 : subid = form->oid;
1099 ECB :
1100 : /* must be owner */
1101 GNC 195 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1102 UIC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1103 0 : stmt->subname);
1104 :
1105 CBC 195 : sub = GetSubscription(subid, false);
1106 :
1107 : /*
1108 : * Don't allow non-superuser modification of a subscription with
1109 : * password_required=false.
1110 : */
1111 GNC 195 : if (!sub->passwordrequired && !superuser())
1112 UNC 0 : ereport(ERROR,
1113 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1114 : errmsg("password_required=false is superuser-only"),
1115 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1116 :
1117 : /* Lock the subscription so nobody else can do anything with it. */
1118 GIC 195 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1119 :
1120 : /* Form a new tuple. */
1121 195 : memset(values, 0, sizeof(values));
1122 195 : memset(nulls, false, sizeof(nulls));
1123 195 : memset(replaces, false, sizeof(replaces));
1124 :
1125 195 : switch (stmt->kind)
1126 : {
1127 78 : case ALTER_SUBSCRIPTION_OPTIONS:
1128 : {
1129 GBC 78 : supported_opts = (SUBOPT_SLOT_NAME |
1130 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1131 : SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
1132 : SUBOPT_PASSWORD_REQUIRED |
1133 : SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
1134 :
1135 GIC 78 : parse_subscription_options(pstate, stmt->options,
1136 ECB : supported_opts, &opts);
1137 :
1138 GIC 66 : if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1139 : {
1140 : /*
1141 : * The subscription must be disabled to allow slot_name as
1142 : * 'none', otherwise, the apply worker will repeatedly try
1143 : * to stream the data using that slot_name which neither
1144 : * exists on the publisher nor the user will be allowed to
1145 : * create it.
1146 : */
1147 28 : if (sub->enabled && !opts.slot_name)
1148 UIC 0 : ereport(ERROR,
1149 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1150 : errmsg("cannot set %s for enabled subscription",
1151 : "slot_name = NONE")));
1152 EUB :
1153 GIC 28 : if (opts.slot_name)
1154 GBC 3 : values[Anum_pg_subscription_subslotname - 1] =
1155 GIC 3 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
1156 : else
1157 25 : nulls[Anum_pg_subscription_subslotname - 1] = true;
1158 28 : replaces[Anum_pg_subscription_subslotname - 1] = true;
1159 : }
1160 :
1161 66 : if (opts.synchronous_commit)
1162 : {
1163 3 : values[Anum_pg_subscription_subsynccommit - 1] =
1164 3 : CStringGetTextDatum(opts.synchronous_commit);
1165 3 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1166 EUB : }
1167 :
1168 GBC 66 : if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1169 : {
1170 GIC 9 : values[Anum_pg_subscription_subbinary - 1] =
1171 9 : BoolGetDatum(opts.binary);
1172 GBC 9 : replaces[Anum_pg_subscription_subbinary - 1] = true;
1173 : }
1174 ECB :
1175 GIC 66 : if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1176 ECB : {
1177 GIC 15 : values[Anum_pg_subscription_substream - 1] =
1178 GNC 15 : CharGetDatum(opts.streaming);
1179 CBC 15 : replaces[Anum_pg_subscription_substream - 1] = true;
1180 ECB : }
1181 :
1182 GIC 66 : if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1183 : {
1184 : values[Anum_pg_subscription_subdisableonerr - 1]
1185 3 : = BoolGetDatum(opts.disableonerr);
1186 ECB : replaces[Anum_pg_subscription_subdisableonerr - 1]
1187 GIC 3 : = true;
1188 : }
1189 :
1190 GNC 66 : if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1191 : {
1192 : /* Non-superuser may not disable password_required. */
1193 6 : if (!opts.passwordrequired && !superuser())
1194 UNC 0 : ereport(ERROR,
1195 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1196 : errmsg("password_required=false is superuser-only"),
1197 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1198 :
1199 : values[Anum_pg_subscription_subpasswordrequired - 1]
1200 GNC 6 : = BoolGetDatum(opts.passwordrequired);
1201 : replaces[Anum_pg_subscription_subpasswordrequired - 1]
1202 6 : = true;
1203 : }
1204 :
1205 66 : if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1206 : {
1207 3 : values[Anum_pg_subscription_suborigin - 1] =
1208 3 : CStringGetTextDatum(opts.origin);
1209 3 : replaces[Anum_pg_subscription_suborigin - 1] = true;
1210 : }
1211 :
1212 GIC 66 : update_tuple = true;
1213 66 : break;
1214 : }
1215 :
1216 21 : case ALTER_SUBSCRIPTION_ENABLED:
1217 : {
1218 CBC 21 : parse_subscription_options(pstate, stmt->options,
1219 : SUBOPT_ENABLED, &opts);
1220 GIC 21 : Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1221 :
1222 CBC 21 : if (!sub->slotname && opts.enabled)
1223 GIC 3 : ereport(ERROR,
1224 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1225 : errmsg("cannot enable subscription that does not have a slot name")));
1226 :
1227 CBC 18 : values[Anum_pg_subscription_subenabled - 1] =
1228 GIC 18 : BoolGetDatum(opts.enabled);
1229 18 : replaces[Anum_pg_subscription_subenabled - 1] = true;
1230 ECB :
1231 CBC 18 : if (opts.enabled)
1232 GIC 10 : ApplyLauncherWakeupAtCommit();
1233 :
1234 18 : update_tuple = true;
1235 18 : break;
1236 ECB : }
1237 :
1238 GIC 7 : case ALTER_SUBSCRIPTION_CONNECTION:
1239 : /* Load the library providing us libpq calls. */
1240 CBC 7 : load_file("libpqwalreceiver", false);
1241 EUB : /* Check the connection info string. */
1242 GNC 7 : walrcv_check_conninfo(stmt->conninfo,
1243 : sub->passwordrequired && !superuser_arg(sub->owner));
1244 :
1245 CBC 4 : values[Anum_pg_subscription_subconninfo - 1] =
1246 GIC 4 : CStringGetTextDatum(stmt->conninfo);
1247 4 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1248 4 : update_tuple = true;
1249 4 : break;
1250 :
1251 CBC 24 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
1252 EUB : {
1253 GIC 24 : supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1254 24 : parse_subscription_options(pstate, stmt->options,
1255 : supported_opts, &opts);
1256 :
1257 24 : values[Anum_pg_subscription_subpublications - 1] =
1258 CBC 24 : publicationListToArray(stmt->publication);
1259 GIC 24 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1260 :
1261 CBC 24 : update_tuple = true;
1262 ECB :
1263 : /* Refresh if user asked us to. */
1264 GIC 24 : if (opts.refresh)
1265 ECB : {
1266 GIC 21 : if (!sub->enabled)
1267 LBC 0 : ereport(ERROR,
1268 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1269 ECB : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1270 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1271 :
1272 : /*
1273 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1274 : * not allowed.
1275 : */
1276 GIC 21 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1277 UIC 0 : ereport(ERROR,
1278 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1279 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1280 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1281 :
1282 GIC 21 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1283 :
1284 : /* Make sure refresh sees the new list of publications. */
1285 15 : sub->publications = stmt->publication;
1286 :
1287 CBC 15 : AlterSubscription_refresh(sub, opts.copy_data,
1288 EUB : stmt->publication);
1289 : }
1290 :
1291 GIC 18 : break;
1292 : }
1293 ECB :
1294 CBC 27 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
1295 ECB : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
1296 : {
1297 : List *publist;
1298 CBC 27 : bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1299 :
1300 GIC 27 : supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1301 CBC 27 : parse_subscription_options(pstate, stmt->options,
1302 : supported_opts, &opts);
1303 ECB :
1304 CBC 27 : publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1305 9 : values[Anum_pg_subscription_subpublications - 1] =
1306 GIC 9 : publicationListToArray(publist);
1307 9 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1308 ECB :
1309 GIC 9 : update_tuple = true;
1310 ECB :
1311 : /* Refresh if user asked us to. */
1312 CBC 9 : if (opts.refresh)
1313 : {
1314 : /* We only need to validate user specified publications. */
1315 3 : List *validate_publications = (isadd) ? stmt->publication : NULL;
1316 :
1317 3 : if (!sub->enabled)
1318 LBC 0 : ereport(ERROR,
1319 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1320 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1321 : /* translator: %s is an SQL ALTER command */
1322 : errhint("Use %s instead.",
1323 : isadd ?
1324 : "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1325 : "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1326 :
1327 : /*
1328 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1329 : * not allowed.
1330 : */
1331 GIC 3 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1332 UIC 0 : ereport(ERROR,
1333 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1334 EUB : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1335 : /* translator: %s is an SQL ALTER command */
1336 : errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1337 : isadd ?
1338 : "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1339 : "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1340 ECB :
1341 GIC 3 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1342 ECB :
1343 : /* Refresh the new list of publications. */
1344 GIC 3 : sub->publications = publist;
1345 ECB :
1346 GIC 3 : AlterSubscription_refresh(sub, opts.copy_data,
1347 ECB : validate_publications);
1348 : }
1349 :
1350 GIC 9 : break;
1351 : }
1352 ECB :
1353 CBC 26 : case ALTER_SUBSCRIPTION_REFRESH:
1354 : {
1355 GIC 26 : if (!sub->enabled)
1356 CBC 3 : ereport(ERROR,
1357 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1358 ECB : errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1359 :
1360 CBC 23 : parse_subscription_options(pstate, stmt->options,
1361 : SUBOPT_COPY_DATA, &opts);
1362 ECB :
1363 : /*
1364 : * The subscription option "two_phase" requires that
1365 : * replication has passed the initial table synchronization
1366 : * phase before the two_phase becomes properly enabled.
1367 : *
1368 : * But, having reached this two-phase commit "enabled" state
1369 : * we must not allow any subsequent table initialization to
1370 : * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
1371 : * when the user had requested two_phase = on mode.
1372 : *
1373 : * The exception to this restriction is when copy_data =
1374 : * false, because when copy_data is false the tablesync will
1375 : * start already in READY state and will exit directly without
1376 : * doing anything.
1377 : *
1378 : * For more details see comments atop worker.c.
1379 : */
1380 CBC 23 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1381 UIC 0 : ereport(ERROR,
1382 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
1383 : errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1384 : errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1385 :
1386 CBC 23 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
1387 ECB :
1388 CBC 20 : AlterSubscription_refresh(sub, opts.copy_data, NULL);
1389 ECB :
1390 GIC 20 : break;
1391 ECB : }
1392 :
1393 CBC 12 : case ALTER_SUBSCRIPTION_SKIP:
1394 ECB : {
1395 GIC 12 : parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1396 :
1397 ECB : /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1398 CBC 9 : Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1399 ECB :
1400 : /*
1401 : * If the user sets subskiplsn, we do a sanity check to make
1402 EUB : * sure that the specified LSN is a probable value.
1403 : */
1404 GIC 9 : if (!XLogRecPtrIsInvalid(opts.lsn))
1405 : {
1406 : RepOriginId originid;
1407 : char originname[NAMEDATALEN];
1408 : XLogRecPtr remote_lsn;
1409 :
1410 GNC 6 : ReplicationOriginNameForLogicalRep(subid, InvalidOid,
1411 : originname, sizeof(originname));
1412 CBC 6 : originid = replorigin_by_name(originname, false);
1413 GBC 6 : remote_lsn = replorigin_get_progress(originid, false);
1414 :
1415 : /* Check the given LSN is at least a future LSN */
1416 GIC 6 : if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
1417 UIC 0 : ereport(ERROR,
1418 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1419 : errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
1420 : LSN_FORMAT_ARGS(opts.lsn),
1421 : LSN_FORMAT_ARGS(remote_lsn))));
1422 : }
1423 :
1424 GIC 9 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1425 9 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1426 :
1427 CBC 9 : update_tuple = true;
1428 GIC 9 : break;
1429 : }
1430 ECB :
1431 UIC 0 : default:
1432 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1433 : stmt->kind);
1434 ECB : }
1435 :
1436 : /* Update the catalog if needed. */
1437 CBC 144 : if (update_tuple)
1438 : {
1439 GIC 124 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1440 ECB : replaces);
1441 :
1442 CBC 124 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1443 ECB :
1444 GIC 124 : heap_freetuple(tup);
1445 ECB : }
1446 :
1447 GIC 144 : table_close(rel, RowExclusiveLock);
1448 ECB :
1449 GIC 144 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1450 :
1451 CBC 144 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1452 :
1453 : /* Wake up related replication workers to handle this change quickly. */
1454 GNC 144 : LogicalRepWorkersWakeupAtCommit(subid);
1455 :
1456 CBC 144 : return myself;
1457 EUB : }
1458 :
1459 : /*
1460 : * Drop a subscription
1461 : */
1462 : void
1463 GIC 78 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
1464 : {
1465 : Relation rel;
1466 : ObjectAddress myself;
1467 : HeapTuple tup;
1468 : Oid subid;
1469 : Oid subowner;
1470 : Datum datum;
1471 ECB : bool isnull;
1472 EUB : char *subname;
1473 : char *conninfo;
1474 : char *slotname;
1475 : List *subworkers;
1476 : ListCell *lc;
1477 : char originname[NAMEDATALEN];
1478 GIC 78 : char *err = NULL;
1479 : WalReceiverConn *wrconn;
1480 : Form_pg_subscription form;
1481 ECB : List *rstates;
1482 : bool must_use_password;
1483 :
1484 : /*
1485 : * Lock pg_subscription with AccessExclusiveLock to ensure that the
1486 : * launcher doesn't restart new worker during dropping the subscription
1487 : */
1488 GIC 78 : rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
1489 :
1490 78 : tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
1491 CBC 78 : CStringGetDatum(stmt->subname));
1492 :
1493 GIC 78 : if (!HeapTupleIsValid(tup))
1494 ECB : {
1495 GIC 6 : table_close(rel, NoLock);
1496 ECB :
1497 CBC 6 : if (!stmt->missing_ok)
1498 GIC 3 : ereport(ERROR,
1499 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1500 : errmsg("subscription \"%s\" does not exist",
1501 ECB : stmt->subname)));
1502 : else
1503 GIC 3 : ereport(NOTICE,
1504 : (errmsg("subscription \"%s\" does not exist, skipping",
1505 : stmt->subname)));
1506 :
1507 38 : return;
1508 : }
1509 :
1510 72 : form = (Form_pg_subscription) GETSTRUCT(tup);
1511 72 : subid = form->oid;
1512 GNC 72 : subowner = form->subowner;
1513 72 : must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
1514 :
1515 : /* must be owner */
1516 72 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1517 UIC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1518 0 : stmt->subname);
1519 :
1520 : /* DROP hook for the subscription being removed */
1521 GIC 72 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
1522 :
1523 ECB : /*
1524 EUB : * Lock the subscription so nobody else can do anything with it (including
1525 : * the replication workers).
1526 : */
1527 GIC 72 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1528 :
1529 ECB : /* Get subname */
1530 GNC 72 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1531 : Anum_pg_subscription_subname);
1532 CBC 72 : subname = pstrdup(NameStr(*DatumGetName(datum)));
1533 :
1534 : /* Get conninfo */
1535 GNC 72 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1536 : Anum_pg_subscription_subconninfo);
1537 GIC 72 : conninfo = TextDatumGetCString(datum);
1538 :
1539 ECB : /* Get slotname */
1540 GIC 72 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
1541 : Anum_pg_subscription_subslotname, &isnull);
1542 72 : if (!isnull)
1543 37 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
1544 : else
1545 CBC 35 : slotname = NULL;
1546 :
1547 : /*
1548 : * Since dropping a replication slot is not transactional, the replication
1549 : * slot stays dropped even if the transaction rolls back. So we cannot
1550 : * run DROP SUBSCRIPTION inside a transaction block if dropping the
1551 ECB : * replication slot. Also, in this case, we report a message for dropping
1552 : * the subscription to the cumulative stats system.
1553 : *
1554 : * XXX The command name should really be something like "DROP SUBSCRIPTION
1555 : * of a subscription that is associated with a replication slot", but we
1556 : * don't have the proper facilities for that.
1557 : */
1558 GBC 72 : if (slotname)
1559 GIC 37 : PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
1560 :
1561 69 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1562 69 : EventTriggerSQLDropAddObject(&myself, true, true);
1563 :
1564 : /* Remove the tuple from catalog. */
1565 CBC 69 : CatalogTupleDelete(rel, &tup->t_self);
1566 ECB :
1567 GIC 69 : ReleaseSysCache(tup);
1568 ECB :
1569 : /*
1570 : * Stop all the subscription workers immediately.
1571 : *
1572 EUB : * This is necessary if we are dropping the replication slot, so that the
1573 : * slot becomes accessible.
1574 : *
1575 : * It is also necessary if the subscription is disabled and was disabled
1576 : * in the same transaction. Then the workers haven't seen the disabling
1577 : * yet and will still be running, leading to hangs later when we want to
1578 ECB : * drop the replication origin. If the subscription was disabled before
1579 : * this transaction, then there shouldn't be any workers left, so this
1580 : * won't make a difference.
1581 : *
1582 : * New workers won't be started because we hold an exclusive lock on the
1583 : * subscription till the end of the transaction.
1584 : */
1585 CBC 69 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1586 GIC 69 : subworkers = logicalrep_workers_find(subid, false);
1587 69 : LWLockRelease(LogicalRepWorkerLock);
1588 CBC 106 : foreach(lc, subworkers)
1589 : {
1590 37 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
1591 :
1592 37 : logicalrep_worker_stop(w->subid, w->relid);
1593 : }
1594 GIC 69 : list_free(subworkers);
1595 ECB :
1596 : /*
1597 : * Remove the no-longer-useful entry in the launcher's table of apply
1598 : * worker start times.
1599 : *
1600 : * If this transaction rolls back, the launcher might restart a failed
1601 : * apply worker before wal_retrieve_retry_interval milliseconds have
1602 : * elapsed, but that's pretty harmless.
1603 : */
1604 GNC 69 : ApplyLauncherForgetWorkerStartTime(subid);
1605 :
1606 : /*
1607 ECB : * Cleanup of tablesync replication origins.
1608 : *
1609 : * Any READY-state relations would already have dealt with clean-ups.
1610 : *
1611 : * Note that the state can't change because we have already stopped both
1612 : * the apply and tablesync workers and they can't restart because of
1613 : * exclusive lock on the subscription.
1614 : */
1615 GNC 69 : rstates = GetSubscriptionRelations(subid, true);
1616 GIC 70 : foreach(lc, rstates)
1617 : {
1618 1 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
1619 1 : Oid relid = rstate->relid;
1620 :
1621 : /* Only cleanup resources of tablesync workers */
1622 1 : if (!OidIsValid(relid))
1623 UIC 0 : continue;
1624 :
1625 : /*
1626 : * Drop the tablesync's origin tracking if exists.
1627 : *
1628 : * It is possible that the origin is not yet created for tablesync
1629 ECB : * worker so passing missing_ok = true. This can happen for the states
1630 : * before SUBREL_STATE_FINISHEDCOPY.
1631 : */
1632 GNC 1 : ReplicationOriginNameForLogicalRep(subid, relid, originname,
1633 : sizeof(originname));
1634 GIC 1 : replorigin_drop_by_name(originname, true, false);
1635 : }
1636 :
1637 : /* Clean up dependencies */
1638 69 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
1639 ECB :
1640 : /* Remove any associated relation synchronization states. */
1641 CBC 69 : RemoveSubscriptionRel(subid, InvalidOid);
1642 ECB :
1643 : /* Remove the origin tracking if exists. */
1644 GNC 69 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
1645 GIC 69 : replorigin_drop_by_name(originname, true, false);
1646 ECB :
1647 : /*
1648 : * If there is no slot associated with the subscription, we can finish
1649 : * here.
1650 : */
1651 GIC 69 : if (!slotname && rstates == NIL)
1652 : {
1653 35 : table_close(rel, NoLock);
1654 CBC 35 : return;
1655 : }
1656 :
1657 : /*
1658 ECB : * Try to acquire the connection necessary for dropping slots.
1659 : *
1660 : * Note: If the slotname is NONE/NULL then we allow the command to finish
1661 : * and users need to manually cleanup the apply and tablesync worker slots
1662 : * later.
1663 : *
1664 : * This has to be at the end because otherwise if there is an error while
1665 : * doing the database operations we won't be able to rollback dropped
1666 : * slot.
1667 : */
1668 GBC 34 : load_file("libpqwalreceiver", false);
1669 EUB :
1670 GNC 34 : wrconn = walrcv_connect(conninfo, true, must_use_password,
1671 : subname, &err);
1672 GIC 34 : if (wrconn == NULL)
1673 ECB : {
1674 UIC 0 : if (!slotname)
1675 : {
1676 : /* be tidy */
1677 0 : list_free(rstates);
1678 0 : table_close(rel, NoLock);
1679 LBC 0 : return;
1680 : }
1681 : else
1682 ECB : {
1683 UIC 0 : ReportSlotConnectionError(rstates, subid, slotname, err);
1684 ECB : }
1685 : }
1686 :
1687 CBC 34 : PG_TRY();
1688 : {
1689 35 : foreach(lc, rstates)
1690 : {
1691 GIC 1 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
1692 CBC 1 : Oid relid = rstate->relid;
1693 :
1694 ECB : /* Only cleanup resources of tablesync workers */
1695 CBC 1 : if (!OidIsValid(relid))
1696 UIC 0 : continue;
1697 ECB :
1698 : /*
1699 : * Drop the tablesync slots associated with removed tables.
1700 : *
1701 : * For SYNCDONE/READY states, the tablesync slot is known to have
1702 : * already been dropped by the tablesync worker.
1703 : *
1704 : * For other states, there is no certainty, maybe the slot does
1705 : * not exist yet. Also, if we fail after removing some of the
1706 : * slots, next time, it will again try to drop already dropped
1707 : * slots and fail. For these reasons, we allow missing_ok = true
1708 : * for the drop.
1709 : */
1710 CBC 1 : if (rstate->state != SUBREL_STATE_SYNCDONE)
1711 ECB : {
1712 GIC 1 : char syncslotname[NAMEDATALEN] = {0};
1713 ECB :
1714 CBC 1 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
1715 : sizeof(syncslotname));
1716 GIC 1 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1717 ECB : }
1718 : }
1719 :
1720 GIC 34 : list_free(rstates);
1721 :
1722 : /*
1723 : * If there is a slot associated with the subscription, then drop the
1724 : * replication slot at the publisher.
1725 : */
1726 34 : if (slotname)
1727 34 : ReplicationSlotDropAtPubNode(wrconn, slotname, false);
1728 : }
1729 UIC 0 : PG_FINALLY();
1730 : {
1731 GIC 34 : walrcv_disconnect(wrconn);
1732 : }
1733 34 : PG_END_TRY();
1734 :
1735 : /*
1736 : * Tell the cumulative stats system that the subscription is getting
1737 ECB : * dropped.
1738 : */
1739 CBC 34 : pgstat_drop_subscription(subid);
1740 ECB :
1741 GIC 34 : table_close(rel, NoLock);
1742 ECB : }
1743 :
1744 : /*
1745 : * Drop the replication slot at the publisher node using the replication
1746 : * connection.
1747 : *
1748 : * missing_ok - if true then only issue a LOG message if the slot doesn't
1749 : * exist.
1750 : */
1751 : void
1752 GIC 189 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
1753 : {
1754 : StringInfoData cmd;
1755 :
1756 CBC 189 : Assert(wrconn);
1757 :
1758 GIC 189 : load_file("libpqwalreceiver", false);
1759 :
1760 189 : initStringInfo(&cmd);
1761 189 : appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
1762 :
1763 189 : PG_TRY();
1764 : {
1765 : WalRcvExecResult *res;
1766 :
1767 CBC 189 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
1768 ECB :
1769 GIC 189 : if (res->status == WALRCV_OK_COMMAND)
1770 ECB : {
1771 : /* NOTICE. Success. */
1772 GIC 189 : ereport(NOTICE,
1773 : (errmsg("dropped replication slot \"%s\" on publisher",
1774 ECB : slotname)));
1775 EUB : }
1776 UIC 0 : else if (res->status == WALRCV_ERROR &&
1777 0 : missing_ok &&
1778 0 : res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
1779 : {
1780 : /* LOG. Error, but missing_ok = true. */
1781 0 : ereport(LOG,
1782 : (errmsg("could not drop replication slot \"%s\" on publisher: %s",
1783 : slotname, res->err)));
1784 ECB : }
1785 : else
1786 : {
1787 : /* ERROR. */
1788 UIC 0 : ereport(ERROR,
1789 : (errcode(ERRCODE_CONNECTION_FAILURE),
1790 ECB : errmsg("could not drop replication slot \"%s\" on publisher: %s",
1791 : slotname, res->err)));
1792 : }
1793 :
1794 GIC 189 : walrcv_clear_result(res);
1795 : }
1796 LBC 0 : PG_FINALLY();
1797 ECB : {
1798 GIC 189 : pfree(cmd.data);
1799 : }
1800 189 : PG_END_TRY();
1801 189 : }
1802 :
1803 ECB : /*
1804 : * Internal workhorse for changing a subscription owner
1805 : */
1806 : static void
1807 GIC 6 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1808 : {
1809 : Form_pg_subscription form;
1810 : AclResult aclresult;
1811 :
1812 6 : form = (Form_pg_subscription) GETSTRUCT(tup);
1813 :
1814 6 : if (form->subowner == newOwnerId)
1815 UIC 0 : return;
1816 :
1817 GNC 6 : if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
1818 UIC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1819 0 : NameStr(form->subname));
1820 :
1821 : /*
1822 : * Don't allow non-superuser modification of a subscription with
1823 : * password_required=false.
1824 : */
1825 GNC 6 : if (!form->subpasswordrequired && !superuser())
1826 LBC 0 : ereport(ERROR,
1827 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1828 : errmsg("password_required=false is superuser-only"),
1829 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1830 :
1831 : /* Must be able to become new owner */
1832 GNC 6 : check_can_set_role(GetUserId(), newOwnerId);
1833 :
1834 : /*
1835 : * current owner must have CREATE on database
1836 : *
1837 : * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
1838 : * other object types behave differently (e.g. you can't give a table to
1839 : * a user who lacks CREATE privileges on a schema).
1840 : */
1841 3 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
1842 : GetUserId(), ACL_CREATE);
1843 3 : if (aclresult != ACLCHECK_OK)
1844 UNC 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
1845 0 : get_database_name(MyDatabaseId));
1846 :
1847 GIC 3 : form->subowner = newOwnerId;
1848 GBC 3 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1849 EUB :
1850 : /* Update owner dependency reference */
1851 GIC 3 : changeDependencyOnOwner(SubscriptionRelationId,
1852 : form->oid,
1853 : newOwnerId);
1854 EUB :
1855 GIC 3 : InvokeObjectPostAlterHook(SubscriptionRelationId,
1856 : form->oid, 0);
1857 :
1858 : /* Wake up related background processes to handle this change quickly. */
1859 CBC 3 : ApplyLauncherWakeupAtCommit();
1860 GNC 3 : LogicalRepWorkersWakeupAtCommit(form->oid);
1861 : }
1862 ECB :
1863 : /*
1864 : * Change subscription owner -- by name
1865 : */
1866 : ObjectAddress
1867 GIC 6 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
1868 ECB : {
1869 EUB : Oid subid;
1870 : HeapTuple tup;
1871 : Relation rel;
1872 : ObjectAddress address;
1873 : Form_pg_subscription form;
1874 :
1875 GIC 6 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1876 :
1877 6 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1878 : CStringGetDatum(name));
1879 :
1880 6 : if (!HeapTupleIsValid(tup))
1881 UIC 0 : ereport(ERROR,
1882 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1883 ECB : errmsg("subscription \"%s\" does not exist", name)));
1884 :
1885 CBC 6 : form = (Form_pg_subscription) GETSTRUCT(tup);
1886 GIC 6 : subid = form->oid;
1887 ECB :
1888 GIC 6 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1889 ECB :
1890 GIC 3 : ObjectAddressSet(address, SubscriptionRelationId, subid);
1891 :
1892 3 : heap_freetuple(tup);
1893 ECB :
1894 GIC 3 : table_close(rel, RowExclusiveLock);
1895 :
1896 3 : return address;
1897 : }
1898 :
1899 ECB : /*
1900 : * Change subscription owner -- by OID
1901 : */
1902 EUB : void
1903 UIC 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
1904 ECB : {
1905 : HeapTuple tup;
1906 : Relation rel;
1907 :
1908 UIC 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1909 :
1910 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
1911 :
1912 LBC 0 : if (!HeapTupleIsValid(tup))
1913 UIC 0 : ereport(ERROR,
1914 ECB : (errcode(ERRCODE_UNDEFINED_OBJECT),
1915 : errmsg("subscription with OID %u does not exist", subid)));
1916 :
1917 UIC 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1918 :
1919 0 : heap_freetuple(tup);
1920 :
1921 0 : table_close(rel, RowExclusiveLock);
1922 0 : }
1923 :
1924 : /*
1925 : * Check and log a warning if the publisher has subscribed to the same table
1926 : * from some other publisher. This check is required only if "copy_data = true"
1927 : * and "origin = none" for CREATE SUBSCRIPTION and
1928 : * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data
1929 : * having origin might have been copied.
1930 : *
1931 : * This check need not be performed on the tables that are already added
1932 : * because incremental sync for those tables will happen through WAL and the
1933 : * origin of the data can be identified from the WAL records.
1934 : *
1935 : * subrel_local_oids contains the list of relation oids that are already
1936 : * present on the subscriber.
1937 : */
1938 : static void
1939 GNC 109 : check_publications_origin(WalReceiverConn *wrconn, List *publications,
1940 : bool copydata, char *origin, Oid *subrel_local_oids,
1941 : int subrel_count, char *subname)
1942 : {
1943 : WalRcvExecResult *res;
1944 : StringInfoData cmd;
1945 : TupleTableSlot *slot;
1946 109 : Oid tableRow[1] = {TEXTOID};
1947 109 : List *publist = NIL;
1948 : int i;
1949 :
1950 211 : if (!copydata || !origin ||
1951 102 : (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
1952 103 : return;
1953 :
1954 6 : initStringInfo(&cmd);
1955 6 : appendStringInfoString(&cmd,
1956 : "SELECT DISTINCT P.pubname AS pubname\n"
1957 : "FROM pg_publication P,\n"
1958 : " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
1959 : " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
1960 : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
1961 : "WHERE C.oid = GPT.relid AND P.pubname IN (");
1962 6 : get_publications_str(publications, &cmd, true);
1963 6 : appendStringInfoString(&cmd, ")\n");
1964 :
1965 : /*
1966 : * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
1967 : * the list of relation oids that are already present on the subscriber.
1968 : * This check should be skipped for these tables.
1969 : */
1970 9 : for (i = 0; i < subrel_count; i++)
1971 : {
1972 3 : Oid relid = subrel_local_oids[i];
1973 3 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
1974 3 : char *tablename = get_rel_name(relid);
1975 :
1976 3 : appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
1977 : schemaname, tablename);
1978 : }
1979 :
1980 6 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
1981 6 : pfree(cmd.data);
1982 :
1983 6 : if (res->status != WALRCV_OK_TUPLES)
1984 UNC 0 : ereport(ERROR,
1985 : (errcode(ERRCODE_CONNECTION_FAILURE),
1986 : errmsg("could not receive list of replicated tables from the publisher: %s",
1987 : res->err)));
1988 :
1989 : /* Process tables. */
1990 GNC 6 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1991 8 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1992 : {
1993 : char *pubname;
1994 : bool isnull;
1995 :
1996 2 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1997 2 : Assert(!isnull);
1998 :
1999 2 : ExecClearTuple(slot);
2000 2 : publist = list_append_unique(publist, makeString(pubname));
2001 : }
2002 :
2003 : /*
2004 : * Log a warning if the publisher has subscribed to the same table from
2005 : * some other publisher. We cannot know the origin of data during the
2006 : * initial sync. Data origins can be found only from the WAL by looking at
2007 : * the origin id.
2008 : *
2009 : * XXX: For simplicity, we don't check whether the table has any data or
2010 : * not. If the table doesn't have any data then we don't need to
2011 : * distinguish between data having origin and data not having origin so we
2012 : * can avoid logging a warning in that case.
2013 : */
2014 6 : if (publist)
2015 : {
2016 2 : StringInfo pubnames = makeStringInfo();
2017 :
2018 : /* Prepare the list of publication(s) for warning message. */
2019 2 : get_publications_str(publist, pubnames, false);
2020 2 : ereport(WARNING,
2021 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2022 : errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2023 : subname),
2024 : errdetail_plural("Subscribed publication %s is subscribing to other publications.",
2025 : "Subscribed publications %s are subscribing to other publications.",
2026 : list_length(publist), pubnames->data),
2027 : errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
2028 : }
2029 :
2030 6 : ExecDropSingleTupleTableSlot(slot);
2031 :
2032 6 : walrcv_clear_result(res);
2033 : }
2034 :
2035 : /*
2036 ECB : * Get the list of tables which belong to specified publications on the
2037 : * publisher connection.
2038 : *
2039 : * Note that we don't support the case where the column list is different for
2040 : * the same table in different publications to avoid sending unwanted column
2041 : * information for some of the rows. This can happen when both the column
2042 : * list and row filter are specified for different publications.
2043 : */
2044 : static List *
2045 CBC 109 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
2046 : {
2047 ECB : WalRcvExecResult *res;
2048 : StringInfoData cmd;
2049 : TupleTableSlot *slot;
2050 GNC 109 : Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
2051 CBC 109 : List *tablelist = NIL;
2052 GNC 109 : int server_version = walrcv_server_version(wrconn);
2053 109 : bool check_columnlist = (server_version >= 150000);
2054 ECB :
2055 GIC 109 : initStringInfo(&cmd);
2056 ECB :
2057 : /* Get the list of tables from the publisher. */
2058 GNC 109 : if (server_version >= 160000)
2059 : {
2060 : StringInfoData pub_names;
2061 EUB :
2062 GNC 109 : tableRow[2] = INT2VECTOROID;
2063 109 : initStringInfo(&pub_names);
2064 109 : get_publications_str(publications, &pub_names, true);
2065 :
2066 : /*
2067 : * From version 16, we allowed passing multiple publications to the
2068 : * function pg_get_publication_tables. This helped to filter out the
2069 : * partition table whose ancestor is also published in this
2070 : * publication array.
2071 : *
2072 : * Join pg_get_publication_tables with pg_publication to exclude
2073 : * non-existing publications.
2074 : *
2075 : * Note that attrs are always stored in sorted order so we don't need
2076 : * to worry if different publications have specified them in a
2077 : * different order. See publication_translate_columns.
2078 : */
2079 109 : appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
2080 : " FROM pg_class c\n"
2081 : " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2082 : " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2083 : " FROM pg_publication\n"
2084 : " WHERE pubname IN ( %s )) AS gpt\n"
2085 : " ON gpt.relid = c.oid\n",
2086 : pub_names.data);
2087 :
2088 109 : pfree(pub_names.data);
2089 : }
2090 : else
2091 : {
2092 UNC 0 : tableRow[2] = NAMEARRAYOID;
2093 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
2094 :
2095 : /* Get column lists for each relation if the publisher supports it */
2096 0 : if (check_columnlist)
2097 0 : appendStringInfoString(&cmd, ", t.attnames\n");
2098 :
2099 0 : appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2100 : " WHERE t.pubname IN (");
2101 0 : get_publications_str(publications, &cmd, true);
2102 0 : appendStringInfoChar(&cmd, ')');
2103 : }
2104 EUB :
2105 GIC 109 : res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
2106 109 : pfree(cmd.data);
2107 :
2108 109 : if (res->status != WALRCV_OK_TUPLES)
2109 UIC 0 : ereport(ERROR,
2110 : (errcode(ERRCODE_CONNECTION_FAILURE),
2111 EUB : errmsg("could not receive list of replicated tables from the publisher: %s",
2112 : res->err)));
2113 :
2114 : /* Process tables. */
2115 GIC 109 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2116 330 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2117 ECB : {
2118 : char *nspname;
2119 EUB : char *relname;
2120 : bool isnull;
2121 ECB : RangeVar *rv;
2122 :
2123 CBC 222 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2124 222 : Assert(!isnull);
2125 GIC 222 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2126 222 : Assert(!isnull);
2127 :
2128 222 : rv = makeRangeVar(nspname, relname, -1);
2129 :
2130 CBC 222 : if (check_columnlist && list_member(tablelist, rv))
2131 GIC 1 : ereport(ERROR,
2132 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2133 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
2134 : nspname, relname));
2135 ECB : else
2136 GIC 221 : tablelist = lappend(tablelist, rv);
2137 ECB :
2138 GBC 221 : ExecClearTuple(slot);
2139 : }
2140 CBC 108 : ExecDropSingleTupleTableSlot(slot);
2141 EUB :
2142 GBC 108 : walrcv_clear_result(res);
2143 :
2144 GIC 108 : return tablelist;
2145 : }
2146 :
2147 : /*
2148 ECB : * This is to report the connection failure while dropping replication slots.
2149 EUB : * Here, we report the WARNING for all tablesync slots so that user can drop
2150 : * them manually, if required.
2151 : */
2152 : static void
2153 UIC 0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
2154 : {
2155 ECB : ListCell *lc;
2156 :
2157 UIC 0 : foreach(lc, rstates)
2158 : {
2159 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2160 0 : Oid relid = rstate->relid;
2161 :
2162 : /* Only cleanup resources of tablesync workers */
2163 0 : if (!OidIsValid(relid))
2164 LBC 0 : continue;
2165 :
2166 ECB : /*
2167 EUB : * Caller needs to ensure that relstate doesn't change underneath us.
2168 : * See DropSubscription where we get the relstates.
2169 : */
2170 LBC 0 : if (rstate->state != SUBREL_STATE_SYNCDONE)
2171 ECB : {
2172 UIC 0 : char syncslotname[NAMEDATALEN] = {0};
2173 :
2174 LBC 0 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2175 : sizeof(syncslotname));
2176 UIC 0 : elog(WARNING, "could not drop tablesync replication slot \"%s\"",
2177 : syncslotname);
2178 ECB : }
2179 : }
2180 :
2181 UIC 0 : ereport(ERROR,
2182 ECB : (errcode(ERRCODE_CONNECTION_FAILURE),
2183 : errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2184 : slotname, err),
2185 : /* translator: %s is an SQL ALTER command */
2186 : errhint("Use %s to disassociate the subscription from the slot.",
2187 : "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2188 : }
2189 :
2190 : /*
2191 : * Check for duplicates in the given list of publications and error out if
2192 : * found one. Add publications to datums as text datums, if datums is not
2193 : * NULL.
2194 : */
2195 : static void
2196 GIC 174 : check_duplicates_in_publist(List *publist, Datum *datums)
2197 : {
2198 ECB : ListCell *cell;
2199 GIC 174 : int j = 0;
2200 ECB :
2201 GIC 409 : foreach(cell, publist)
2202 : {
2203 CBC 244 : char *name = strVal(lfirst(cell));
2204 EUB : ListCell *pcell;
2205 :
2206 GIC 380 : foreach(pcell, publist)
2207 : {
2208 CBC 380 : char *pname = strVal(lfirst(pcell));
2209 ECB :
2210 GIC 380 : if (pcell == cell)
2211 CBC 235 : break;
2212 :
2213 145 : if (strcmp(name, pname) == 0)
2214 GIC 9 : ereport(ERROR,
2215 ECB : (errcode(ERRCODE_DUPLICATE_OBJECT),
2216 : errmsg("publication name \"%s\" used more than once",
2217 : pname)));
2218 : }
2219 :
2220 GIC 235 : if (datums)
2221 192 : datums[j++] = CStringGetTextDatum(name);
2222 : }
2223 165 : }
2224 :
2225 : /*
2226 EUB : * Merge current subscription's publications and user-specified publications
2227 : * from ADD/DROP PUBLICATIONS.
2228 : *
2229 : * If addpub is true, we will add the list of publications into oldpublist.
2230 : * Otherwise, we will delete the list of publications from oldpublist. The
2231 : * returned list is a copy, oldpublist itself is not changed.
2232 : *
2233 : * subname is the subscription name, for error messages.
2234 : */
2235 : static List *
2236 GBC 27 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
2237 : {
2238 : ListCell *lc;
2239 :
2240 27 : oldpublist = list_copy(oldpublist);
2241 :
2242 27 : check_duplicates_in_publist(newpublist, NULL);
2243 :
2244 46 : foreach(lc, newpublist)
2245 EUB : {
2246 GIC 34 : char *name = strVal(lfirst(lc));
2247 : ListCell *lc2;
2248 34 : bool found = false;
2249 :
2250 67 : foreach(lc2, oldpublist)
2251 : {
2252 55 : char *pubname = strVal(lfirst(lc2));
2253 :
2254 55 : if (strcmp(name, pubname) == 0)
2255 : {
2256 22 : found = true;
2257 22 : if (addpub)
2258 6 : ereport(ERROR,
2259 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2260 : errmsg("publication \"%s\" is already in subscription \"%s\"",
2261 : name, subname)));
2262 ECB : else
2263 GIC 16 : oldpublist = foreach_delete_current(oldpublist, lc2);
2264 :
2265 16 : break;
2266 : }
2267 : }
2268 :
2269 CBC 28 : if (addpub && !found)
2270 9 : oldpublist = lappend(oldpublist, makeString(name));
2271 GIC 19 : else if (!addpub && !found)
2272 3 : ereport(ERROR,
2273 ECB : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2274 : errmsg("publication \"%s\" is not in subscription \"%s\"",
2275 : name, subname)));
2276 : }
2277 :
2278 : /*
2279 : * XXX Probably no strong reason for this, but for now it's to make ALTER
2280 : * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
2281 : */
2282 GIC 12 : if (!oldpublist)
2283 3 : ereport(ERROR,
2284 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2285 ECB : errmsg("cannot drop all the publications from a subscription")));
2286 :
2287 GIC 9 : return oldpublist;
2288 : }
2289 :
2290 : /*
2291 : * Extract the streaming mode value from a DefElem. This is like
2292 : * defGetBoolean() but also accepts the special value of "parallel".
2293 : */
2294 : char
2295 GNC 65 : defGetStreamingMode(DefElem *def)
2296 : {
2297 : /*
2298 : * If no parameter value given, assume "true" is meant.
2299 : */
2300 65 : if (!def->arg)
2301 UNC 0 : return LOGICALREP_STREAM_ON;
2302 :
2303 : /*
2304 : * Allow 0, 1, "false", "true", "off", "on" or "parallel".
2305 : */
2306 GNC 65 : switch (nodeTag(def->arg))
2307 : {
2308 UNC 0 : case T_Integer:
2309 0 : switch (intVal(def->arg))
2310 : {
2311 0 : case 0:
2312 0 : return LOGICALREP_STREAM_OFF;
2313 0 : case 1:
2314 0 : return LOGICALREP_STREAM_ON;
2315 0 : default:
2316 : /* otherwise, error out below */
2317 0 : break;
2318 : }
2319 0 : break;
2320 GNC 65 : default:
2321 : {
2322 65 : char *sval = defGetString(def);
2323 :
2324 : /*
2325 : * The set of strings accepted here should match up with the
2326 : * grammar's opt_boolean_or_string production.
2327 : */
2328 127 : if (pg_strcasecmp(sval, "false") == 0 ||
2329 62 : pg_strcasecmp(sval, "off") == 0)
2330 3 : return LOGICALREP_STREAM_OFF;
2331 115 : if (pg_strcasecmp(sval, "true") == 0 ||
2332 53 : pg_strcasecmp(sval, "on") == 0)
2333 44 : return LOGICALREP_STREAM_ON;
2334 18 : if (pg_strcasecmp(sval, "parallel") == 0)
2335 15 : return LOGICALREP_STREAM_PARALLEL;
2336 : }
2337 3 : break;
2338 : }
2339 :
2340 3 : ereport(ERROR,
2341 : (errcode(ERRCODE_SYNTAX_ERROR),
2342 : errmsg("%s requires a Boolean value or \"parallel\"",
2343 : def->defname)));
2344 : return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
2345 : }
|