Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * subscriptioncmds.c
4 : * subscription catalog manipulation functions
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/subscriptioncmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/table.h"
19 : #include "access/xact.h"
20 : #include "catalog/catalog.h"
21 : #include "catalog/dependency.h"
22 : #include "catalog/indexing.h"
23 : #include "catalog/namespace.h"
24 : #include "catalog/objectaccess.h"
25 : #include "catalog/objectaddress.h"
26 : #include "catalog/pg_authid_d.h"
27 : #include "catalog/pg_database_d.h"
28 : #include "catalog/pg_subscription.h"
29 : #include "catalog/pg_subscription_rel.h"
30 : #include "catalog/pg_type.h"
31 : #include "commands/dbcommands.h"
32 : #include "commands/defrem.h"
33 : #include "commands/event_trigger.h"
34 : #include "commands/subscriptioncmds.h"
35 : #include "executor/executor.h"
36 : #include "miscadmin.h"
37 : #include "nodes/makefuncs.h"
38 : #include "pgstat.h"
39 : #include "replication/logicallauncher.h"
40 : #include "replication/logicalworker.h"
41 : #include "replication/origin.h"
42 : #include "replication/slot.h"
43 : #include "replication/walreceiver.h"
44 : #include "replication/walsender.h"
45 : #include "replication/worker_internal.h"
46 : #include "storage/lmgr.h"
47 : #include "utils/acl.h"
48 : #include "utils/builtins.h"
49 : #include "utils/guc.h"
50 : #include "utils/lsyscache.h"
51 : #include "utils/memutils.h"
52 : #include "utils/pg_lsn.h"
53 : #include "utils/syscache.h"
54 :
/*
 * One bit per option that the user may write in a CREATE/ALTER SUBSCRIPTION
 * command.  The bits are combined into bitmasks describing which options a
 * command supports and which ones the user actually specified.
 */
#define SUBOPT_CONNECT				(1 << 0)
#define SUBOPT_ENABLED				(1 << 1)
#define SUBOPT_CREATE_SLOT			(1 << 2)
#define SUBOPT_SLOT_NAME			(1 << 3)
#define SUBOPT_COPY_DATA			(1 << 4)
#define SUBOPT_SYNCHRONOUS_COMMIT	(1 << 5)
#define SUBOPT_REFRESH				(1 << 6)
#define SUBOPT_BINARY				(1 << 7)
#define SUBOPT_STREAMING			(1 << 8)
#define SUBOPT_TWOPHASE_COMMIT		(1 << 9)
#define SUBOPT_DISABLE_ON_ERR		(1 << 10)
#define SUBOPT_PASSWORD_REQUIRED	(1 << 11)
#define SUBOPT_RUN_AS_OWNER			(1 << 12)
#define SUBOPT_LSN					(1 << 13)
#define SUBOPT_ORIGIN				(1 << 14)

/*
 * True only if every bit of 'bits' is set in 'val'; with a multi-bit mask
 * this is an "all of", not an "any of", test.
 */
#define IsSet(val, bits)  (((val) & (bits)) == (bits))
77 :
78 : /*
79 : * Structure to hold a bitmap representing the user-provided CREATE/ALTER
80 : * SUBSCRIPTION command options and the parsed/default values of each of them.
81 : */
82 : typedef struct SubOpts
83 : {
84 : bits32 specified_opts;
85 : char *slot_name;
86 : char *synchronous_commit;
87 : bool connect;
88 : bool enabled;
89 : bool create_slot;
90 : bool copy_data;
91 : bool refresh;
92 : bool binary;
93 : char streaming;
94 : bool twophase;
95 : bool disableonerr;
96 : bool passwordrequired;
97 : bool runasowner;
98 : char *origin;
99 : XLogRecPtr lsn;
100 : } SubOpts;
101 :
102 : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
103 : static void check_publications_origin(WalReceiverConn *wrconn,
104 : List *publications, bool copydata,
105 : char *origin, Oid *subrel_local_oids,
106 : int subrel_count, char *subname);
107 : static void check_duplicates_in_publist(List *publist, Datum *datums);
108 : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
109 : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
110 :
111 :
112 : /*
113 : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
114 : *
115 : * Since not all options can be specified in both commands, this function
116 : * will report an error if mutually exclusive options are specified.
117 : */
118 : static void
633 dean.a.rasheed 119 GIC 362 : parse_subscription_options(ParseState *pstate, List *stmt_options,
120 : bits32 supported_opts, SubOpts *opts)
121 : {
122 : ListCell *lc;
123 :
124 : /* Start out with cleared opts. */
487 michael 125 362 : memset(opts, 0, sizeof(SubOpts));
126 :
127 : /* caller must expect some option */
642 akapila 128 362 : Assert(supported_opts != 0);
129 :
130 : /* If connect option is supported, these others also need to be. */
131 362 : Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
132 : IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
642 akapila 133 ECB : SUBOPT_COPY_DATA));
134 :
135 : /* Set default values for the supported options. */
642 akapila 136 GIC 362 : if (IsSet(supported_opts, SUBOPT_CONNECT))
137 177 : opts->connect = true;
138 362 : if (IsSet(supported_opts, SUBOPT_ENABLED))
642 akapila 139 CBC 198 : opts->enabled = true;
642 akapila 140 GIC 362 : if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
141 177 : opts->create_slot = true;
642 akapila 142 CBC 362 : if (IsSet(supported_opts, SUBOPT_COPY_DATA))
642 akapila 143 GIC 251 : opts->copy_data = true;
144 362 : if (IsSet(supported_opts, SUBOPT_REFRESH))
642 akapila 145 CBC 51 : opts->refresh = true;
642 akapila 146 GIC 362 : if (IsSet(supported_opts, SUBOPT_BINARY))
147 255 : opts->binary = false;
148 362 : if (IsSet(supported_opts, SUBOPT_STREAMING))
90 akapila 149 GNC 255 : opts->streaming = LOGICALREP_STREAM_OFF;
634 akapila 150 CBC 362 : if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
151 177 : opts->twophase = false;
391 152 362 : if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
153 255 : opts->disableonerr = false;
10 rhaas 154 GNC 362 : if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
155 255 : opts->passwordrequired = true;
5 156 362 : if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
157 255 : opts->runasowner = false;
262 akapila 158 362 : if (IsSet(supported_opts, SUBOPT_ORIGIN))
159 255 : opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
2271 peter_e 160 ECB :
161 : /* Parse options */
642 akapila 162 CBC 693 : foreach(lc, stmt_options)
2271 peter_e 163 ECB : {
2271 peter_e 164 CBC 361 : DefElem *defel = (DefElem *) lfirst(lc);
2271 peter_e 165 ECB :
642 akapila 166 CBC 361 : if (IsSet(supported_opts, SUBOPT_CONNECT) &&
167 213 : strcmp(defel->defname, "connect") == 0)
2271 peter_e 168 ECB : {
642 akapila 169 CBC 79 : if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
633 dean.a.rasheed 170 LBC 0 : errorConflictingDefElem(defel, pstate);
2271 peter_e 171 ECB :
642 akapila 172 CBC 79 : opts->specified_opts |= SUBOPT_CONNECT;
173 79 : opts->connect = defGetBoolean(defel);
2271 peter_e 174 ECB : }
642 akapila 175 CBC 282 : else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
176 155 : strcmp(defel->defname, "enabled") == 0)
2271 peter_e 177 ECB : {
642 akapila 178 CBC 33 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
633 dean.a.rasheed 179 LBC 0 : errorConflictingDefElem(defel, pstate);
180 :
642 akapila 181 GIC 33 : opts->specified_opts |= SUBOPT_ENABLED;
642 akapila 182 CBC 33 : opts->enabled = defGetBoolean(defel);
183 : }
184 249 : else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
642 akapila 185 GIC 122 : strcmp(defel->defname, "create_slot") == 0)
2271 peter_e 186 ECB : {
642 akapila 187 CBC 16 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
633 dean.a.rasheed 188 UIC 0 : errorConflictingDefElem(defel, pstate);
2271 peter_e 189 ECB :
642 akapila 190 GBC 16 : opts->specified_opts |= SUBOPT_CREATE_SLOT;
642 akapila 191 GIC 16 : opts->create_slot = defGetBoolean(defel);
2271 peter_e 192 ECB : }
642 akapila 193 CBC 233 : else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
642 akapila 194 GIC 185 : strcmp(defel->defname, "slot_name") == 0)
2271 peter_e 195 ECB : {
642 akapila 196 CBC 62 : if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
633 dean.a.rasheed 197 UIC 0 : errorConflictingDefElem(defel, pstate);
2271 peter_e 198 ECB :
642 akapila 199 GBC 62 : opts->specified_opts |= SUBOPT_SLOT_NAME;
642 akapila 200 GIC 62 : opts->slot_name = defGetString(defel);
2161 peter_e 201 ECB :
202 : /* Setting slot_name = NONE is treated as no slot name. */
642 akapila 203 GIC 62 : if (strcmp(opts->slot_name, "none") == 0)
642 akapila 204 CBC 53 : opts->slot_name = NULL;
629 akapila 205 ECB : else
629 akapila 206 GIC 9 : ReplicationSlotValidateName(opts->slot_name, ERROR);
2271 peter_e 207 ECB : }
642 akapila 208 GBC 171 : else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
642 akapila 209 GIC 111 : strcmp(defel->defname, "copy_data") == 0)
2208 peter_e 210 ECB : {
642 akapila 211 CBC 15 : if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
633 dean.a.rasheed 212 UIC 0 : errorConflictingDefElem(defel, pstate);
2208 peter_e 213 ECB :
642 akapila 214 CBC 15 : opts->specified_opts |= SUBOPT_COPY_DATA;
642 akapila 215 GIC 15 : opts->copy_data = defGetBoolean(defel);
2208 peter_e 216 ECB : }
642 akapila 217 GBC 156 : else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
642 akapila 218 GIC 111 : strcmp(defel->defname, "synchronous_commit") == 0)
2186 peter_e 219 ECB : {
642 akapila 220 CBC 6 : if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
633 dean.a.rasheed 221 UIC 0 : errorConflictingDefElem(defel, pstate);
222 :
642 akapila 223 CBC 6 : opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
224 6 : opts->synchronous_commit = defGetString(defel);
225 :
2186 peter_e 226 ECB : /* Test if the given value is valid for synchronous_commit GUC. */
642 akapila 227 GIC 6 : (void) set_config_option("synchronous_commit", opts->synchronous_commit,
2186 peter_e 228 ECB : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
229 : false, 0, false);
230 : }
642 akapila 231 CBC 150 : else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
642 akapila 232 GBC 33 : strcmp(defel->defname, "refresh") == 0)
233 : {
642 akapila 234 CBC 33 : if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
633 dean.a.rasheed 235 LBC 0 : errorConflictingDefElem(defel, pstate);
236 :
642 akapila 237 CBC 33 : opts->specified_opts |= SUBOPT_REFRESH;
238 33 : opts->refresh = defGetBoolean(defel);
239 : }
240 117 : else if (IsSet(supported_opts, SUBOPT_BINARY) &&
642 akapila 241 GBC 105 : strcmp(defel->defname, "binary") == 0)
242 : {
642 akapila 243 CBC 16 : if (IsSet(opts->specified_opts, SUBOPT_BINARY))
633 dean.a.rasheed 244 LBC 0 : errorConflictingDefElem(defel, pstate);
245 :
642 akapila 246 GIC 16 : opts->specified_opts |= SUBOPT_BINARY;
642 akapila 247 CBC 16 : opts->binary = defGetBoolean(defel);
248 : }
642 akapila 249 GIC 101 : else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
250 89 : strcmp(defel->defname, "streaming") == 0)
948 akapila 251 ECB : {
642 akapila 252 CBC 31 : if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
633 dean.a.rasheed 253 UIC 0 : errorConflictingDefElem(defel, pstate);
948 akapila 254 ECB :
642 akapila 255 GBC 31 : opts->specified_opts |= SUBOPT_STREAMING;
90 akapila 256 GNC 31 : opts->streaming = defGetStreamingMode(defel);
948 akapila 257 ECB : }
634 akapila 258 CBC 70 : else if (strcmp(defel->defname, "two_phase") == 0)
259 : {
634 akapila 260 ECB : /*
261 : * Do not allow toggling of two_phase option. Doing so could cause
262 : * missing of transactions and lead to an inconsistent replica.
263 : * See comments atop worker.c
634 akapila 264 EUB : *
265 : * Note: Unsupported twophase indicates that this call originated
634 akapila 266 ECB : * from AlterSubscription.
267 : */
634 akapila 268 GIC 18 : if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
634 akapila 269 CBC 3 : ereport(ERROR,
634 akapila 270 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
271 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
272 :
634 akapila 273 GBC 15 : if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
633 dean.a.rasheed 274 UIC 0 : errorConflictingDefElem(defel, pstate);
634 akapila 275 ECB :
634 akapila 276 CBC 15 : opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
634 akapila 277 GIC 15 : opts->twophase = defGetBoolean(defel);
634 akapila 278 ECB : }
391 akapila 279 GIC 52 : else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
280 40 : strcmp(defel->defname, "disable_on_error") == 0)
281 : {
282 10 : if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
391 akapila 283 UIC 0 : errorConflictingDefElem(defel, pstate);
284 :
391 akapila 285 GIC 10 : opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
286 10 : opts->disableonerr = defGetBoolean(defel);
287 : }
10 rhaas 288 GNC 42 : else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
289 30 : strcmp(defel->defname, "password_required") == 0)
290 : {
291 11 : if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
10 rhaas 292 UNC 0 : errorConflictingDefElem(defel, pstate);
293 :
10 rhaas 294 GNC 11 : opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
295 11 : opts->passwordrequired = defGetBoolean(defel);
296 : }
5 297 31 : else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
298 19 : strcmp(defel->defname, "run_as_owner") == 0)
299 : {
300 1 : if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
5 rhaas 301 UNC 0 : errorConflictingDefElem(defel, pstate);
302 :
5 rhaas 303 GNC 1 : opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
304 1 : opts->runasowner = defGetBoolean(defel);
305 : }
262 akapila 306 30 : else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
307 18 : strcmp(defel->defname, "origin") == 0)
308 : {
309 15 : if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
262 akapila 310 UNC 0 : errorConflictingDefElem(defel, pstate);
311 :
262 akapila 312 GNC 15 : opts->specified_opts |= SUBOPT_ORIGIN;
313 15 : pfree(opts->origin);
314 :
315 : /*
316 : * Even though the "origin" parameter allows only "none" and "any"
317 : * values, it is implemented as a string type so that the
318 : * parameter can be extended in future versions to support
319 : * filtering using origin names specified by the user.
320 : */
321 15 : opts->origin = defGetString(defel);
322 :
323 22 : if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
324 7 : (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
325 3 : ereport(ERROR,
326 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
327 : errmsg("unrecognized origin value: \"%s\"", opts->origin));
328 : }
383 akapila 329 CBC 15 : else if (IsSet(supported_opts, SUBOPT_LSN) &&
330 12 : strcmp(defel->defname, "lsn") == 0)
383 akapila 331 GIC 9 : {
332 12 : char *lsn_str = defGetString(defel);
333 : XLogRecPtr lsn;
383 akapila 334 ECB :
383 akapila 335 GBC 12 : if (IsSet(opts->specified_opts, SUBOPT_LSN))
383 akapila 336 UIC 0 : errorConflictingDefElem(defel, pstate);
383 akapila 337 ECB :
338 : /* Setting lsn = NONE is treated as resetting LSN */
383 akapila 339 GIC 12 : if (strcmp(lsn_str, "none") == 0)
383 akapila 340 CBC 3 : lsn = InvalidXLogRecPtr;
383 akapila 341 ECB : else
342 : {
343 : /* Parse the argument as LSN */
383 akapila 344 GBC 9 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
345 : CStringGetDatum(lsn_str)));
383 akapila 346 ECB :
383 akapila 347 CBC 9 : if (XLogRecPtrIsInvalid(lsn))
383 akapila 348 GIC 3 : ereport(ERROR,
383 akapila 349 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
350 : errmsg("invalid WAL location (LSN): %s", lsn_str)));
351 : }
352 :
383 akapila 353 GBC 9 : opts->specified_opts |= SUBOPT_LSN;
383 akapila 354 GIC 9 : opts->lsn = lsn;
383 akapila 355 ECB : }
2271 peter_e 356 : else
2153 peter_e 357 GIC 3 : ereport(ERROR,
2153 peter_e 358 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
1424 alvherre 359 : errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
360 : }
2208 peter_e 361 :
2208 peter_e 362 EUB : /*
363 : * We've been explicitly asked to not connect, that requires some
2208 peter_e 364 ECB : * additional processing.
365 : */
642 akapila 366 GIC 332 : if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
2208 peter_e 367 ECB : {
368 : /* Check for incompatible options from the user. */
642 akapila 369 GIC 64 : if (opts->enabled &&
642 akapila 370 CBC 64 : IsSet(opts->specified_opts, SUBOPT_ENABLED))
2208 peter_e 371 GBC 3 : ereport(ERROR,
372 : (errcode(ERRCODE_SYNTAX_ERROR),
1424 alvherre 373 ECB : /*- translator: both %s are strings of the form "option = value" */
374 : errmsg("%s and %s are mutually exclusive options",
375 : "connect = false", "enabled = true")));
376 :
642 akapila 377 GIC 61 : if (opts->create_slot &&
378 58 : IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
2208 peter_e 379 3 : ereport(ERROR,
380 : (errcode(ERRCODE_SYNTAX_ERROR),
381 : errmsg("%s and %s are mutually exclusive options",
1424 alvherre 382 ECB : "connect = false", "create_slot = true")));
383 :
642 akapila 384 CBC 58 : if (opts->copy_data &&
385 55 : IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
2208 peter_e 386 3 : ereport(ERROR,
387 : (errcode(ERRCODE_SYNTAX_ERROR),
388 : errmsg("%s and %s are mutually exclusive options",
389 : "connect = false", "copy_data = true")));
2208 peter_e 390 ECB :
391 : /* Change the defaults of other options. */
642 akapila 392 CBC 55 : opts->enabled = false;
393 55 : opts->create_slot = false;
642 akapila 394 GIC 55 : opts->copy_data = false;
395 : }
2161 peter_e 396 ECB :
2161 peter_e 397 EUB : /*
398 : * Do additional checking for disallowed combination when slot_name = NONE
399 : * was used.
2161 peter_e 400 ECB : */
642 akapila 401 CBC 323 : if (!opts->slot_name &&
642 akapila 402 GIC 317 : IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
403 : {
487 michael 404 50 : if (opts->enabled)
487 michael 405 ECB : {
487 michael 406 GIC 9 : if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
407 3 : ereport(ERROR,
487 michael 408 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
409 : /*- translator: both %s are strings of the form "option = value" */
410 : errmsg("%s and %s are mutually exclusive options",
411 : "slot_name = NONE", "enabled = true")));
412 : else
487 michael 413 GIC 6 : ereport(ERROR,
487 michael 414 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
415 : /*- translator: both %s are strings of the form "option = value" */
416 : errmsg("subscription with %s must also set %s",
417 : "slot_name = NONE", "enabled = false")));
418 : }
419 :
487 michael 420 GIC 41 : if (opts->create_slot)
421 : {
422 6 : if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
423 3 : ereport(ERROR,
424 : (errcode(ERRCODE_SYNTAX_ERROR),
425 : /*- translator: both %s are strings of the form "option = value" */
426 : errmsg("%s and %s are mutually exclusive options",
487 michael 427 ECB : "slot_name = NONE", "create_slot = true")));
428 : else
487 michael 429 GIC 3 : ereport(ERROR,
487 michael 430 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
431 : /*- translator: both %s are strings of the form "option = value" */
432 : errmsg("subscription with %s must also set %s",
433 : "slot_name = NONE", "create_slot = false")));
434 : }
435 : }
2271 peter_e 436 GIC 308 : }
437 :
374 akapila 438 ECB : /*
439 : * Add publication names from the list to a string.
440 : */
441 : static void
374 akapila 442 GIC 208 : get_publications_str(List *publications, StringInfo dest, bool quote_literal)
443 : {
444 : ListCell *lc;
374 akapila 445 CBC 208 : bool first = true;
374 akapila 446 ECB :
235 tgl 447 GNC 208 : Assert(publications != NIL);
448 :
374 akapila 449 GIC 495 : foreach(lc, publications)
450 : {
451 287 : char *pubname = strVal(lfirst(lc));
452 :
374 akapila 453 CBC 287 : if (first)
454 208 : first = false;
374 akapila 455 ECB : else
374 akapila 456 GIC 79 : appendStringInfoString(dest, ", ");
457 :
458 287 : if (quote_literal)
459 281 : appendStringInfoString(dest, quote_literal_cstr(pubname));
460 : else
461 : {
374 akapila 462 CBC 6 : appendStringInfoChar(dest, '"');
463 6 : appendStringInfoString(dest, pubname);
374 akapila 464 GIC 6 : appendStringInfoChar(dest, '"');
374 akapila 465 ECB : }
466 : }
374 akapila 467 CBC 208 : }
374 akapila 468 ECB :
469 : /*
470 : * Check that the specified publications are present on the publisher.
471 : */
472 : static void
374 akapila 473 GIC 88 : check_publications(WalReceiverConn *wrconn, List *publications)
374 akapila 474 ECB : {
475 : WalRcvExecResult *res;
476 : StringInfo cmd;
477 : TupleTableSlot *slot;
374 akapila 478 GIC 88 : List *publicationsCopy = NIL;
479 88 : Oid tableRow[1] = {TEXTOID};
480 :
374 akapila 481 CBC 88 : cmd = makeStringInfo();
374 akapila 482 GIC 88 : appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
374 akapila 483 ECB : " pg_catalog.pg_publication t WHERE\n"
484 : " t.pubname IN (");
374 akapila 485 GIC 88 : get_publications_str(publications, cmd, true);
486 88 : appendStringInfoChar(cmd, ')');
487 :
488 88 : res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
489 88 : pfree(cmd->data);
374 akapila 490 CBC 88 : pfree(cmd);
491 :
374 akapila 492 GIC 88 : if (res->status != WALRCV_OK_TUPLES)
374 akapila 493 UIC 0 : ereport(ERROR,
494 : errmsg("could not receive list of publications from the publisher: %s",
495 : res->err));
496 :
374 akapila 497 CBC 88 : publicationsCopy = list_copy(publications);
498 :
499 : /* Process publication(s). */
374 akapila 500 GIC 88 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
501 202 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
502 : {
374 akapila 503 ECB : char *pubname;
504 : bool isnull;
505 :
374 akapila 506 CBC 114 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
374 akapila 507 GIC 114 : Assert(!isnull);
374 akapila 508 ECB :
509 : /* Delete the publication present in publisher from the list. */
374 akapila 510 CBC 114 : publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
374 akapila 511 GIC 114 : ExecClearTuple(slot);
374 akapila 512 ECB : }
513 :
374 akapila 514 CBC 88 : ExecDropSingleTupleTableSlot(slot);
374 akapila 515 ECB :
374 akapila 516 GIC 88 : walrcv_clear_result(res);
374 akapila 517 ECB :
374 akapila 518 GIC 88 : if (list_length(publicationsCopy))
374 akapila 519 ECB : {
520 : /* Prepare the list of non-existent publication(s) for error message. */
374 akapila 521 GIC 3 : StringInfo pubnames = makeStringInfo();
522 :
374 akapila 523 CBC 3 : get_publications_str(publicationsCopy, pubnames, false);
524 3 : ereport(WARNING,
374 akapila 525 ECB : errcode(ERRCODE_UNDEFINED_OBJECT),
526 : errmsg_plural("publication %s does not exist on the publisher",
527 : "publications %s do not exist on the publisher",
528 : list_length(publicationsCopy),
529 : pubnames->data));
530 : }
374 akapila 531 GIC 88 : }
532 :
533 : /*
2024 tgl 534 ECB : * Auxiliary function to build a text array out of a list of String nodes.
535 : */
536 : static Datum
2271 peter_e 537 GIC 147 : publicationListToArray(List *publist)
538 : {
2271 peter_e 539 ECB : ArrayType *arr;
540 : Datum *datums;
541 : MemoryContext memcxt;
542 : MemoryContext oldcxt;
543 :
544 : /* Create memory context for temporary allocations. */
2271 peter_e 545 GIC 147 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
2271 peter_e 546 ECB : "publicationListToArray to array",
1943 tgl 547 : ALLOCSET_DEFAULT_SIZES);
2271 peter_e 548 GIC 147 : oldcxt = MemoryContextSwitchTo(memcxt);
2271 peter_e 549 ECB :
2024 tgl 550 CBC 147 : datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
2024 tgl 551 ECB :
733 peter 552 GIC 147 : check_duplicates_in_publist(publist, datums);
2271 peter_e 553 ECB :
2271 peter_e 554 GBC 144 : MemoryContextSwitchTo(oldcxt);
555 :
282 peter 556 GNC 144 : arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
2024 tgl 557 ECB :
2271 peter_e 558 GIC 144 : MemoryContextDelete(memcxt);
559 :
2271 peter_e 560 CBC 144 : return PointerGetDatum(arr);
2271 peter_e 561 ECB : }
562 :
563 : /*
564 : * Create new subscription.
565 : */
566 : ObjectAddress
633 dean.a.rasheed 567 CBC 177 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
568 : bool isTopLevel)
569 : {
2271 peter_e 570 ECB : Relation rel;
571 : ObjectAddress myself;
572 : Oid subid;
573 : bool nulls[Natts_pg_subscription];
574 : Datum values[Natts_pg_subscription];
2270 alvherre 575 GIC 177 : Oid owner = GetUserId();
2271 peter_e 576 ECB : HeapTuple tup;
577 : char *conninfo;
578 : char originname[NAMEDATALEN];
579 : List *publications;
580 : bits32 supported_opts;
642 akapila 581 CBC 177 : SubOpts opts = {0};
582 : AclResult aclresult;
583 :
2228 peter_e 584 ECB : /*
585 : * Parse and check options.
586 : *
587 : * Connection and publication should not be specified here.
588 : */
642 akapila 589 GIC 177 : supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
590 : SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
591 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
391 akapila 592 ECB : SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
593 : SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
594 : SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
633 dean.a.rasheed 595 GIC 177 : parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
596 :
597 : /*
598 : * Since creating a replication slot is not transactional, rolling back
2228 peter_e 599 ECB : * the transaction leaves the created replication slot. So we cannot run
600 : * CREATE SUBSCRIPTION inside a transaction block if creating a
601 : * replication slot.
602 : */
642 akapila 603 GIC 138 : if (opts.create_slot)
1878 peter_e 604 82 : PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
605 :
606 : /*
607 : * We don't want to allow unprivileged users to be able to trigger attempts
608 : * to access arbitrary network destinations, so require the user to have
609 : * been specifically authorized to create subscriptions.
610 : */
10 rhaas 611 GNC 135 : if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
2271 peter_e 612 CBC 3 : ereport(ERROR,
613 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
614 : errmsg("must have privileges of pg_create_subscription to create subscriptions")));
615 :
616 : /*
617 : * Since a subscription is a database object, we also check for CREATE
618 : * permission on the database.
619 : */
10 rhaas 620 GNC 132 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
621 : owner, ACL_CREATE);
622 132 : if (aclresult != ACLCHECK_OK)
623 6 : aclcheck_error(aclresult, OBJECT_DATABASE,
624 3 : get_database_name(MyDatabaseId));
625 :
626 : /*
627 : * Non-superusers are required to set a password for authentication, and
628 : * that password must be used by the target server, but the superuser can
629 : * exempt a subscription from this requirement.
630 : */
631 129 : if (!opts.passwordrequired && !superuser_arg(owner))
632 3 : ereport(ERROR,
633 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
634 : errmsg("password_required=false is superuser-only"),
635 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2271 peter_e 636 ECB :
637 : /*
1380 tgl 638 : * If built with appropriate switch, whine when regression-testing
639 : * conventions for subscription names are violated.
640 : */
641 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
642 : if (strncmp(stmt->subname, "regress_", 8) != 0)
643 : elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
644 : #endif
645 :
1539 andres 646 CBC 126 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
647 :
2271 peter_e 648 ECB : /* Check if name is used */
1601 andres 649 GIC 126 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
650 : MyDatabaseId, CStringGetDatum(stmt->subname));
2271 peter_e 651 126 : if (OidIsValid(subid))
652 : {
653 3 : ereport(ERROR,
654 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2271 peter_e 655 ECB : errmsg("subscription \"%s\" already exists",
656 : stmt->subname)));
657 : }
658 :
642 akapila 659 GIC 123 : if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
660 110 : opts.slot_name == NULL)
661 110 : opts.slot_name = stmt->subname;
662 :
2186 peter_e 663 ECB : /* The default for synchronous_commit of subscriptions is off. */
642 akapila 664 GIC 123 : if (opts.synchronous_commit == NULL)
665 123 : opts.synchronous_commit = "off";
666 :
2271 peter_e 667 123 : conninfo = stmt->conninfo;
668 123 : publications = stmt->publication;
2271 peter_e 669 ECB :
670 : /* Load the library providing us libpq calls. */
2271 peter_e 671 GIC 123 : load_file("libpqwalreceiver", false);
672 :
673 : /* Check the connection info string. */
10 rhaas 674 GNC 123 : walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
675 :
676 : /* Everything ok, form a new tuple. */
2271 peter_e 677 CBC 114 : memset(values, 0, sizeof(values));
2271 peter_e 678 GIC 114 : memset(nulls, false, sizeof(nulls));
679 :
1601 andres 680 114 : subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
681 : Anum_pg_subscription_oid);
682 114 : values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
2271 peter_e 683 CBC 114 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
367 akapila 684 GIC 114 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
2271 peter_e 685 114 : values[Anum_pg_subscription_subname - 1] =
686 114 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
2270 alvherre 687 114 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
642 akapila 688 114 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
689 114 : values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
90 akapila 690 GNC 114 : values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
634 akapila 691 CBC 114 : values[Anum_pg_subscription_subtwophasestate - 1] =
692 114 : CharGetDatum(opts.twophase ?
693 : LOGICALREP_TWOPHASE_STATE_PENDING :
694 : LOGICALREP_TWOPHASE_STATE_DISABLED);
391 akapila 695 GIC 114 : values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
10 rhaas 696 GNC 114 : values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
5 697 114 : values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
2271 peter_e 698 GIC 114 : values[Anum_pg_subscription_subconninfo - 1] =
699 114 : CStringGetTextDatum(conninfo);
642 akapila 700 114 : if (opts.slot_name)
2161 peter_e 701 CBC 104 : values[Anum_pg_subscription_subslotname - 1] =
642 akapila 702 104 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
703 : else
2161 peter_e 704 GIC 10 : nulls[Anum_pg_subscription_subslotname - 1] = true;
2186 705 114 : values[Anum_pg_subscription_subsynccommit - 1] =
642 akapila 706 114 : CStringGetTextDatum(opts.synchronous_commit);
2271 peter_e 707 111 : values[Anum_pg_subscription_subpublications - 1] =
2153 bruce 708 114 : publicationListToArray(publications);
262 akapila 709 GNC 111 : values[Anum_pg_subscription_suborigin - 1] =
710 111 : CStringGetTextDatum(opts.origin);
711 :
2271 peter_e 712 CBC 111 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
713 :
2271 peter_e 714 ECB : /* Insert tuple into catalog. */
1601 andres 715 CBC 111 : CatalogTupleInsert(rel, tup);
2271 peter_e 716 111 : heap_freetuple(tup);
717 :
2270 alvherre 718 GIC 111 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
719 :
180 akapila 720 GNC 111 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
2271 peter_e 721 GIC 111 : replorigin_create(originname);
722 :
2271 peter_e 723 ECB : /*
2208 724 : * Connect to remote side to execute requested commands and fetch table
725 : * info.
726 : */
642 akapila 727 GIC 111 : if (opts.connect)
728 : {
729 : char *err;
730 : WalReceiverConn *wrconn;
731 : List *tables;
732 : ListCell *lc;
733 : char table_state;
734 : bool must_use_password;
735 :
736 : /* Try to connect to the publisher. */
10 rhaas 737 GNC 74 : must_use_password = !superuser_arg(owner) && opts.passwordrequired;
738 74 : wrconn = walrcv_connect(conninfo, true, must_use_password,
739 : stmt->subname, &err);
2271 peter_e 740 GIC 74 : if (!wrconn)
2271 peter_e 741 CBC 3 : ereport(ERROR,
742 : (errcode(ERRCODE_CONNECTION_FAILURE),
743 : errmsg("could not connect to the publisher: %s", err)));
2271 peter_e 744 ECB :
2265 peter_e 745 GIC 71 : PG_TRY();
2265 peter_e 746 ECB : {
374 akapila 747 GIC 71 : check_publications(wrconn, publications);
213 akapila 748 GNC 71 : check_publications_origin(wrconn, publications, opts.copy_data,
749 : opts.origin, NULL, 0, stmt->subname);
374 akapila 750 ECB :
751 : /*
752 : * Set sync state based on if we were asked to do data copy or
753 : * not.
754 : */
367 tomas.vondra 755 GIC 71 : table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
2208 peter_e 756 ECB :
757 : /*
367 tomas.vondra 758 : * Get the table list from publisher and build local table status
759 : * info.
760 : */
367 tomas.vondra 761 CBC 71 : tables = fetch_table_list(wrconn, publications);
762 198 : foreach(lc, tables)
763 : {
2208 peter_e 764 128 : RangeVar *rv = (RangeVar *) lfirst(lc);
2208 peter_e 765 ECB : Oid relid;
766 :
2207 peter_e 767 GIC 128 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
2208 peter_e 768 ECB :
769 : /* Check for supported relkind. */
2154 peter_e 770 GIC 128 : CheckSubscriptionRelkind(get_rel_relkind(relid),
2154 peter_e 771 CBC 128 : rv->schemaname, rv->relname);
772 :
367 tomas.vondra 773 GIC 128 : AddSubscriptionRelState(subid, relid, table_state,
1829 peter_e 774 ECB : InvalidXLogRecPtr);
2208 775 : }
776 :
2179 777 : /*
778 : * If requested, create permanent slot for the subscription. We
2153 bruce 779 : * won't use the initial snapshot for anything, so no need to
780 : * export it.
2179 peter_e 781 : */
642 akapila 782 CBC 70 : if (opts.create_slot)
2179 peter_e 783 ECB : {
634 akapila 784 CBC 69 : bool twophase_enabled = false;
634 akapila 785 ECB :
642 akapila 786 CBC 69 : Assert(opts.slot_name);
2161 peter_e 787 ECB :
634 akapila 788 : /*
789 : * Even if two_phase is set, don't create the slot with
790 : * two-phase enabled. Will enable it once all the tables are
791 : * synced and ready. This avoids race-conditions like prepared
792 : * transactions being skipped due to changes not being applied
793 : * due to checks in should_apply_changes_for_rel() when
794 : * tablesync for the corresponding tables are in progress. See
795 : * comments atop worker.c.
796 : *
797 : * Note that if tables were specified but copy_data is false
798 : * then it is safe to enable two_phase up-front because those
367 tomas.vondra 799 : * tables are already initially in READY state. When the
800 : * subscription has no tables, we leave the twophase state as
801 : * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
634 akapila 802 : * PUBLICATION to work.
803 : */
367 tomas.vondra 804 CBC 69 : if (opts.twophase && !opts.copy_data && tables != NIL)
634 akapila 805 1 : twophase_enabled = true;
634 akapila 806 ECB :
634 akapila 807 CBC 69 : walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
808 : CRS_NOEXPORT_SNAPSHOT, NULL);
634 akapila 809 ECB :
634 akapila 810 GIC 69 : if (twophase_enabled)
811 1 : UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
634 akapila 812 ECB :
2179 peter_e 813 CBC 69 : ereport(NOTICE,
814 : (errmsg("created replication slot \"%s\" on publisher",
642 akapila 815 ECB : opts.slot_name)));
816 : }
2265 peter_e 817 : }
1255 peter 818 CBC 1 : PG_FINALLY();
819 : {
2265 peter_e 820 GIC 71 : walrcv_disconnect(wrconn);
821 : }
822 71 : PG_END_TRY();
823 : }
2208 peter_e 824 ECB : else
2208 peter_e 825 GIC 37 : ereport(WARNING,
826 : (errmsg("subscription was created, but is not connected"),
827 : errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
828 :
1539 andres 829 107 : table_close(rel, RowExclusiveLock);
830 :
368 831 107 : pgstat_create_subscription(subid);
832 :
642 akapila 833 CBC 107 : if (opts.enabled)
2169 peter_e 834 70 : ApplyLauncherWakeupAtCommit();
835 :
2271 836 107 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
2271 peter_e 837 ECB :
2271 peter_e 838 GIC 107 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
839 :
840 107 : return myself;
2271 peter_e 841 ECB : }
842 :
2208 843 : static void
374 akapila 844 CBC 38 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
845 : List *validate_publications)
846 : {
847 : char *err;
848 : List *pubrel_names;
849 : List *subrel_states;
850 : Oid *subrel_local_oids;
2208 peter_e 851 ECB : Oid *pubrel_local_oids;
852 : ListCell *lc;
853 : int off;
854 : int remove_rel_len;
855 : int subrel_count;
786 akapila 856 GIC 38 : Relation rel = NULL;
857 : typedef struct SubRemoveRels
786 akapila 858 ECB : {
859 : Oid relid;
860 : char state;
861 : } SubRemoveRels;
862 : SubRemoveRels *sub_remove_rels;
863 : WalReceiverConn *wrconn;
864 : bool must_use_password;
2208 peter_e 865 :
866 : /* Load the library providing us libpq calls. */
2208 peter_e 867 GIC 38 : load_file("libpqwalreceiver", false);
2208 peter_e 868 ECB :
702 alvherre 869 : /* Try to connect to the publisher. */
10 rhaas 870 GNC 38 : must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
871 38 : wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
872 : sub->name, &err);
702 alvherre 873 CBC 38 : if (!wrconn)
702 alvherre 874 UIC 0 : ereport(ERROR,
875 : (errcode(ERRCODE_CONNECTION_FAILURE),
876 : errmsg("could not connect to the publisher: %s", err)));
877 :
786 akapila 878 GIC 38 : PG_TRY();
879 : {
374 880 38 : if (validate_publications)
881 17 : check_publications(wrconn, validate_publications);
374 akapila 882 ECB :
883 : /* Get the table list from publisher. */
786 akapila 884 CBC 38 : pubrel_names = fetch_table_list(wrconn, sub->publications);
885 :
786 akapila 886 ECB : /* Get local table list. */
256 michael 887 GNC 38 : subrel_states = GetSubscriptionRelations(sub->oid, false);
213 akapila 888 38 : subrel_count = list_length(subrel_states);
889 :
890 : /*
891 : * Build qsorted array of local table oids for faster lookup. This can
892 : * potentially contain all tables in the database so speed of lookup
893 : * is important.
894 : */
895 38 : subrel_local_oids = palloc(subrel_count * sizeof(Oid));
786 akapila 896 GIC 38 : off = 0;
897 131 : foreach(lc, subrel_states)
898 : {
899 93 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
900 :
901 93 : subrel_local_oids[off++] = relstate->relid;
902 : }
213 akapila 903 GNC 38 : qsort(subrel_local_oids, subrel_count,
904 : sizeof(Oid), oid_cmp);
786 akapila 905 ECB :
213 akapila 906 GNC 38 : check_publications_origin(wrconn, sub->publications, copy_data,
907 : sub->origin, subrel_local_oids,
908 : subrel_count, sub->name);
909 :
786 akapila 910 ECB : /*
911 : * Rels that we want to remove from subscription and drop any slots
912 : * and origins corresponding to them.
913 : */
213 akapila 914 GNC 38 : sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
786 akapila 915 ECB :
916 : /*
917 : * Walk over the remote tables and try to match them to locally known
918 : * tables. If the table is not known locally create a new state for
919 : * it.
920 : *
921 : * Also builds array of local oids of remote tables for the next step.
922 : */
786 akapila 923 CBC 38 : off = 0;
786 akapila 924 GIC 38 : pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
786 akapila 925 ECB :
786 akapila 926 GIC 130 : foreach(lc, pubrel_names)
786 akapila 927 ECB : {
786 akapila 928 GIC 92 : RangeVar *rv = (RangeVar *) lfirst(lc);
929 : Oid relid;
2208 peter_e 930 ECB :
786 akapila 931 GIC 92 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
932 :
933 : /* Check for supported relkind. */
786 akapila 934 CBC 92 : CheckSubscriptionRelkind(get_rel_relkind(relid),
786 akapila 935 GIC 92 : rv->schemaname, rv->relname);
2208 peter_e 936 ECB :
786 akapila 937 GIC 92 : pubrel_local_oids[off++] = relid;
2154 peter_e 938 ECB :
786 akapila 939 CBC 92 : if (!bsearch(&relid, subrel_local_oids,
940 : subrel_count, sizeof(Oid), oid_cmp))
786 akapila 941 ECB : {
786 akapila 942 GIC 29 : AddSubscriptionRelState(sub->oid, relid,
786 akapila 943 ECB : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
944 : InvalidXLogRecPtr);
786 akapila 945 CBC 29 : ereport(DEBUG1,
946 : (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
947 : rv->schemaname, rv->relname, sub->name)));
948 : }
786 akapila 949 ECB : }
950 :
951 : /*
952 : * Next remove state for tables we should not care about anymore using
953 : * the data we collected above
954 : */
786 akapila 955 GIC 38 : qsort(pubrel_local_oids, list_length(pubrel_names),
956 : sizeof(Oid), oid_cmp);
957 :
958 38 : remove_rel_len = 0;
213 akapila 959 GNC 131 : for (off = 0; off < subrel_count; off++)
960 : {
786 akapila 961 CBC 93 : Oid relid = subrel_local_oids[off];
962 :
786 akapila 963 GIC 93 : if (!bsearch(&relid, pubrel_local_oids,
964 93 : list_length(pubrel_names), sizeof(Oid), oid_cmp))
965 : {
966 : char state;
967 : XLogRecPtr statelsn;
968 :
969 : /*
970 : * Lock pg_subscription_rel with AccessExclusiveLock to
971 : * prevent any race conditions with the apply worker
786 akapila 972 ECB : * re-launching workers at the same time this code is trying
973 : * to remove those tables.
974 : *
975 : * Even if new worker for this particular rel is restarted it
976 : * won't be able to make any progress as we hold exclusive
977 : * lock on subscription_rel till the transaction end. It will
978 : * simply exit as there is no corresponding rel entry.
786 akapila 979 EUB : *
980 : * This locking also ensures that the state of rels won't
981 : * change till we are done with this refresh operation.
982 : */
786 akapila 983 CBC 30 : if (!rel)
786 akapila 984 GIC 15 : rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
786 akapila 985 ECB :
986 : /* Last known rel state. */
786 akapila 987 GIC 30 : state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
988 :
786 akapila 989 CBC 30 : sub_remove_rels[remove_rel_len].relid = relid;
786 akapila 990 GIC 30 : sub_remove_rels[remove_rel_len++].state = state;
991 :
786 akapila 992 CBC 30 : RemoveSubscriptionRel(sub->oid, relid);
786 akapila 993 ECB :
786 akapila 994 GIC 30 : logicalrep_worker_stop(sub->oid, relid);
995 :
996 : /*
997 : * For READY state, we would have already dropped the
998 : * tablesync origin.
999 : */
209 akapila 1000 CBC 30 : if (state != SUBREL_STATE_READY)
786 akapila 1001 ECB : {
1002 : char originname[NAMEDATALEN];
1003 :
1004 : /*
1005 : * Drop the tablesync's origin tracking if exists.
1006 : *
1007 : * It is possible that the origin is not yet created for
209 1008 : * tablesync worker, this can happen for the states before
1009 : * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
1010 : * apply worker can also concurrently try to drop the
1011 : * origin and by this time the origin might be already
1012 : * removed. For these reasons, passing missing_ok = true.
1013 : */
180 akapila 1014 UNC 0 : ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1015 : sizeof(originname));
786 akapila 1016 UIC 0 : replorigin_drop_by_name(originname, true, false);
1017 : }
1018 :
786 akapila 1019 CBC 30 : ereport(DEBUG1,
1020 : (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1021 : get_namespace_name(get_rel_namespace(relid)),
1022 : get_rel_name(relid),
1023 : sub->name)));
1024 : }
1025 : }
1026 :
1027 : /*
786 akapila 1028 ECB : * Drop the tablesync slots associated with removed tables. This has
1029 : * to be at the end because otherwise if there is an error while doing
1030 : * the database operations we won't be able to rollback dropped slots.
1031 : */
786 akapila 1032 GIC 68 : for (off = 0; off < remove_rel_len; off++)
2208 peter_e 1033 ECB : {
786 akapila 1034 GIC 30 : if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
786 akapila 1035 UIC 0 : sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
786 akapila 1036 ECB : {
786 akapila 1037 UIC 0 : char syncslotname[NAMEDATALEN] = {0};
1038 :
786 akapila 1039 ECB : /*
1040 : * For READY/SYNCDONE states we know the tablesync slot has
1041 : * already been dropped by the tablesync worker.
1042 : *
1043 : * For other states, there is no certainty, maybe the slot
1044 : * does not exist yet. Also, if we fail after removing some of
1045 : * the slots, next time, it will again try to drop already
1046 : * dropped slots and fail. For these reasons, we allow
1047 : * missing_ok = true for the drop.
1048 : */
783 akapila 1049 UIC 0 : ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
783 akapila 1050 ECB : syncslotname, sizeof(syncslotname));
786 akapila 1051 UIC 0 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1052 : }
1053 : }
1054 : }
1055 0 : PG_FINALLY();
1056 : {
702 alvherre 1057 GIC 38 : walrcv_disconnect(wrconn);
1058 : }
786 akapila 1059 38 : PG_END_TRY();
786 akapila 1060 ECB :
786 akapila 1061 GIC 38 : if (rel)
1062 15 : table_close(rel, NoLock);
2208 peter_e 1063 CBC 38 : }
2208 peter_e 1064 ECB :
1065 : /*
2271 1066 : * Alter the existing subscription.
1067 : */
1068 : ObjectAddress
633 dean.a.rasheed 1069 CBC 198 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1070 : bool isTopLevel)
1071 : {
1072 : Relation rel;
1073 : ObjectAddress myself;
1074 : bool nulls[Natts_pg_subscription];
1075 : bool replaces[Natts_pg_subscription];
1076 : Datum values[Natts_pg_subscription];
1077 : HeapTuple tup;
1078 : Oid subid;
2208 peter_e 1079 GIC 198 : bool update_tuple = false;
1080 : Subscription *sub;
1081 : Form_pg_subscription form;
1082 : bits32 supported_opts;
642 akapila 1083 198 : SubOpts opts = {0};
1084 :
1539 andres 1085 198 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1086 :
1087 : /* Fetch the existing tuple. */
2271 peter_e 1088 CBC 198 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
2271 peter_e 1089 ECB : CStringGetDatum(stmt->subname));
1090 :
2271 peter_e 1091 GIC 198 : if (!HeapTupleIsValid(tup))
2271 peter_e 1092 CBC 3 : ereport(ERROR,
1093 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2271 peter_e 1094 ECB : errmsg("subscription \"%s\" does not exist",
1095 : stmt->subname)));
1096 :
1601 andres 1097 CBC 195 : form = (Form_pg_subscription) GETSTRUCT(tup);
1601 andres 1098 GIC 195 : subid = form->oid;
1601 andres 1099 ECB :
1100 : /* must be owner */
147 peter 1101 GNC 195 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1954 peter_e 1102 UIC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
2271 1103 0 : stmt->subname);
1104 :
2161 peter_e 1105 CBC 195 : sub = GetSubscription(subid, false);
1106 :
1107 : /*
1108 : * Don't allow non-superuser modification of a subscription with
1109 : * password_required=false.
1110 : */
10 rhaas 1111 GNC 195 : if (!sub->passwordrequired && !superuser())
10 rhaas 1112 UNC 0 : ereport(ERROR,
1113 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1114 : errmsg("password_required=false is superuser-only"),
1115 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1116 :
1117 : /* Lock the subscription so nobody else can do anything with it. */
2106 peter_e 1118 GIC 195 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1119 :
1120 : /* Form a new tuple. */
2271 1121 195 : memset(values, 0, sizeof(values));
1122 195 : memset(nulls, false, sizeof(nulls));
1123 195 : memset(replaces, false, sizeof(replaces));
1124 :
2208 1125 195 : switch (stmt->kind)
1126 : {
1127 78 : case ALTER_SUBSCRIPTION_OPTIONS:
1128 : {
642 akapila 1129 GBC 78 : supported_opts = (SUBOPT_SLOT_NAME |
1130 : SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
1131 : SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
1132 : SUBOPT_PASSWORD_REQUIRED |
1133 : SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
1134 :
633 dean.a.rasheed 1135 GIC 78 : parse_subscription_options(pstate, stmt->options,
633 dean.a.rasheed 1136 ECB : supported_opts, &opts);
1137 :
642 akapila 1138 GIC 66 : if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1139 : {
1140 : /*
1141 : * The subscription must be disabled to allow slot_name as
1142 : * 'none', otherwise, the apply worker will repeatedly try
1143 : * to stream the data using that slot_name which neither
1144 : * exists on the publisher nor the user will be allowed to
1145 : * create it.
1146 : */
1147 28 : if (sub->enabled && !opts.slot_name)
2161 peter_e 1148 UIC 0 : ereport(ERROR,
662 tgl 1149 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1150 : errmsg("cannot set %s for enabled subscription",
1424 alvherre 1151 : "slot_name = NONE")));
2161 peter_e 1152 EUB :
642 akapila 1153 GIC 28 : if (opts.slot_name)
2161 peter_e 1154 GBC 3 : values[Anum_pg_subscription_subslotname - 1] =
642 akapila 1155 GIC 3 : DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
1156 : else
2161 peter_e 1157 25 : nulls[Anum_pg_subscription_subslotname - 1] = true;
2186 1158 28 : replaces[Anum_pg_subscription_subslotname - 1] = true;
1159 : }
1160 :
642 akapila 1161 66 : if (opts.synchronous_commit)
1162 : {
2186 peter_e 1163 3 : values[Anum_pg_subscription_subsynccommit - 1] =
642 akapila 1164 3 : CStringGetTextDatum(opts.synchronous_commit);
2186 peter_e 1165 3 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
2186 peter_e 1166 EUB : }
1167 :
642 akapila 1168 GBC 66 : if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1169 : {
995 tgl 1170 GIC 9 : values[Anum_pg_subscription_subbinary - 1] =
642 akapila 1171 9 : BoolGetDatum(opts.binary);
995 tgl 1172 GBC 9 : replaces[Anum_pg_subscription_subbinary - 1] = true;
1173 : }
995 tgl 1174 ECB :
642 akapila 1175 GIC 66 : if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
948 akapila 1176 ECB : {
948 akapila 1177 GIC 15 : values[Anum_pg_subscription_substream - 1] =
90 akapila 1178 GNC 15 : CharGetDatum(opts.streaming);
948 akapila 1179 CBC 15 : replaces[Anum_pg_subscription_substream - 1] = true;
948 akapila 1180 ECB : }
1181 :
391 akapila 1182 GIC 66 : if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1183 : {
1184 : values[Anum_pg_subscription_subdisableonerr - 1]
1185 3 : = BoolGetDatum(opts.disableonerr);
391 akapila 1186 ECB : replaces[Anum_pg_subscription_subdisableonerr - 1]
391 akapila 1187 GIC 3 : = true;
1188 : }
1189 :
10 rhaas 1190 GNC 66 : if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1191 : {
1192 : /* Non-superuser may not disable password_required. */
1193 6 : if (!opts.passwordrequired && !superuser())
10 rhaas 1194 UNC 0 : ereport(ERROR,
1195 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1196 : errmsg("password_required=false is superuser-only"),
1197 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1198 :
1199 : values[Anum_pg_subscription_subpasswordrequired - 1]
10 rhaas 1200 GNC 6 : = BoolGetDatum(opts.passwordrequired);
1201 : replaces[Anum_pg_subscription_subpasswordrequired - 1]
1202 6 : = true;
1203 : }
1204 :
262 akapila 1205 66 : if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1206 : {
1207 3 : values[Anum_pg_subscription_suborigin - 1] =
1208 3 : CStringGetTextDatum(opts.origin);
1209 3 : replaces[Anum_pg_subscription_suborigin - 1] = true;
1210 : }
1211 :
2208 peter_e 1212 GIC 66 : update_tuple = true;
1213 66 : break;
1214 : }
1215 :
1216 21 : case ALTER_SUBSCRIPTION_ENABLED:
1217 : {
633 dean.a.rasheed 1218 CBC 21 : parse_subscription_options(pstate, stmt->options,
1219 : SUBOPT_ENABLED, &opts);
642 akapila 1220 GIC 21 : Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1221 :
642 akapila 1222 CBC 21 : if (!sub->slotname && opts.enabled)
2161 peter_e 1223 GIC 3 : ereport(ERROR,
662 tgl 1224 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1225 : errmsg("cannot enable subscription that does not have a slot name")));
1226 :
2208 peter_e 1227 CBC 18 : values[Anum_pg_subscription_subenabled - 1] =
642 akapila 1228 GIC 18 : BoolGetDatum(opts.enabled);
2208 peter_e 1229 18 : replaces[Anum_pg_subscription_subenabled - 1] = true;
2208 peter_e 1230 ECB :
642 akapila 1231 CBC 18 : if (opts.enabled)
2175 peter_e 1232 GIC 10 : ApplyLauncherWakeupAtCommit();
1233 :
2208 1234 18 : update_tuple = true;
1235 18 : break;
2208 peter_e 1236 ECB : }
1237 :
2208 peter_e 1238 GIC 7 : case ALTER_SUBSCRIPTION_CONNECTION:
1239 : /* Load the library providing us libpq calls. */
2162 peter_e 1240 CBC 7 : load_file("libpqwalreceiver", false);
2162 peter_e 1241 EUB : /* Check the connection info string. */
10 rhaas 1242 GNC 7 : walrcv_check_conninfo(stmt->conninfo,
1243 : sub->passwordrequired && !superuser_arg(sub->owner));
1244 :
2208 peter_e 1245 CBC 4 : values[Anum_pg_subscription_subconninfo - 1] =
2208 peter_e 1246 GIC 4 : CStringGetTextDatum(stmt->conninfo);
1247 4 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
1248 4 : update_tuple = true;
1249 4 : break;
1250 :
733 peter 1251 CBC 24 : case ALTER_SUBSCRIPTION_SET_PUBLICATION:
2208 peter_e 1252 EUB : {
642 akapila 1253 GIC 24 : supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
633 dean.a.rasheed 1254 24 : parse_subscription_options(pstate, stmt->options,
1255 : supported_opts, &opts);
1256 :
2208 peter_e 1257 24 : values[Anum_pg_subscription_subpublications - 1] =
2153 bruce 1258 CBC 24 : publicationListToArray(stmt->publication);
2208 peter_e 1259 GIC 24 : replaces[Anum_pg_subscription_subpublications - 1] = true;
1260 :
2208 peter_e 1261 CBC 24 : update_tuple = true;
2208 peter_e 1262 ECB :
1263 : /* Refresh if user asked us to. */
642 akapila 1264 GIC 24 : if (opts.refresh)
2208 peter_e 1265 ECB : {
2161 peter_e 1266 GIC 21 : if (!sub->enabled)
2161 peter_e 1267 LBC 0 : ereport(ERROR,
1268 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2134 peter_e 1269 ECB : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1270 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1271 :
1272 : /*
1273 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1274 : * not allowed.
634 akapila 1275 : */
634 akapila 1276 GIC 21 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
634 akapila 1277 UIC 0 : ereport(ERROR,
173 alvherre 1278 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1279 : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1280 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1281 :
786 akapila 1282 GIC 21 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1283 :
1284 : /* Make sure refresh sees the new list of publications. */
2208 peter_e 1285 15 : sub->publications = stmt->publication;
1286 :
374 akapila 1287 CBC 15 : AlterSubscription_refresh(sub, opts.copy_data,
374 akapila 1288 EUB : stmt->publication);
1289 : }
1290 :
2208 peter_e 1291 GIC 18 : break;
1292 : }
2208 peter_e 1293 ECB :
733 peter 1294 CBC 27 : case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
733 peter 1295 ECB : case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
1296 : {
1297 : List *publist;
642 akapila 1298 CBC 27 : bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1299 :
593 akapila 1300 GIC 27 : supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
633 dean.a.rasheed 1301 CBC 27 : parse_subscription_options(pstate, stmt->options,
1302 : supported_opts, &opts);
653 peter 1303 ECB :
642 akapila 1304 CBC 27 : publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
733 peter 1305 9 : values[Anum_pg_subscription_subpublications - 1] =
733 peter 1306 GIC 9 : publicationListToArray(publist);
1307 9 : replaces[Anum_pg_subscription_subpublications - 1] = true;
733 peter 1308 ECB :
733 peter 1309 GIC 9 : update_tuple = true;
733 peter 1310 ECB :
1311 : /* Refresh if user asked us to. */
642 akapila 1312 CBC 9 : if (opts.refresh)
1313 : {
1314 : /* We only need to validate user specified publications. */
374 1315 3 : List *validate_publications = (isadd) ? stmt->publication : NULL;
1316 :
733 peter 1317 3 : if (!sub->enabled)
733 peter 1318 LBC 0 : ereport(ERROR,
662 tgl 1319 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1320 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1321 : /* translator: %s is an SQL ALTER command */
173 alvherre 1322 : errhint("Use %s instead.",
1323 : isadd ?
1324 : "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1325 : "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1326 :
634 akapila 1327 : /*
1328 : * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1329 : * not allowed.
1330 : */
634 akapila 1331 GIC 3 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
634 akapila 1332 UIC 0 : ereport(ERROR,
173 alvherre 1333 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
634 akapila 1334 EUB : errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1335 : /* translator: %s is an SQL ALTER command */
1336 : errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1337 : isadd ?
1338 : "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1339 : "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
634 akapila 1340 ECB :
733 peter 1341 GIC 3 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
733 peter 1342 ECB :
1343 : /* Refresh the new list of publications. */
593 akapila 1344 GIC 3 : sub->publications = publist;
733 peter 1345 ECB :
374 akapila 1346 GIC 3 : AlterSubscription_refresh(sub, opts.copy_data,
374 akapila 1347 ECB : validate_publications);
733 peter 1348 : }
1349 :
733 peter 1350 GIC 9 : break;
1351 : }
733 peter 1352 ECB :
2208 peter_e 1353 CBC 26 : case ALTER_SUBSCRIPTION_REFRESH:
1354 : {
2161 peter_e 1355 GIC 26 : if (!sub->enabled)
2161 peter_e 1356 CBC 3 : ereport(ERROR,
1357 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2161 peter_e 1358 ECB : errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1359 :
633 dean.a.rasheed 1360 CBC 23 : parse_subscription_options(pstate, stmt->options,
1361 : SUBOPT_COPY_DATA, &opts);
2208 peter_e 1362 ECB :
634 akapila 1363 : /*
1364 : * The subscription option "two_phase" requires that
1365 : * replication has passed the initial table synchronization
1366 : * phase before the two_phase becomes properly enabled.
1367 : *
1368 : * But, having reached this two-phase commit "enabled" state
1369 : * we must not allow any subsequent table initialization to
1370 : * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
1371 : * when the user had requested two_phase = on mode.
1372 : *
1373 : * The exception to this restriction is when copy_data =
1374 : * false, because when copy_data is false the tablesync will
1375 : * start already in READY state and will exit directly without
1376 : * doing anything.
1377 : *
1378 : * For more details see comments atop worker.c.
1379 : */
634 akapila 1380 CBC 23 : if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
634 akapila 1381 UIC 0 : ereport(ERROR,
634 akapila 1382 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
1383 : errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1384 : errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1385 :
786 akapila 1386 CBC 23 : PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
786 akapila 1387 ECB :
374 akapila 1388 CBC 20 : AlterSubscription_refresh(sub, opts.copy_data, NULL);
2208 peter_e 1389 ECB :
2208 peter_e 1390 GIC 20 : break;
2208 peter_e 1391 ECB : }
1392 :
383 akapila 1393 CBC 12 : case ALTER_SUBSCRIPTION_SKIP:
383 akapila 1394 ECB : {
383 akapila 1395 GIC 12 : parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1396 :
383 akapila 1397 ECB : /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
383 akapila 1398 CBC 9 : Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
383 akapila 1399 ECB :
1400 : /*
1401 : * If the user sets subskiplsn, we do a sanity check to make
383 akapila 1402 EUB : * sure that the specified LSN is a probable value.
1403 : */
383 akapila 1404 GIC 9 : if (!XLogRecPtrIsInvalid(opts.lsn))
1405 : {
1406 : RepOriginId originid;
1407 : char originname[NAMEDATALEN];
1408 : XLogRecPtr remote_lsn;
1409 :
180 akapila 1410 GNC 6 : ReplicationOriginNameForLogicalRep(subid, InvalidOid,
1411 : originname, sizeof(originname));
383 akapila 1412 CBC 6 : originid = replorigin_by_name(originname, false);
383 akapila 1413 GBC 6 : remote_lsn = replorigin_get_progress(originid, false);
1414 :
1415 : /* Check the given LSN is at least a future LSN */
383 akapila 1416 GIC 6 : if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
383 akapila 1417 UIC 0 : ereport(ERROR,
383 akapila 1418 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1419 : errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
1420 : LSN_FORMAT_ARGS(opts.lsn),
1421 : LSN_FORMAT_ARGS(remote_lsn))));
1422 : }
1423 :
383 akapila 1424 GIC 9 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1425 9 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1426 :
383 akapila 1427 CBC 9 : update_tuple = true;
383 akapila 1428 GIC 9 : break;
1429 : }
383 akapila 1430 ECB :
2208 peter_e 1431 UIC 0 : default:
1432 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1433 : stmt->kind);
2271 peter_e 1434 ECB : }
1435 :
2208 1436 : /* Update the catalog if needed. */
2208 peter_e 1437 CBC 144 : if (update_tuple)
1438 : {
2208 peter_e 1439 GIC 124 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
2208 peter_e 1440 ECB : replaces);
2271 1441 :
2208 peter_e 1442 CBC 124 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2271 peter_e 1443 ECB :
2208 peter_e 1444 GIC 124 : heap_freetuple(tup);
2208 peter_e 1445 ECB : }
1446 :
1539 andres 1447 GIC 144 : table_close(rel, RowExclusiveLock);
2271 peter_e 1448 ECB :
2208 peter_e 1449 GIC 144 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1450 :
2271 peter_e 1451 CBC 144 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1452 :
1453 : /* Wake up related replication workers to handle this change quickly. */
93 tgl 1454 GNC 144 : LogicalRepWorkersWakeupAtCommit(subid);
1455 :
2271 peter_e 1456 CBC 144 : return myself;
2271 peter_e 1457 EUB : }
1458 :
1459 : /*
1460 : * Drop a subscription
1461 : */
1462 : void
2228 peter_e 1463 GIC 78 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
1464 : {
1465 : Relation rel;
1466 : ObjectAddress myself;
1467 : HeapTuple tup;
1468 : Oid subid;
1469 : Oid subowner;
1470 : Datum datum;
2271 peter_e 1471 ECB : bool isnull;
2271 peter_e 1472 EUB : char *subname;
1473 : char *conninfo;
1474 : char *slotname;
1475 : List *subworkers;
1476 : ListCell *lc;
1477 : char originname[NAMEDATALEN];
2271 peter_e 1478 GIC 78 : char *err = NULL;
1479 : WalReceiverConn *wrconn;
1480 : Form_pg_subscription form;
786 akapila 1481 ECB : List *rstates;
1482 : bool must_use_password;
1483 :
1484 : /*
2153 bruce 1485 : * Lock pg_subscription with AccessExclusiveLock to ensure that the
1486 : * launcher doesn't restart new worker during dropping the subscription
2222 fujii 1487 : */
1539 andres 1488 GIC 78 : rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
1489 :
2271 peter_e 1490 78 : tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
2271 peter_e 1491 CBC 78 : CStringGetDatum(stmt->subname));
1492 :
2271 peter_e 1493 GIC 78 : if (!HeapTupleIsValid(tup))
2271 peter_e 1494 ECB : {
1539 andres 1495 GIC 6 : table_close(rel, NoLock);
2271 peter_e 1496 ECB :
2271 peter_e 1497 CBC 6 : if (!stmt->missing_ok)
2271 peter_e 1498 GIC 3 : ereport(ERROR,
1499 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1500 : errmsg("subscription \"%s\" does not exist",
2271 peter_e 1501 ECB : stmt->subname)));
1502 : else
2271 peter_e 1503 GIC 3 : ereport(NOTICE,
1504 : (errmsg("subscription \"%s\" does not exist, skipping",
1505 : stmt->subname)));
1506 :
1507 38 : return;
1508 : }
1509 :
1601 andres 1510 72 : form = (Form_pg_subscription) GETSTRUCT(tup);
1511 72 : subid = form->oid;
10 rhaas 1512 GNC 72 : subowner = form->subowner;
1513 72 : must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
1514 :
1515 : /* must be owner */
147 peter 1516 72 : if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1954 peter_e 1517 UIC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
2271 1518 0 : stmt->subname);
1519 :
1520 : /* DROP hook for the subscription being removed */
2271 peter_e 1521 GIC 72 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
1522 :
2271 peter_e 1523 ECB : /*
2153 bruce 1524 EUB : * Lock the subscription so nobody else can do anything with it (including
1525 : * the replication workers).
1526 : */
2271 peter_e 1527 GIC 72 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1528 :
2271 peter_e 1529 ECB : /* Get subname */
15 dgustafsson 1530 GNC 72 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1531 : Anum_pg_subscription_subname);
2271 peter_e 1532 CBC 72 : subname = pstrdup(NameStr(*DatumGetName(datum)));
1533 :
1534 : /* Get conninfo */
15 dgustafsson 1535 GNC 72 : datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
1536 : Anum_pg_subscription_subconninfo);
2186 peter_e 1537 GIC 72 : conninfo = TextDatumGetCString(datum);
1538 :
2271 peter_e 1539 ECB : /* Get slotname */
2271 peter_e 1540 GIC 72 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
1541 : Anum_pg_subscription_subslotname, &isnull);
2161 1542 72 : if (!isnull)
1543 37 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
1544 : else
2161 peter_e 1545 CBC 35 : slotname = NULL;
1546 :
1547 : /*
1548 : * Since dropping a replication slot is not transactional, the replication
1549 : * slot stays dropped even if the transaction rolls back. So we cannot
1550 : * run DROP SUBSCRIPTION inside a transaction block if dropping the
495 akapila 1551 ECB : * replication slot. Also, in this case, we report a message for dropping
1552 : * the subscription to the cumulative stats system.
2161 peter_e 1553 : *
1554 : * XXX The command name should really be something like "DROP SUBSCRIPTION
1555 : * of a subscription that is associated with a replication slot", but we
1556 : * don't have the proper facilities for that.
1557 : */
2161 peter_e 1558 GBC 72 : if (slotname)
1878 peter_e 1559 GIC 37 : PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
1560 :
2271 1561 69 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
1562 69 : EventTriggerSQLDropAddObject(&myself, true, true);
1563 :
1564 : /* Remove the tuple from catalog. */
2258 tgl 1565 CBC 69 : CatalogTupleDelete(rel, &tup->t_self);
2271 peter_e 1566 ECB :
2271 peter_e 1567 GIC 69 : ReleaseSysCache(tup);
2271 peter_e 1568 ECB :
2074 1569 : /*
1570 : * Stop all the subscription workers immediately.
1571 : *
2030 peter_e 1572 EUB : * This is necessary if we are dropping the replication slot, so that the
1573 : * slot becomes accessible.
1574 : *
1575 : * It is also necessary if the subscription is disabled and was disabled
1576 : * in the same transaction. Then the workers haven't seen the disabling
1577 : * yet and will still be running, leading to hangs later when we want to
2030 peter_e 1578 ECB : * drop the replication origin. If the subscription was disabled before
1579 : * this transaction, then there shouldn't be any workers left, so this
1580 : * won't make a difference.
1581 : *
1582 : * New workers won't be started because we hold an exclusive lock on the
2074 1583 : * subscription till the end of the transaction.
1584 : */
2074 peter_e 1585 CBC 69 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
2074 peter_e 1586 GIC 69 : subworkers = logicalrep_workers_find(subid, false);
1587 69 : LWLockRelease(LogicalRepWorkerLock);
2064 tgl 1588 CBC 106 : foreach(lc, subworkers)
1589 : {
2074 peter_e 1590 37 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
1591 :
2030 1592 37 : logicalrep_worker_stop(w->subid, w->relid);
1593 : }
2074 peter_e 1594 GIC 69 : list_free(subworkers);
2074 peter_e 1595 ECB :
1596 : /*
1597 : * Remove the no-longer-useful entry in the launcher's table of apply
1598 : * worker start times.
1599 : *
1600 : * If this transaction rolls back, the launcher might restart a failed
1601 : * apply worker before wal_retrieve_retry_interval milliseconds have
1602 : * elapsed, but that's pretty harmless.
1603 : */
77 tgl 1604 GNC 69 : ApplyLauncherForgetWorkerStartTime(subid);
1605 :
1606 : /*
786 akapila 1607 ECB : * Cleanup of tablesync replication origins.
1608 : *
1609 : * Any READY-state relations would already have dealt with clean-ups.
1610 : *
1611 : * Note that the state can't change because we have already stopped both
1612 : * the apply and tablesync workers and they can't restart because of
1613 : * exclusive lock on the subscription.
1614 : */
256 michael 1615 GNC 69 : rstates = GetSubscriptionRelations(subid, true);
786 akapila 1616 GIC 70 : foreach(lc, rstates)
1617 : {
1618 1 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
1619 1 : Oid relid = rstate->relid;
1620 :
1621 : /* Only cleanup resources of tablesync workers */
1622 1 : if (!OidIsValid(relid))
786 akapila 1623 UIC 0 : continue;
1624 :
1625 : /*
1626 : * Drop the tablesync's origin tracking if exists.
1627 : *
1628 : * It is possible that the origin is not yet created for tablesync
786 akapila 1629 ECB : * worker so passing missing_ok = true. This can happen for the states
1630 : * before SUBREL_STATE_FINISHEDCOPY.
1631 : */
180 akapila 1632 GNC 1 : ReplicationOriginNameForLogicalRep(subid, relid, originname,
1633 : sizeof(originname));
209 akapila 1634 GIC 1 : replorigin_drop_by_name(originname, true, false);
1635 : }
1636 :
1637 : /* Clean up dependencies */
2270 alvherre 1638 69 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
2270 alvherre 1639 ECB :
1640 : /* Remove any associated relation synchronization states. */
2208 peter_e 1641 CBC 69 : RemoveSubscriptionRel(subid, InvalidOid);
2208 peter_e 1642 ECB :
1643 : /* Remove the origin tracking if exists. */
180 akapila 1644 GNC 69 : ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
788 akapila 1645 GIC 69 : replorigin_drop_by_name(originname, true, false);
2271 peter_e 1646 ECB :
1647 : /*
2153 bruce 1648 : * If there is no slot associated with the subscription, we can finish
1649 : * here.
1650 : */
786 akapila 1651 GIC 69 : if (!slotname && rstates == NIL)
1652 : {
1539 andres 1653 35 : table_close(rel, NoLock);
2271 peter_e 1654 CBC 35 : return;
1655 : }
1656 :
1657 : /*
786 akapila 1658 ECB : * Try to acquire the connection necessary for dropping slots.
1659 : *
1660 : * Note: If the slotname is NONE/NULL then we allow the command to finish
1661 : * and users need to manually cleanup the apply and tablesync worker slots
1662 : * later.
1663 : *
1664 : * This has to be at the end because otherwise if there is an error while
1665 : * doing the database operations we won't be able to rollback dropped
1666 : * slot.
2271 peter_e 1667 : */
2271 peter_e 1668 GBC 34 : load_file("libpqwalreceiver", false);
2271 peter_e 1669 EUB :
10 rhaas 1670 GNC 34 : wrconn = walrcv_connect(conninfo, true, must_use_password,
1671 : subname, &err);
2271 peter_e 1672 GIC 34 : if (wrconn == NULL)
786 akapila 1673 ECB : {
786 akapila 1674 UIC 0 : if (!slotname)
1675 : {
1676 : /* be tidy */
1677 0 : list_free(rstates);
1678 0 : table_close(rel, NoLock);
786 akapila 1679 LBC 0 : return;
1680 : }
1681 : else
786 akapila 1682 ECB : {
786 akapila 1683 UIC 0 : ReportSlotConnectionError(rstates, subid, slotname, err);
786 akapila 1684 ECB : }
1685 : }
1686 :
786 akapila 1687 CBC 34 : PG_TRY();
1688 : {
1689 35 : foreach(lc, rstates)
1690 : {
786 akapila 1691 GIC 1 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
786 akapila 1692 CBC 1 : Oid relid = rstate->relid;
1693 :
786 akapila 1694 ECB : /* Only cleanup resources of tablesync workers */
786 akapila 1695 CBC 1 : if (!OidIsValid(relid))
786 akapila 1696 UIC 0 : continue;
786 akapila 1697 ECB :
1698 : /*
1699 : * Drop the tablesync slots associated with removed tables.
1700 : *
1701 : * For SYNCDONE/READY states, the tablesync slot is known to have
1702 : * already been dropped by the tablesync worker.
1703 : *
1704 : * For other states, there is no certainty, maybe the slot does
1705 : * not exist yet. Also, if we fail after removing some of the
1706 : * slots, next time, it will again try to drop already dropped
1707 : * slots and fail. For these reasons, we allow missing_ok = true
1708 : * for the drop.
1709 : */
786 akapila 1710 CBC 1 : if (rstate->state != SUBREL_STATE_SYNCDONE)
786 akapila 1711 ECB : {
786 akapila 1712 GIC 1 : char syncslotname[NAMEDATALEN] = {0};
786 akapila 1713 ECB :
783 akapila 1714 CBC 1 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
1715 : sizeof(syncslotname));
786 akapila 1716 GIC 1 : ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
786 akapila 1717 ECB : }
1718 : }
1719 :
786 akapila 1720 GIC 34 : list_free(rstates);
1721 :
1722 : /*
1723 : * If there is a slot associated with the subscription, then drop the
1724 : * replication slot at the publisher.
1725 : */
1726 34 : if (slotname)
1727 34 : ReplicationSlotDropAtPubNode(wrconn, slotname, false);
1728 : }
786 akapila 1729 UIC 0 : PG_FINALLY();
1730 : {
786 akapila 1731 GIC 34 : walrcv_disconnect(wrconn);
1732 : }
1733 34 : PG_END_TRY();
1734 :
1735 : /*
1736 : * Tell the cumulative stats system that the subscription is getting
277 andres 1737 ECB : * dropped.
495 akapila 1738 : */
277 andres 1739 CBC 34 : pgstat_drop_subscription(subid);
495 akapila 1740 ECB :
786 akapila 1741 GIC 34 : table_close(rel, NoLock);
786 akapila 1742 ECB : }
1743 :
1744 : /*
1745 : * Drop the replication slot at the publisher node using the replication
1746 : * connection.
1747 : *
1748 : * missing_ok - if true then only issue a LOG message if the slot doesn't
1749 : * exist.
1750 : */
1751 : void
786 akapila 1752 GIC 189 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
1753 : {
1754 : StringInfoData cmd;
1755 :
786 akapila 1756 CBC 189 : Assert(wrconn);
1757 :
786 akapila 1758 GIC 189 : load_file("libpqwalreceiver", false);
1759 :
1760 189 : initStringInfo(&cmd);
1761 189 : appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
1762 :
2223 fujii 1763 189 : PG_TRY();
1764 : {
1765 : WalRcvExecResult *res;
1766 :
2208 peter_e 1767 CBC 189 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2208 peter_e 1768 ECB :
786 akapila 1769 GIC 189 : if (res->status == WALRCV_OK_COMMAND)
786 akapila 1770 ECB : {
1771 : /* NOTICE. Success. */
786 akapila 1772 GIC 189 : ereport(NOTICE,
1773 : (errmsg("dropped replication slot \"%s\" on publisher",
786 akapila 1774 ECB : slotname)));
786 akapila 1775 EUB : }
786 akapila 1776 UIC 0 : else if (res->status == WALRCV_ERROR &&
1777 0 : missing_ok &&
1778 0 : res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
1779 : {
1780 : /* LOG. Error, but missing_ok = true. */
1781 0 : ereport(LOG,
1782 : (errmsg("could not drop replication slot \"%s\" on publisher: %s",
1783 : slotname, res->err)));
786 akapila 1784 ECB : }
1785 : else
1786 : {
1787 : /* ERROR. */
786 akapila 1788 UIC 0 : ereport(ERROR,
1789 : (errcode(ERRCODE_CONNECTION_FAILURE),
662 tgl 1790 ECB : errmsg("could not drop replication slot \"%s\" on publisher: %s",
1791 : slotname, res->err)));
1792 : }
2208 peter_e 1793 :
2208 peter_e 1794 GIC 189 : walrcv_clear_result(res);
1795 : }
1255 peter 1796 LBC 0 : PG_FINALLY();
2237 fujii 1797 ECB : {
786 akapila 1798 GIC 189 : pfree(cmd.data);
1799 : }
2223 fujii 1800 189 : PG_END_TRY();
2271 peter_e 1801 189 : }
1802 :
2271 peter_e 1803 ECB : /*
1804 : * Internal workhorse for changing a subscription owner
1805 : */
1806 : static void
2271 peter_e 1807 GIC 6 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1808 : {
1809 : Form_pg_subscription form;
1810 : AclResult aclresult;
1811 :
1812 6 : form = (Form_pg_subscription) GETSTRUCT(tup);
1813 :
1814 6 : if (form->subowner == newOwnerId)
2271 peter_e 1815 UIC 0 : return;
1816 :
147 peter 1817 GNC 6 : if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
1954 peter_e 1818 UIC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
2271 1819 0 : NameStr(form->subname));
1820 :
1821 : /*
1822 : * Don't allow non-superuser modification of a subscription with
1823 : * password_required=false.
1824 : */
10 rhaas 1825 GNC 6 : if (!form->subpasswordrequired && !superuser())
2271 peter_e 1826 LBC 0 : ereport(ERROR,
1827 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1828 : errmsg("password_required=false is superuser-only"),
1829 : errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1830 :
1831 : /* Must be able to become new owner */
10 rhaas 1832 GNC 6 : check_can_set_role(GetUserId(), newOwnerId);
1833 :
1834 : /*
1835 : * current owner must have CREATE on database
1836 : *
1837 : * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
1838 : * other object types behave differently (e.g. you can't give a table to
1839 : * a user who lacks CREATE privileges on a schema).
1840 : */
1841 3 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
1842 : GetUserId(), ACL_CREATE);
1843 3 : if (aclresult != ACLCHECK_OK)
10 rhaas 1844 UNC 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
1845 0 : get_database_name(MyDatabaseId));
1846 :
2271 peter_e 1847 GIC 3 : form->subowner = newOwnerId;
2259 alvherre 1848 GBC 3 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2271 peter_e 1849 EUB :
1850 : /* Update owner dependency reference */
2271 peter_e 1851 GIC 3 : changeDependencyOnOwner(SubscriptionRelationId,
1852 : form->oid,
1853 : newOwnerId);
2271 peter_e 1854 EUB :
2271 peter_e 1855 GIC 3 : InvokeObjectPostAlterHook(SubscriptionRelationId,
1856 : form->oid, 0);
1857 :
1858 : /* Wake up related background processes to handle this change quickly. */
457 jdavis 1859 CBC 3 : ApplyLauncherWakeupAtCommit();
93 tgl 1860 GNC 3 : LogicalRepWorkersWakeupAtCommit(form->oid);
1861 : }
2271 peter_e 1862 ECB :
1863 : /*
1864 : * Change subscription owner -- by name
1865 : */
1866 : ObjectAddress
2271 peter_e 1867 GIC 6 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
2271 peter_e 1868 ECB : {
2271 peter_e 1869 EUB : Oid subid;
1870 : HeapTuple tup;
1871 : Relation rel;
1872 : ObjectAddress address;
1873 : Form_pg_subscription form;
1874 :
1539 andres 1875 GIC 6 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1876 :
2271 peter_e 1877 6 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1878 : CStringGetDatum(name));
1879 :
1880 6 : if (!HeapTupleIsValid(tup))
2271 peter_e 1881 UIC 0 : ereport(ERROR,
1882 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2271 peter_e 1883 ECB : errmsg("subscription \"%s\" does not exist", name)));
1884 :
1601 andres 1885 CBC 6 : form = (Form_pg_subscription) GETSTRUCT(tup);
1601 andres 1886 GIC 6 : subid = form->oid;
2271 peter_e 1887 ECB :
2271 peter_e 1888 GIC 6 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2271 peter_e 1889 ECB :
2271 peter_e 1890 GIC 3 : ObjectAddressSet(address, SubscriptionRelationId, subid);
1891 :
1892 3 : heap_freetuple(tup);
2271 peter_e 1893 ECB :
1539 andres 1894 GIC 3 : table_close(rel, RowExclusiveLock);
1895 :
2271 peter_e 1896 3 : return address;
1897 : }
1898 :
2271 peter_e 1899 ECB : /*
1900 : * Change subscription owner -- by OID
1901 : */
2271 peter_e 1902 EUB : void
2271 peter_e 1903 UIC 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
2271 peter_e 1904 ECB : {
1905 : HeapTuple tup;
1906 : Relation rel;
1907 :
1539 andres 1908 UIC 0 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1909 :
2271 peter_e 1910 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
1911 :
2271 peter_e 1912 LBC 0 : if (!HeapTupleIsValid(tup))
2271 peter_e 1913 UIC 0 : ereport(ERROR,
2271 peter_e 1914 ECB : (errcode(ERRCODE_UNDEFINED_OBJECT),
1915 : errmsg("subscription with OID %u does not exist", subid)));
1916 :
2271 peter_e 1917 UIC 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1918 :
1919 0 : heap_freetuple(tup);
1920 :
1539 andres 1921 0 : table_close(rel, RowExclusiveLock);
2271 peter_e 1922 0 : }
1923 :
1924 : /*
1925 : * Check and log a warning if the publisher has subscribed to the same table
1926 : * from some other publisher. This check is required only if "copy_data = true"
1927 : * and "origin = none" for CREATE SUBSCRIPTION and
1928 : * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data
1929 : * having origin might have been copied.
1930 : *
1931 : * This check need not be performed on the tables that are already added
1932 : * because incremental sync for those tables will happen through WAL and the
1933 : * origin of the data can be identified from the WAL records.
1934 : *
1935 : * subrel_local_oids contains the list of relation oids that are already
1936 : * present on the subscriber.
1937 : */
1938 : static void
213 akapila 1939 GNC 109 : check_publications_origin(WalReceiverConn *wrconn, List *publications,
1940 : bool copydata, char *origin, Oid *subrel_local_oids,
1941 : int subrel_count, char *subname)
1942 : {
1943 : WalRcvExecResult *res;
1944 : StringInfoData cmd;
1945 : TupleTableSlot *slot;
1946 109 : Oid tableRow[1] = {TEXTOID};
1947 109 : List *publist = NIL;
1948 : int i;
1949 :
1950 211 : if (!copydata || !origin ||
1951 102 : (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
1952 103 : return;
1953 :
1954 6 : initStringInfo(&cmd);
1955 6 : appendStringInfoString(&cmd,
1956 : "SELECT DISTINCT P.pubname AS pubname\n"
1957 : "FROM pg_publication P,\n"
1958 : " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
1959 : " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
1960 : " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
1961 : "WHERE C.oid = GPT.relid AND P.pubname IN (");
1962 6 : get_publications_str(publications, &cmd, true);
1963 6 : appendStringInfoString(&cmd, ")\n");
1964 :
1965 : /*
1966 : * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
1967 : * the list of relation oids that are already present on the subscriber.
1968 : * This check should be skipped for these tables.
1969 : */
1970 9 : for (i = 0; i < subrel_count; i++)
1971 : {
1972 3 : Oid relid = subrel_local_oids[i];
1973 3 : char *schemaname = get_namespace_name(get_rel_namespace(relid));
1974 3 : char *tablename = get_rel_name(relid);
1975 :
1976 3 : appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
1977 : schemaname, tablename);
1978 : }
1979 :
1980 6 : res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
1981 6 : pfree(cmd.data);
1982 :
1983 6 : if (res->status != WALRCV_OK_TUPLES)
213 akapila 1984 UNC 0 : ereport(ERROR,
1985 : (errcode(ERRCODE_CONNECTION_FAILURE),
1986 : errmsg("could not receive list of replicated tables from the publisher: %s",
1987 : res->err)));
1988 :
1989 : /* Process tables. */
213 akapila 1990 GNC 6 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1991 8 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1992 : {
1993 : char *pubname;
1994 : bool isnull;
1995 :
1996 2 : pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1997 2 : Assert(!isnull);
1998 :
1999 2 : ExecClearTuple(slot);
2000 2 : publist = list_append_unique(publist, makeString(pubname));
2001 : }
2002 :
2003 : /*
2004 : * Log a warning if the publisher has subscribed to the same table from
2005 : * some other publisher. We cannot know the origin of data during the
2006 : * initial sync. Data origins can be found only from the WAL by looking at
2007 : * the origin id.
2008 : *
2009 : * XXX: For simplicity, we don't check whether the table has any data or
2010 : * not. If the table doesn't have any data then we don't need to
2011 : * distinguish between data having origin and data not having origin so we
2012 : * can avoid logging a warning in that case.
2013 : */
2014 6 : if (publist)
2015 : {
2016 2 : StringInfo pubnames = makeStringInfo();
2017 :
2018 : /* Prepare the list of publication(s) for warning message. */
2019 2 : get_publications_str(publist, pubnames, false);
2020 2 : ereport(WARNING,
2021 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2022 : errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2023 : subname),
2024 : errdetail_plural("Subscribed publication %s is subscribing to other publications.",
2025 : "Subscribed publications %s are subscribing to other publications.",
2026 : list_length(publist), pubnames->data),
2027 : errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
2028 : }
2029 :
2030 6 : ExecDropSingleTupleTableSlot(slot);
2031 :
2032 6 : walrcv_clear_result(res);
2033 : }
2034 :
2035 : /*
2208 peter_e 2036 ECB : * Get the list of tables which belong to specified publications on the
2037 : * publisher connection.
2038 : *
2039 : * Note that we don't support the case where the column list is different for
311 akapila 2040 : * the same table in different publications to avoid sending unwanted column
2041 : * information for some of the rows. This can happen when both the column
2042 : * list and row filter are specified for different publications.
2043 : */
2208 peter_e 2044 : static List *
2208 peter_e 2045 CBC 109 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
2046 : {
2153 bruce 2047 ECB : WalRcvExecResult *res;
2048 : StringInfoData cmd;
2049 : TupleTableSlot *slot;
11 akapila 2050 GNC 109 : Oid tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
2153 bruce 2051 CBC 109 : List *tablelist = NIL;
11 akapila 2052 GNC 109 : int server_version = walrcv_server_version(wrconn);
2053 109 : bool check_columnlist = (server_version >= 150000);
2208 peter_e 2054 ECB :
2208 peter_e 2055 GIC 109 : initStringInfo(&cmd);
311 akapila 2056 ECB :
2057 : /* Get the list of tables from the publisher. */
11 akapila 2058 GNC 109 : if (server_version >= 160000)
2059 : {
2060 : StringInfoData pub_names;
311 akapila 2061 EUB :
11 akapila 2062 GNC 109 : tableRow[2] = INT2VECTOROID;
2063 109 : initStringInfo(&pub_names);
2064 109 : get_publications_str(publications, &pub_names, true);
2065 :
2066 : /*
2067 : * From version 16, we allowed passing multiple publications to the
2068 : * function pg_get_publication_tables. This helped to filter out the
2069 : * partition table whose ancestor is also published in this
2070 : * publication array.
2071 : *
2072 : * Join pg_get_publication_tables with pg_publication to exclude
2073 : * non-existing publications.
2074 : *
2075 : * Note that attrs are always stored in sorted order so we don't need
2076 : * to worry if different publications have specified them in a
2077 : * different order. See publication_translate_columns.
2078 : */
2079 109 : appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
2080 : " FROM pg_class c\n"
2081 : " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2082 : " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2083 : " FROM pg_publication\n"
2084 : " WHERE pubname IN ( %s )) AS gpt\n"
2085 : " ON gpt.relid = c.oid\n",
2086 : pub_names.data);
2087 :
2088 109 : pfree(pub_names.data);
2089 : }
2090 : else
2091 : {
11 akapila 2092 UNC 0 : tableRow[2] = NAMEARRAYOID;
2093 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
2094 :
2095 : /* Get column lists for each relation if the publisher supports it */
2096 0 : if (check_columnlist)
2097 0 : appendStringInfoString(&cmd, ", t.attnames\n");
2098 :
2099 0 : appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2100 : " WHERE t.pubname IN (");
2101 0 : get_publications_str(publications, &cmd, true);
2102 0 : appendStringInfoChar(&cmd, ')');
2103 : }
2208 peter_e 2104 EUB :
311 akapila 2105 GIC 109 : res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
2208 peter_e 2106 109 : pfree(cmd.data);
2107 :
2108 109 : if (res->status != WALRCV_OK_TUPLES)
2208 peter_e 2109 UIC 0 : ereport(ERROR,
2110 : (errcode(ERRCODE_CONNECTION_FAILURE),
662 tgl 2111 EUB : errmsg("could not receive list of replicated tables from the publisher: %s",
2112 : res->err)));
2113 :
2114 : /* Process tables. */
1606 andres 2115 GIC 109 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2208 peter_e 2116 330 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2208 peter_e 2117 ECB : {
2118 : char *nspname;
2208 peter_e 2119 EUB : char *relname;
2120 : bool isnull;
2208 peter_e 2121 ECB : RangeVar *rv;
2122 :
2208 peter_e 2123 CBC 222 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2124 222 : Assert(!isnull);
2208 peter_e 2125 GIC 222 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2126 222 : Assert(!isnull);
2127 :
813 akapila 2128 222 : rv = makeRangeVar(nspname, relname, -1);
2129 :
311 akapila 2130 CBC 222 : if (check_columnlist && list_member(tablelist, rv))
311 akapila 2131 GIC 1 : ereport(ERROR,
2132 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2133 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
2134 : nspname, relname));
311 akapila 2135 ECB : else
311 akapila 2136 GIC 221 : tablelist = lappend(tablelist, rv);
2208 peter_e 2137 ECB :
2208 peter_e 2138 GBC 221 : ExecClearTuple(slot);
2139 : }
2208 peter_e 2140 CBC 108 : ExecDropSingleTupleTableSlot(slot);
2208 peter_e 2141 EUB :
2208 peter_e 2142 GBC 108 : walrcv_clear_result(res);
2143 :
2208 peter_e 2144 GIC 108 : return tablelist;
2145 : }
2146 :
2147 : /*
786 akapila 2148 ECB : * This is to report the connection failure while dropping replication slots.
786 akapila 2149 EUB : * Here, we report the WARNING for all tablesync slots so that user can drop
2150 : * them manually, if required.
2151 : */
2152 : static void
786 akapila 2153 UIC 0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
2154 : {
786 akapila 2155 ECB : ListCell *lc;
2156 :
786 akapila 2157 UIC 0 : foreach(lc, rstates)
2158 : {
2159 0 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
2160 0 : Oid relid = rstate->relid;
2161 :
2162 : /* Only cleanup resources of tablesync workers */
2163 0 : if (!OidIsValid(relid))
786 akapila 2164 LBC 0 : continue;
2165 :
786 akapila 2166 ECB : /*
786 akapila 2167 EUB : * Caller needs to ensure that relstate doesn't change underneath us.
2168 : * See DropSubscription where we get the relstates.
2169 : */
786 akapila 2170 LBC 0 : if (rstate->state != SUBREL_STATE_SYNCDONE)
786 akapila 2171 ECB : {
786 akapila 2172 UIC 0 : char syncslotname[NAMEDATALEN] = {0};
2173 :
783 akapila 2174 LBC 0 : ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2175 : sizeof(syncslotname));
786 akapila 2176 UIC 0 : elog(WARNING, "could not drop tablesync replication slot \"%s\"",
2177 : syncslotname);
786 akapila 2178 ECB : }
2179 : }
2180 :
786 akapila 2181 UIC 0 : ereport(ERROR,
662 tgl 2182 ECB : (errcode(ERRCODE_CONNECTION_FAILURE),
173 alvherre 2183 : errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2184 : slotname, err),
2185 : /* translator: %s is an SQL ALTER command */
2186 : errhint("Use %s to disassociate the subscription from the slot.",
2187 : "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2188 : }
2189 :
733 peter 2190 : /*
2191 : * Check for duplicates in the given list of publications and error out if
2192 : * found one. Add publications to datums as text datums, if datums is not
2193 : * NULL.
2194 : */
2195 : static void
733 peter 2196 GIC 174 : check_duplicates_in_publist(List *publist, Datum *datums)
2197 : {
733 peter 2198 ECB : ListCell *cell;
733 peter 2199 GIC 174 : int j = 0;
733 peter 2200 ECB :
733 peter 2201 GIC 409 : foreach(cell, publist)
2202 : {
733 peter 2203 CBC 244 : char *name = strVal(lfirst(cell));
733 peter 2204 EUB : ListCell *pcell;
2205 :
733 peter 2206 GIC 380 : foreach(pcell, publist)
2207 : {
733 peter 2208 CBC 380 : char *pname = strVal(lfirst(pcell));
733 peter 2209 ECB :
733 peter 2210 GIC 380 : if (pcell == cell)
733 peter 2211 CBC 235 : break;
2212 :
2213 145 : if (strcmp(name, pname) == 0)
733 peter 2214 GIC 9 : ereport(ERROR,
662 tgl 2215 ECB : (errcode(ERRCODE_DUPLICATE_OBJECT),
2216 : errmsg("publication name \"%s\" used more than once",
733 peter 2217 : pname)));
2218 : }
2219 :
733 peter 2220 GIC 235 : if (datums)
2221 192 : datums[j++] = CStringGetTextDatum(name);
2222 : }
2223 165 : }
2224 :
2225 : /*
733 peter 2226 EUB : * Merge current subscription's publications and user-specified publications
2227 : * from ADD/DROP PUBLICATIONS.
2228 : *
2229 : * If addpub is true, we will add the list of publications into oldpublist.
2230 : * Otherwise, we will delete the list of publications from oldpublist. The
2231 : * returned list is a copy, oldpublist itself is not changed.
2232 : *
2233 : * subname is the subscription name, for error messages.
2234 : */
2235 : static List *
733 peter 2236 GBC 27 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
2237 : {
2238 : ListCell *lc;
2239 :
2240 27 : oldpublist = list_copy(oldpublist);
2241 :
2242 27 : check_duplicates_in_publist(newpublist, NULL);
2243 :
2244 46 : foreach(lc, newpublist)
733 peter 2245 EUB : {
733 peter 2246 GIC 34 : char *name = strVal(lfirst(lc));
2247 : ListCell *lc2;
2248 34 : bool found = false;
2249 :
2250 67 : foreach(lc2, oldpublist)
2251 : {
2252 55 : char *pubname = strVal(lfirst(lc2));
2253 :
2254 55 : if (strcmp(name, pubname) == 0)
2255 : {
2256 22 : found = true;
2257 22 : if (addpub)
2258 6 : ereport(ERROR,
2259 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2260 : errmsg("publication \"%s\" is already in subscription \"%s\"",
2261 : name, subname)));
733 peter 2262 ECB : else
733 peter 2263 GIC 16 : oldpublist = foreach_delete_current(oldpublist, lc2);
2264 :
2265 16 : break;
2266 : }
2267 : }
2268 :
733 peter 2269 CBC 28 : if (addpub && !found)
2270 9 : oldpublist = lappend(oldpublist, makeString(name));
733 peter 2271 GIC 19 : else if (!addpub && !found)
2272 3 : ereport(ERROR,
662 tgl 2273 ECB : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
733 peter 2274 : errmsg("publication \"%s\" is not in subscription \"%s\"",
2275 : name, subname)));
2276 : }
2277 :
2278 : /*
2279 : * XXX Probably no strong reason for this, but for now it's to make ALTER
2280 : * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
2281 : */
733 peter 2282 GIC 12 : if (!oldpublist)
2283 3 : ereport(ERROR,
2284 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
653 peter 2285 ECB : errmsg("cannot drop all the publications from a subscription")));
733 2286 :
733 peter 2287 GIC 9 : return oldpublist;
2288 : }
2289 :
2290 : /*
2291 : * Extract the streaming mode value from a DefElem. This is like
2292 : * defGetBoolean() but also accepts the special value of "parallel".
2293 : */
2294 : char
90 akapila 2295 GNC 65 : defGetStreamingMode(DefElem *def)
2296 : {
2297 : /*
2298 : * If no parameter value given, assume "true" is meant.
2299 : */
2300 65 : if (!def->arg)
90 akapila 2301 UNC 0 : return LOGICALREP_STREAM_ON;
2302 :
2303 : /*
2304 : * Allow 0, 1, "false", "true", "off", "on" or "parallel".
2305 : */
90 akapila 2306 GNC 65 : switch (nodeTag(def->arg))
2307 : {
90 akapila 2308 UNC 0 : case T_Integer:
2309 0 : switch (intVal(def->arg))
2310 : {
2311 0 : case 0:
2312 0 : return LOGICALREP_STREAM_OFF;
2313 0 : case 1:
2314 0 : return LOGICALREP_STREAM_ON;
2315 0 : default:
2316 : /* otherwise, error out below */
2317 0 : break;
2318 : }
2319 0 : break;
90 akapila 2320 GNC 65 : default:
2321 : {
2322 65 : char *sval = defGetString(def);
2323 :
2324 : /*
2325 : * The set of strings accepted here should match up with the
2326 : * grammar's opt_boolean_or_string production.
2327 : */
2328 127 : if (pg_strcasecmp(sval, "false") == 0 ||
2329 62 : pg_strcasecmp(sval, "off") == 0)
2330 3 : return LOGICALREP_STREAM_OFF;
2331 115 : if (pg_strcasecmp(sval, "true") == 0 ||
2332 53 : pg_strcasecmp(sval, "on") == 0)
2333 44 : return LOGICALREP_STREAM_ON;
2334 18 : if (pg_strcasecmp(sval, "parallel") == 0)
2335 15 : return LOGICALREP_STREAM_PARALLEL;
2336 : }
2337 3 : break;
2338 : }
2339 :
2340 3 : ereport(ERROR,
2341 : (errcode(ERRCODE_SYNTAX_ERROR),
2342 : errmsg("%s requires a Boolean value or \"parallel\"",
2343 : def->defname)));
2344 : return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
2345 : }
|