LCOV - differential code coverage report
Current view: top level - src/backend/commands - subscriptioncmds.c (source / functions) Coverage Total Hit UNC LBC UIC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 87.6 % 796 697 154 13 60 27 350 139 181 37 394 5 11
Current Date: 2023-04-08 15:15:32 Functions: 88.9 % 18 16 2 14 2 2 16
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * subscriptioncmds.c
       4                 :  *      subscription catalog manipulation functions
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  * IDENTIFICATION
      10                 :  *      src/backend/commands/subscriptioncmds.c
      11                 :  *
      12                 :  *-------------------------------------------------------------------------
      13                 :  */
      14                 : 
      15                 : #include "postgres.h"
      16                 : 
      17                 : #include "access/htup_details.h"
      18                 : #include "access/table.h"
      19                 : #include "access/xact.h"
      20                 : #include "catalog/catalog.h"
      21                 : #include "catalog/dependency.h"
      22                 : #include "catalog/indexing.h"
      23                 : #include "catalog/namespace.h"
      24                 : #include "catalog/objectaccess.h"
      25                 : #include "catalog/objectaddress.h"
      26                 : #include "catalog/pg_authid_d.h"
      27                 : #include "catalog/pg_database_d.h"
      28                 : #include "catalog/pg_subscription.h"
      29                 : #include "catalog/pg_subscription_rel.h"
      30                 : #include "catalog/pg_type.h"
      31                 : #include "commands/dbcommands.h"
      32                 : #include "commands/defrem.h"
      33                 : #include "commands/event_trigger.h"
      34                 : #include "commands/subscriptioncmds.h"
      35                 : #include "executor/executor.h"
      36                 : #include "miscadmin.h"
      37                 : #include "nodes/makefuncs.h"
      38                 : #include "pgstat.h"
      39                 : #include "replication/logicallauncher.h"
      40                 : #include "replication/logicalworker.h"
      41                 : #include "replication/origin.h"
      42                 : #include "replication/slot.h"
      43                 : #include "replication/walreceiver.h"
      44                 : #include "replication/walsender.h"
      45                 : #include "replication/worker_internal.h"
      46                 : #include "storage/lmgr.h"
      47                 : #include "utils/acl.h"
      48                 : #include "utils/builtins.h"
      49                 : #include "utils/guc.h"
      50                 : #include "utils/lsyscache.h"
      51                 : #include "utils/memutils.h"
      52                 : #include "utils/pg_lsn.h"
      53                 : #include "utils/syscache.h"
      54                 : 
      55                 : /*
      56                 :  * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
      57                 :  * command.
      58                 :  */
      59                 : #define SUBOPT_CONNECT              0x00000001
      60                 : #define SUBOPT_ENABLED              0x00000002
      61                 : #define SUBOPT_CREATE_SLOT          0x00000004
      62                 : #define SUBOPT_SLOT_NAME            0x00000008
      63                 : #define SUBOPT_COPY_DATA            0x00000010
      64                 : #define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020
      65                 : #define SUBOPT_REFRESH              0x00000040
      66                 : #define SUBOPT_BINARY               0x00000080
      67                 : #define SUBOPT_STREAMING            0x00000100
      68                 : #define SUBOPT_TWOPHASE_COMMIT      0x00000200
      69                 : #define SUBOPT_DISABLE_ON_ERR       0x00000400
      70                 : #define SUBOPT_PASSWORD_REQUIRED    0x00000800
      71                 : #define SUBOPT_RUN_AS_OWNER         0x00001000
      72                 : #define SUBOPT_LSN                  0x00002000
      73                 : #define SUBOPT_ORIGIN               0x00004000
      74                 : 
      75                 : /* check if the 'val' has 'bits' set */
      76                 : #define IsSet(val, bits)  (((val) & (bits)) == (bits))
      77                 : 
      78                 : /*
      79                 :  * Structure to hold a bitmap representing the user-provided CREATE/ALTER
      80                 :  * SUBSCRIPTION command options and the parsed/default values of each of them.
      81                 :  */
      82                 : typedef struct SubOpts
      83                 : {
      84                 :     bits32      specified_opts;
      85                 :     char       *slot_name;
      86                 :     char       *synchronous_commit;
      87                 :     bool        connect;
      88                 :     bool        enabled;
      89                 :     bool        create_slot;
      90                 :     bool        copy_data;
      91                 :     bool        refresh;
      92                 :     bool        binary;
      93                 :     char        streaming;
      94                 :     bool        twophase;
      95                 :     bool        disableonerr;
      96                 :     bool        passwordrequired;
      97                 :     bool        runasowner;
      98                 :     char       *origin;
      99                 :     XLogRecPtr  lsn;
     100                 : } SubOpts;
     101                 : 
     102                 : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
     103                 : static void check_publications_origin(WalReceiverConn *wrconn,
     104                 :                                       List *publications, bool copydata,
     105                 :                                       char *origin, Oid *subrel_local_oids,
     106                 :                                       int subrel_count, char *subname);
     107                 : static void check_duplicates_in_publist(List *publist, Datum *datums);
     108                 : static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
     109                 : static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
     110                 : 
     111                 : 
     112                 : /*
     113                 :  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
     114                 :  *
     115                 :  * Since not all options can be specified in both commands, this function
     116                 :  * will report an error if mutually exclusive options are specified.
     117                 :  */
     118                 : static void
     119 GIC         362 : parse_subscription_options(ParseState *pstate, List *stmt_options,
     120                 :                            bits32 supported_opts, SubOpts *opts)
     121                 : {
     122                 :     ListCell   *lc;
     123                 : 
     124                 :     /* Start out with cleared opts. */
     125             362 :     memset(opts, 0, sizeof(SubOpts));
     126                 : 
     127                 :     /* caller must expect some option */
     128             362 :     Assert(supported_opts != 0);
     129                 : 
     130                 :     /* If connect option is supported, these others also need to be. */
     131             362 :     Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
     132                 :            IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     133 ECB             :                  SUBOPT_COPY_DATA));
     134                 : 
     135                 :     /* Set default values for the supported options. */
     136 GIC         362 :     if (IsSet(supported_opts, SUBOPT_CONNECT))
     137             177 :         opts->connect = true;
     138             362 :     if (IsSet(supported_opts, SUBOPT_ENABLED))
     139 CBC         198 :         opts->enabled = true;
     140 GIC         362 :     if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
     141             177 :         opts->create_slot = true;
     142 CBC         362 :     if (IsSet(supported_opts, SUBOPT_COPY_DATA))
     143 GIC         251 :         opts->copy_data = true;
     144             362 :     if (IsSet(supported_opts, SUBOPT_REFRESH))
     145 CBC          51 :         opts->refresh = true;
     146 GIC         362 :     if (IsSet(supported_opts, SUBOPT_BINARY))
     147             255 :         opts->binary = false;
     148             362 :     if (IsSet(supported_opts, SUBOPT_STREAMING))
     149 GNC         255 :         opts->streaming = LOGICALREP_STREAM_OFF;
     150 CBC         362 :     if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
     151             177 :         opts->twophase = false;
     152             362 :     if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
     153             255 :         opts->disableonerr = false;
     154 GNC         362 :     if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
     155             255 :         opts->passwordrequired = true;
     156             362 :     if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
     157             255 :         opts->runasowner = false;
     158             362 :     if (IsSet(supported_opts, SUBOPT_ORIGIN))
     159             255 :         opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
     160 ECB             : 
     161                 :     /* Parse options */
     162 CBC         693 :     foreach(lc, stmt_options)
     163 ECB             :     {
     164 CBC         361 :         DefElem    *defel = (DefElem *) lfirst(lc);
     165 ECB             : 
     166 CBC         361 :         if (IsSet(supported_opts, SUBOPT_CONNECT) &&
     167             213 :             strcmp(defel->defname, "connect") == 0)
     168 ECB             :         {
     169 CBC          79 :             if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
     170 LBC           0 :                 errorConflictingDefElem(defel, pstate);
     171 ECB             : 
     172 CBC          79 :             opts->specified_opts |= SUBOPT_CONNECT;
     173              79 :             opts->connect = defGetBoolean(defel);
     174 ECB             :         }
     175 CBC         282 :         else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
     176             155 :                  strcmp(defel->defname, "enabled") == 0)
     177 ECB             :         {
     178 CBC          33 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     179 LBC           0 :                 errorConflictingDefElem(defel, pstate);
     180                 : 
     181 GIC          33 :             opts->specified_opts |= SUBOPT_ENABLED;
     182 CBC          33 :             opts->enabled = defGetBoolean(defel);
     183                 :         }
     184             249 :         else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
     185 GIC         122 :                  strcmp(defel->defname, "create_slot") == 0)
     186 ECB             :         {
     187 CBC          16 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     188 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     189 ECB             : 
     190 GBC          16 :             opts->specified_opts |= SUBOPT_CREATE_SLOT;
     191 GIC          16 :             opts->create_slot = defGetBoolean(defel);
     192 ECB             :         }
     193 CBC         233 :         else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
     194 GIC         185 :                  strcmp(defel->defname, "slot_name") == 0)
     195 ECB             :         {
     196 CBC          62 :             if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     197 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     198 ECB             : 
     199 GBC          62 :             opts->specified_opts |= SUBOPT_SLOT_NAME;
     200 GIC          62 :             opts->slot_name = defGetString(defel);
     201 ECB             : 
     202                 :             /* Setting slot_name = NONE is treated as no slot name. */
     203 GIC          62 :             if (strcmp(opts->slot_name, "none") == 0)
     204 CBC          53 :                 opts->slot_name = NULL;
     205 ECB             :             else
     206 GIC           9 :                 ReplicationSlotValidateName(opts->slot_name, ERROR);
     207 ECB             :         }
     208 GBC         171 :         else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
     209 GIC         111 :                  strcmp(defel->defname, "copy_data") == 0)
     210 ECB             :         {
     211 CBC          15 :             if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     212 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     213 ECB             : 
     214 CBC          15 :             opts->specified_opts |= SUBOPT_COPY_DATA;
     215 GIC          15 :             opts->copy_data = defGetBoolean(defel);
     216 ECB             :         }
     217 GBC         156 :         else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
     218 GIC         111 :                  strcmp(defel->defname, "synchronous_commit") == 0)
     219 ECB             :         {
     220 CBC           6 :             if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
     221 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     222                 : 
     223 CBC           6 :             opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
     224               6 :             opts->synchronous_commit = defGetString(defel);
     225                 : 
     226 ECB             :             /* Test if the given value is valid for synchronous_commit GUC. */
     227 GIC           6 :             (void) set_config_option("synchronous_commit", opts->synchronous_commit,
     228 ECB             :                                      PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     229                 :                                      false, 0, false);
     230                 :         }
     231 CBC         150 :         else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
     232 GBC          33 :                  strcmp(defel->defname, "refresh") == 0)
     233                 :         {
     234 CBC          33 :             if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
     235 LBC           0 :                 errorConflictingDefElem(defel, pstate);
     236                 : 
     237 CBC          33 :             opts->specified_opts |= SUBOPT_REFRESH;
     238              33 :             opts->refresh = defGetBoolean(defel);
     239                 :         }
     240             117 :         else if (IsSet(supported_opts, SUBOPT_BINARY) &&
     241 GBC         105 :                  strcmp(defel->defname, "binary") == 0)
     242                 :         {
     243 CBC          16 :             if (IsSet(opts->specified_opts, SUBOPT_BINARY))
     244 LBC           0 :                 errorConflictingDefElem(defel, pstate);
     245                 : 
     246 GIC          16 :             opts->specified_opts |= SUBOPT_BINARY;
     247 CBC          16 :             opts->binary = defGetBoolean(defel);
     248                 :         }
     249 GIC         101 :         else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
     250              89 :                  strcmp(defel->defname, "streaming") == 0)
     251 ECB             :         {
     252 CBC          31 :             if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
     253 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     254 ECB             : 
     255 GBC          31 :             opts->specified_opts |= SUBOPT_STREAMING;
     256 GNC          31 :             opts->streaming = defGetStreamingMode(defel);
     257 ECB             :         }
     258 CBC          70 :         else if (strcmp(defel->defname, "two_phase") == 0)
     259                 :         {
     260 ECB             :             /*
     261                 :              * Do not allow toggling of two_phase option. Doing so could cause
     262                 :              * missing of transactions and lead to an inconsistent replica.
     263                 :              * See comments atop worker.c
     264 EUB             :              *
     265                 :              * Note: Unsupported twophase indicates that this call originated
     266 ECB             :              * from AlterSubscription.
     267                 :              */
     268 GIC          18 :             if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
     269 CBC           3 :                 ereport(ERROR,
     270 ECB             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     271                 :                          errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
     272                 : 
     273 GBC          15 :             if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
     274 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     275 ECB             : 
     276 CBC          15 :             opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
     277 GIC          15 :             opts->twophase = defGetBoolean(defel);
     278 ECB             :         }
     279 GIC          52 :         else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
     280              40 :                  strcmp(defel->defname, "disable_on_error") == 0)
     281                 :         {
     282              10 :             if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
     283 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     284                 : 
     285 GIC          10 :             opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
     286              10 :             opts->disableonerr = defGetBoolean(defel);
     287                 :         }
     288 GNC          42 :         else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
     289              30 :                  strcmp(defel->defname, "password_required") == 0)
     290                 :         {
     291              11 :             if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
     292 UNC           0 :                 errorConflictingDefElem(defel, pstate);
     293                 : 
     294 GNC          11 :             opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
     295              11 :             opts->passwordrequired = defGetBoolean(defel);
     296                 :         }
     297              31 :         else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
     298              19 :                  strcmp(defel->defname, "run_as_owner") == 0)
     299                 :         {
     300               1 :             if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
     301 UNC           0 :                 errorConflictingDefElem(defel, pstate);
     302                 : 
     303 GNC           1 :             opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
     304               1 :             opts->runasowner = defGetBoolean(defel);
     305                 :         }
     306              30 :         else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
     307              18 :                  strcmp(defel->defname, "origin") == 0)
     308                 :         {
     309              15 :             if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
     310 UNC           0 :                 errorConflictingDefElem(defel, pstate);
     311                 : 
     312 GNC          15 :             opts->specified_opts |= SUBOPT_ORIGIN;
     313              15 :             pfree(opts->origin);
     314                 : 
     315                 :             /*
     316                 :              * Even though the "origin" parameter allows only "none" and "any"
     317                 :              * values, it is implemented as a string type so that the
     318                 :              * parameter can be extended in future versions to support
     319                 :              * filtering using origin names specified by the user.
     320                 :              */
     321              15 :             opts->origin = defGetString(defel);
     322                 : 
     323              22 :             if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
     324               7 :                 (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
     325               3 :                 ereport(ERROR,
     326                 :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     327                 :                         errmsg("unrecognized origin value: \"%s\"", opts->origin));
     328                 :         }
     329 CBC          15 :         else if (IsSet(supported_opts, SUBOPT_LSN) &&
     330              12 :                  strcmp(defel->defname, "lsn") == 0)
     331 GIC           9 :         {
     332              12 :             char       *lsn_str = defGetString(defel);
     333                 :             XLogRecPtr  lsn;
     334 ECB             : 
     335 GBC          12 :             if (IsSet(opts->specified_opts, SUBOPT_LSN))
     336 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     337 ECB             : 
     338                 :             /* Setting lsn = NONE is treated as resetting LSN */
     339 GIC          12 :             if (strcmp(lsn_str, "none") == 0)
     340 CBC           3 :                 lsn = InvalidXLogRecPtr;
     341 ECB             :             else
     342                 :             {
     343                 :                 /* Parse the argument as LSN */
     344 GBC           9 :                 lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
     345                 :                                                       CStringGetDatum(lsn_str)));
     346 ECB             : 
     347 CBC           9 :                 if (XLogRecPtrIsInvalid(lsn))
     348 GIC           3 :                     ereport(ERROR,
     349 ECB             :                             (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     350                 :                              errmsg("invalid WAL location (LSN): %s", lsn_str)));
     351                 :             }
     352                 : 
     353 GBC           9 :             opts->specified_opts |= SUBOPT_LSN;
     354 GIC           9 :             opts->lsn = lsn;
     355 ECB             :         }
     356                 :         else
     357 GIC           3 :             ereport(ERROR,
     358 ECB             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     359                 :                      errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
     360                 :     }
     361                 : 
     362 EUB             :     /*
     363                 :      * We've been explicitly asked to not connect, that requires some
     364 ECB             :      * additional processing.
     365                 :      */
     366 GIC         332 :     if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
     367 ECB             :     {
     368                 :         /* Check for incompatible options from the user. */
     369 GIC          64 :         if (opts->enabled &&
     370 CBC          64 :             IsSet(opts->specified_opts, SUBOPT_ENABLED))
     371 GBC           3 :             ereport(ERROR,
     372                 :                     (errcode(ERRCODE_SYNTAX_ERROR),
     373 ECB             :             /*- translator: both %s are strings of the form "option = value" */
     374                 :                      errmsg("%s and %s are mutually exclusive options",
     375                 :                             "connect = false", "enabled = true")));
     376                 : 
     377 GIC          61 :         if (opts->create_slot &&
     378              58 :             IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     379               3 :             ereport(ERROR,
     380                 :                     (errcode(ERRCODE_SYNTAX_ERROR),
     381                 :                      errmsg("%s and %s are mutually exclusive options",
     382 ECB             :                             "connect = false", "create_slot = true")));
     383                 : 
     384 CBC          58 :         if (opts->copy_data &&
     385              55 :             IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
     386               3 :             ereport(ERROR,
     387                 :                     (errcode(ERRCODE_SYNTAX_ERROR),
     388                 :                      errmsg("%s and %s are mutually exclusive options",
     389                 :                             "connect = false", "copy_data = true")));
     390 ECB             : 
     391                 :         /* Change the defaults of other options. */
     392 CBC          55 :         opts->enabled = false;
     393              55 :         opts->create_slot = false;
     394 GIC          55 :         opts->copy_data = false;
     395                 :     }
     396 ECB             : 
     397 EUB             :     /*
     398                 :      * Do additional checking for disallowed combination when slot_name = NONE
     399                 :      * was used.
     400 ECB             :      */
     401 CBC         323 :     if (!opts->slot_name &&
     402 GIC         317 :         IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
     403                 :     {
     404              50 :         if (opts->enabled)
     405 ECB             :         {
     406 GIC           9 :             if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
     407               3 :                 ereport(ERROR,
     408 ECB             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     409                 :                 /*- translator: both %s are strings of the form "option = value" */
     410                 :                          errmsg("%s and %s are mutually exclusive options",
     411                 :                                 "slot_name = NONE", "enabled = true")));
     412                 :             else
     413 GIC           6 :                 ereport(ERROR,
     414 ECB             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     415                 :                 /*- translator: both %s are strings of the form "option = value" */
     416                 :                          errmsg("subscription with %s must also set %s",
     417                 :                                 "slot_name = NONE", "enabled = false")));
     418                 :         }
     419                 : 
     420 GIC          41 :         if (opts->create_slot)
     421                 :         {
     422               6 :             if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
     423               3 :                 ereport(ERROR,
     424                 :                         (errcode(ERRCODE_SYNTAX_ERROR),
     425                 :                 /*- translator: both %s are strings of the form "option = value" */
     426                 :                          errmsg("%s and %s are mutually exclusive options",
     427 ECB             :                                 "slot_name = NONE", "create_slot = true")));
     428                 :             else
     429 GIC           3 :                 ereport(ERROR,
     430 ECB             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     431                 :                 /*- translator: both %s are strings of the form "option = value" */
     432                 :                          errmsg("subscription with %s must also set %s",
     433                 :                                 "slot_name = NONE", "create_slot = false")));
     434                 :         }
     435                 :     }
     436 GIC         308 : }
     437                 : 
     438 ECB             : /*
     439                 :  * Add publication names from the list to a string.
     440                 :  */
     441                 : static void
     442 GIC         208 : get_publications_str(List *publications, StringInfo dest, bool quote_literal)
     443                 : {
     444                 :     ListCell   *lc;
     445 CBC         208 :     bool        first = true;
     446 ECB             : 
     447 GNC         208 :     Assert(publications != NIL);
     448                 : 
     449 GIC         495 :     foreach(lc, publications)
     450                 :     {
     451             287 :         char       *pubname = strVal(lfirst(lc));
     452                 : 
     453 CBC         287 :         if (first)
     454             208 :             first = false;
     455 ECB             :         else
     456 GIC          79 :             appendStringInfoString(dest, ", ");
     457                 : 
     458             287 :         if (quote_literal)
     459             281 :             appendStringInfoString(dest, quote_literal_cstr(pubname));
     460                 :         else
     461                 :         {
     462 CBC           6 :             appendStringInfoChar(dest, '"');
     463               6 :             appendStringInfoString(dest, pubname);
     464 GIC           6 :             appendStringInfoChar(dest, '"');
     465 ECB             :         }
     466                 :     }
     467 CBC         208 : }
     468 ECB             : 
     469                 : /*
     470                 :  * Check that the specified publications are present on the publisher.
     471                 :  */
     472                 : static void
     473 GIC          88 : check_publications(WalReceiverConn *wrconn, List *publications)
     474 ECB             : {
     475                 :     WalRcvExecResult *res;
     476                 :     StringInfo  cmd;
     477                 :     TupleTableSlot *slot;
     478 GIC          88 :     List       *publicationsCopy = NIL;
     479              88 :     Oid         tableRow[1] = {TEXTOID};
     480                 : 
     481 CBC          88 :     cmd = makeStringInfo();
     482 GIC          88 :     appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
     483 ECB             :                            " pg_catalog.pg_publication t WHERE\n"
     484                 :                            " t.pubname IN (");
     485 GIC          88 :     get_publications_str(publications, cmd, true);
     486              88 :     appendStringInfoChar(cmd, ')');
     487                 : 
     488              88 :     res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
     489              88 :     pfree(cmd->data);
     490 CBC          88 :     pfree(cmd);
     491                 : 
     492 GIC          88 :     if (res->status != WALRCV_OK_TUPLES)
     493 UIC           0 :         ereport(ERROR,
     494                 :                 errmsg("could not receive list of publications from the publisher: %s",
     495                 :                        res->err));
     496                 : 
     497 CBC          88 :     publicationsCopy = list_copy(publications);
     498                 : 
     499                 :     /* Process publication(s). */
     500 GIC          88 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     501             202 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     502                 :     {
     503 ECB             :         char       *pubname;
     504                 :         bool        isnull;
     505                 : 
     506 CBC         114 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
     507 GIC         114 :         Assert(!isnull);
     508 ECB             : 
     509                 :         /* Delete the publication present in publisher from the list. */
     510 CBC         114 :         publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
     511 GIC         114 :         ExecClearTuple(slot);
     512 ECB             :     }
     513                 : 
     514 CBC          88 :     ExecDropSingleTupleTableSlot(slot);
     515 ECB             : 
     516 GIC          88 :     walrcv_clear_result(res);
     517 ECB             : 
     518 GIC          88 :     if (list_length(publicationsCopy))
     519 ECB             :     {
     520                 :         /* Prepare the list of non-existent publication(s) for error message. */
     521 GIC           3 :         StringInfo  pubnames = makeStringInfo();
     522                 : 
     523 CBC           3 :         get_publications_str(publicationsCopy, pubnames, false);
     524               3 :         ereport(WARNING,
     525 ECB             :                 errcode(ERRCODE_UNDEFINED_OBJECT),
     526                 :                 errmsg_plural("publication %s does not exist on the publisher",
     527                 :                               "publications %s do not exist on the publisher",
     528                 :                               list_length(publicationsCopy),
     529                 :                               pubnames->data));
     530                 :     }
     531 GIC          88 : }
     532                 : 
     533                 : /*
     534 ECB             :  * Auxiliary function to build a text array out of a list of String nodes.
     535                 :  */
     536                 : static Datum
     537 GIC         147 : publicationListToArray(List *publist)
     538                 : {
     539 ECB             :     ArrayType  *arr;
     540                 :     Datum      *datums;
     541                 :     MemoryContext memcxt;
     542                 :     MemoryContext oldcxt;
     543                 : 
     544                 :     /* Create memory context for temporary allocations. */
     545 GIC         147 :     memcxt = AllocSetContextCreate(CurrentMemoryContext,
     546 ECB             :                                    "publicationListToArray to array",
     547                 :                                    ALLOCSET_DEFAULT_SIZES);
     548 GIC         147 :     oldcxt = MemoryContextSwitchTo(memcxt);
     549 ECB             : 
     550 CBC         147 :     datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
     551 ECB             : 
     552 GIC         147 :     check_duplicates_in_publist(publist, datums);
     553 ECB             : 
     554 GBC         144 :     MemoryContextSwitchTo(oldcxt);
     555                 : 
     556 GNC         144 :     arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
     557 ECB             : 
     558 GIC         144 :     MemoryContextDelete(memcxt);
     559                 : 
     560 CBC         144 :     return PointerGetDatum(arr);
     561 ECB             : }
     562                 : 
     563                 : /*
     564                 :  * Create new subscription.
     565                 :  */
     566                 : ObjectAddress
     567 CBC         177 : CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
     568                 :                    bool isTopLevel)
     569                 : {
     570 ECB             :     Relation    rel;
     571                 :     ObjectAddress myself;
     572                 :     Oid         subid;
     573                 :     bool        nulls[Natts_pg_subscription];
     574                 :     Datum       values[Natts_pg_subscription];
     575 GIC         177 :     Oid         owner = GetUserId();
     576 ECB             :     HeapTuple   tup;
     577                 :     char       *conninfo;
     578                 :     char        originname[NAMEDATALEN];
     579                 :     List       *publications;
     580                 :     bits32      supported_opts;
     581 CBC         177 :     SubOpts     opts = {0};
     582                 :     AclResult   aclresult;
     583                 : 
     584 ECB             :     /*
     585                 :      * Parse and check options.
     586                 :      *
     587                 :      * Connection and publication should not be specified here.
     588                 :      */
     589 GIC         177 :     supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
     590                 :                       SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
     591                 :                       SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
     592 ECB             :                       SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
     593                 :                       SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
     594                 :                       SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
     595 GIC         177 :     parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
     596                 : 
     597                 :     /*
     598                 :      * Since creating a replication slot is not transactional, rolling back
     599 ECB             :      * the transaction leaves the created replication slot.  So we cannot run
     600                 :      * CREATE SUBSCRIPTION inside a transaction block if creating a
     601                 :      * replication slot.
     602                 :      */
     603 GIC         138 :     if (opts.create_slot)
     604              82 :         PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
     605                 : 
     606                 :     /*
     607                 :      * We don't want to allow unprivileged users to be able to trigger attempts
     608                 :      * to access arbitrary network destinations, so require the user to have
     609                 :      * been specifically authorized to create subscriptions.
     610                 :      */
     611 GNC         135 :     if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
     612 CBC           3 :         ereport(ERROR,
     613                 :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     614                 :                  errmsg("must have privileges of pg_create_subscription to create subscriptions")));
     615                 : 
     616                 :     /*
     617                 :      * Since a subscription is a database object, we also check for CREATE
     618                 :      * permission on the database.
     619                 :      */
     620 GNC         132 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
     621                 :                                 owner, ACL_CREATE);
     622             132 :     if (aclresult != ACLCHECK_OK)
     623               6 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     624               3 :                        get_database_name(MyDatabaseId));
     625                 : 
     626                 :     /*
     627                 :      * Non-superusers are required to set a password for authentication, and
     628                 :      * that password must be used by the target server, but the superuser can
     629                 :      * exempt a subscription from this requirement.
     630                 :      */
     631             129 :     if (!opts.passwordrequired && !superuser_arg(owner))
     632               3 :             ereport(ERROR,
     633                 :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     634                 :                      errmsg("password_required=false is superuser-only"),
     635                 :                      errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
     636 ECB             : 
     637                 :     /*
     638                 :      * If built with appropriate switch, whine when regression-testing
     639                 :      * conventions for subscription names are violated.
     640                 :      */
     641                 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
     642                 :     if (strncmp(stmt->subname, "regress_", 8) != 0)
     643                 :         elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
     644                 : #endif
     645                 : 
     646 CBC         126 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     647                 : 
     648 ECB             :     /* Check if name is used */
     649 GIC         126 :     subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
     650                 :                             MyDatabaseId, CStringGetDatum(stmt->subname));
     651             126 :     if (OidIsValid(subid))
     652                 :     {
     653               3 :         ereport(ERROR,
     654                 :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     655 ECB             :                  errmsg("subscription \"%s\" already exists",
     656                 :                         stmt->subname)));
     657                 :     }
     658                 : 
     659 GIC         123 :     if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
     660             110 :         opts.slot_name == NULL)
     661             110 :         opts.slot_name = stmt->subname;
     662                 : 
     663 ECB             :     /* The default for synchronous_commit of subscriptions is off. */
     664 GIC         123 :     if (opts.synchronous_commit == NULL)
     665             123 :         opts.synchronous_commit = "off";
     666                 : 
     667             123 :     conninfo = stmt->conninfo;
     668             123 :     publications = stmt->publication;
     669 ECB             : 
     670                 :     /* Load the library providing us libpq calls. */
     671 GIC         123 :     load_file("libpqwalreceiver", false);
     672                 : 
     673                 :     /* Check the connection info string. */
     674 GNC         123 :     walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
     675                 : 
     676                 :     /* Everything ok, form a new tuple. */
     677 CBC         114 :     memset(values, 0, sizeof(values));
     678 GIC         114 :     memset(nulls, false, sizeof(nulls));
     679                 : 
     680             114 :     subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
     681                 :                                Anum_pg_subscription_oid);
     682             114 :     values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
     683 CBC         114 :     values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
     684 GIC         114 :     values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
     685             114 :     values[Anum_pg_subscription_subname - 1] =
     686             114 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
     687             114 :     values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
     688             114 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
     689             114 :     values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
     690 GNC         114 :     values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
     691 CBC         114 :     values[Anum_pg_subscription_subtwophasestate - 1] =
     692             114 :         CharGetDatum(opts.twophase ?
     693                 :                      LOGICALREP_TWOPHASE_STATE_PENDING :
     694                 :                      LOGICALREP_TWOPHASE_STATE_DISABLED);
     695 GIC         114 :     values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
     696 GNC         114 :     values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
     697             114 :     values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
     698 GIC         114 :     values[Anum_pg_subscription_subconninfo - 1] =
     699             114 :         CStringGetTextDatum(conninfo);
     700             114 :     if (opts.slot_name)
     701 CBC         104 :         values[Anum_pg_subscription_subslotname - 1] =
     702             104 :             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
     703                 :     else
     704 GIC          10 :         nulls[Anum_pg_subscription_subslotname - 1] = true;
     705             114 :     values[Anum_pg_subscription_subsynccommit - 1] =
     706             114 :         CStringGetTextDatum(opts.synchronous_commit);
     707             111 :     values[Anum_pg_subscription_subpublications - 1] =
     708             114 :         publicationListToArray(publications);
     709 GNC         111 :     values[Anum_pg_subscription_suborigin - 1] =
     710             111 :         CStringGetTextDatum(opts.origin);
     711                 : 
     712 CBC         111 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     713                 : 
     714 ECB             :     /* Insert tuple into catalog. */
     715 CBC         111 :     CatalogTupleInsert(rel, tup);
     716             111 :     heap_freetuple(tup);
     717                 : 
     718 GIC         111 :     recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
     719                 : 
     720 GNC         111 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
     721 GIC         111 :     replorigin_create(originname);
     722                 : 
     723 ECB             :     /*
     724                 :      * Connect to remote side to execute requested commands and fetch table
     725                 :      * info.
     726                 :      */
     727 GIC         111 :     if (opts.connect)
     728                 :     {
     729                 :         char       *err;
     730                 :         WalReceiverConn *wrconn;
     731                 :         List       *tables;
     732                 :         ListCell   *lc;
     733                 :         char        table_state;
     734                 :         bool        must_use_password;
     735                 : 
     736                 :         /* Try to connect to the publisher. */
     737 GNC          74 :         must_use_password = !superuser_arg(owner) && opts.passwordrequired;
     738              74 :         wrconn = walrcv_connect(conninfo, true, must_use_password,
     739                 :                                 stmt->subname, &err);
     740 GIC          74 :         if (!wrconn)
     741 CBC           3 :             ereport(ERROR,
     742                 :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     743                 :                      errmsg("could not connect to the publisher: %s", err)));
     744 ECB             : 
     745 GIC          71 :         PG_TRY();
     746 ECB             :         {
     747 GIC          71 :             check_publications(wrconn, publications);
     748 GNC          71 :             check_publications_origin(wrconn, publications, opts.copy_data,
     749                 :                                       opts.origin, NULL, 0, stmt->subname);
     750 ECB             : 
     751                 :             /*
     752                 :              * Set sync state based on if we were asked to do data copy or
     753                 :              * not.
     754                 :              */
     755 GIC          71 :             table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
     756 ECB             : 
     757                 :             /*
     758                 :              * Get the table list from publisher and build local table status
     759                 :              * info.
     760                 :              */
     761 CBC          71 :             tables = fetch_table_list(wrconn, publications);
     762             198 :             foreach(lc, tables)
     763                 :             {
     764             128 :                 RangeVar   *rv = (RangeVar *) lfirst(lc);
     765 ECB             :                 Oid         relid;
     766                 : 
     767 GIC         128 :                 relid = RangeVarGetRelid(rv, AccessShareLock, false);
     768 ECB             : 
     769                 :                 /* Check for supported relkind. */
     770 GIC         128 :                 CheckSubscriptionRelkind(get_rel_relkind(relid),
     771 CBC         128 :                                          rv->schemaname, rv->relname);
     772                 : 
     773 GIC         128 :                 AddSubscriptionRelState(subid, relid, table_state,
     774 ECB             :                                         InvalidXLogRecPtr);
     775                 :             }
     776                 : 
     777                 :             /*
     778                 :              * If requested, create permanent slot for the subscription. We
     779                 :              * won't use the initial snapshot for anything, so no need to
     780                 :              * export it.
     781                 :              */
     782 CBC          70 :             if (opts.create_slot)
     783 ECB             :             {
     784 CBC          69 :                 bool        twophase_enabled = false;
     785 ECB             : 
     786 CBC          69 :                 Assert(opts.slot_name);
     787 ECB             : 
     788                 :                 /*
     789                 :                  * Even if two_phase is set, don't create the slot with
     790                 :                  * two-phase enabled. Will enable it once all the tables are
     791                 :                  * synced and ready. This avoids race-conditions like prepared
     792                 :                  * transactions being skipped due to changes not being applied
     793                 :                  * due to checks in should_apply_changes_for_rel() when
     794                 :                  * tablesync for the corresponding tables are in progress. See
     795                 :                  * comments atop worker.c.
     796                 :                  *
     797                 :                  * Note that if tables were specified but copy_data is false
     798                 :                  * then it is safe to enable two_phase up-front because those
     799                 :                  * tables are already initially in READY state. When the
     800                 :                  * subscription has no tables, we leave the twophase state as
     801                 :                  * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
     802                 :                  * PUBLICATION to work.
     803                 :                  */
     804 CBC          69 :                 if (opts.twophase && !opts.copy_data && tables != NIL)
     805               1 :                     twophase_enabled = true;
     806 ECB             : 
     807 CBC          69 :                 walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
     808                 :                                    CRS_NOEXPORT_SNAPSHOT, NULL);
     809 ECB             : 
     810 GIC          69 :                 if (twophase_enabled)
     811               1 :                     UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
     812 ECB             : 
     813 CBC          69 :                 ereport(NOTICE,
     814                 :                         (errmsg("created replication slot \"%s\" on publisher",
     815 ECB             :                                 opts.slot_name)));
     816                 :             }
     817                 :         }
     818 CBC           1 :         PG_FINALLY();
     819                 :         {
     820 GIC          71 :             walrcv_disconnect(wrconn);
     821                 :         }
     822              71 :         PG_END_TRY();
     823                 :     }
     824 ECB             :     else
     825 GIC          37 :         ereport(WARNING,
     826                 :                 (errmsg("subscription was created, but is not connected"),
     827                 :                  errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
     828                 : 
     829             107 :     table_close(rel, RowExclusiveLock);
     830                 : 
     831             107 :     pgstat_create_subscription(subid);
     832                 : 
     833 CBC         107 :     if (opts.enabled)
     834              70 :         ApplyLauncherWakeupAtCommit();
     835                 : 
     836             107 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     837 ECB             : 
     838 GIC         107 :     InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
     839                 : 
     840             107 :     return myself;
     841 ECB             : }
     842                 : 
     843                 : static void
     844 CBC          38 : AlterSubscription_refresh(Subscription *sub, bool copy_data,
     845                 :                           List *validate_publications)
     846                 : {
     847                 :     char       *err;
     848                 :     List       *pubrel_names;
     849                 :     List       *subrel_states;
     850                 :     Oid        *subrel_local_oids;
     851 ECB             :     Oid        *pubrel_local_oids;
     852                 :     ListCell   *lc;
     853                 :     int         off;
     854                 :     int         remove_rel_len;
     855                 :     int         subrel_count;
     856 GIC          38 :     Relation    rel = NULL;
     857                 :     typedef struct SubRemoveRels
     858 ECB             :     {
     859                 :         Oid         relid;
     860                 :         char        state;
     861                 :     } SubRemoveRels;
     862                 :     SubRemoveRels *sub_remove_rels;
     863                 :     WalReceiverConn *wrconn;
     864                 :     bool        must_use_password;
     865                 : 
     866                 :     /* Load the library providing us libpq calls. */
     867 GIC          38 :     load_file("libpqwalreceiver", false);
     868 ECB             : 
     869                 :     /* Try to connect to the publisher. */
     870 GNC          38 :     must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
     871              38 :     wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
     872                 :                             sub->name, &err);
     873 CBC          38 :     if (!wrconn)
     874 UIC           0 :         ereport(ERROR,
     875                 :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     876                 :                  errmsg("could not connect to the publisher: %s", err)));
     877                 : 
     878 GIC          38 :     PG_TRY();
     879                 :     {
     880              38 :         if (validate_publications)
     881              17 :             check_publications(wrconn, validate_publications);
     882 ECB             : 
     883                 :         /* Get the table list from publisher. */
     884 CBC          38 :         pubrel_names = fetch_table_list(wrconn, sub->publications);
     885                 : 
     886 ECB             :         /* Get local table list. */
     887 GNC          38 :         subrel_states = GetSubscriptionRelations(sub->oid, false);
     888              38 :         subrel_count = list_length(subrel_states);
     889                 : 
     890                 :         /*
     891                 :          * Build qsorted array of local table oids for faster lookup. This can
     892                 :          * potentially contain all tables in the database so speed of lookup
     893                 :          * is important.
     894                 :          */
     895              38 :         subrel_local_oids = palloc(subrel_count * sizeof(Oid));
     896 GIC          38 :         off = 0;
     897             131 :         foreach(lc, subrel_states)
     898                 :         {
     899              93 :             SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
     900                 : 
     901              93 :             subrel_local_oids[off++] = relstate->relid;
     902                 :         }
     903 GNC          38 :         qsort(subrel_local_oids, subrel_count,
     904                 :               sizeof(Oid), oid_cmp);
     905 ECB             : 
     906 GNC          38 :         check_publications_origin(wrconn, sub->publications, copy_data,
     907                 :                                   sub->origin, subrel_local_oids,
     908                 :                                   subrel_count, sub->name);
     909                 : 
     910 ECB             :         /*
     911                 :          * Rels that we want to remove from subscription and drop any slots
     912                 :          * and origins corresponding to them.
     913                 :          */
     914 GNC          38 :         sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
     915 ECB             : 
     916                 :         /*
     917                 :          * Walk over the remote tables and try to match them to locally known
     918                 :          * tables. If the table is not known locally create a new state for
     919                 :          * it.
     920                 :          *
     921                 :          * Also builds array of local oids of remote tables for the next step.
     922                 :          */
     923 CBC          38 :         off = 0;
     924 GIC          38 :         pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
     925 ECB             : 
     926 GIC         130 :         foreach(lc, pubrel_names)
     927 ECB             :         {
     928 GIC          92 :             RangeVar   *rv = (RangeVar *) lfirst(lc);
     929                 :             Oid         relid;
     930 ECB             : 
     931 GIC          92 :             relid = RangeVarGetRelid(rv, AccessShareLock, false);
     932                 : 
     933                 :             /* Check for supported relkind. */
     934 CBC          92 :             CheckSubscriptionRelkind(get_rel_relkind(relid),
     935 GIC          92 :                                      rv->schemaname, rv->relname);
     936 ECB             : 
     937 GIC          92 :             pubrel_local_oids[off++] = relid;
     938 ECB             : 
     939 CBC          92 :             if (!bsearch(&relid, subrel_local_oids,
     940                 :                          subrel_count, sizeof(Oid), oid_cmp))
     941 ECB             :             {
     942 GIC          29 :                 AddSubscriptionRelState(sub->oid, relid,
     943 ECB             :                                         copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
     944                 :                                         InvalidXLogRecPtr);
     945 CBC          29 :                 ereport(DEBUG1,
     946                 :                         (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
     947                 :                                          rv->schemaname, rv->relname, sub->name)));
     948                 :             }
     949 ECB             :         }
     950                 : 
     951                 :         /*
     952                 :          * Next remove state for tables we should not care about anymore using
     953                 :          * the data we collected above
     954                 :          */
     955 GIC          38 :         qsort(pubrel_local_oids, list_length(pubrel_names),
     956                 :               sizeof(Oid), oid_cmp);
     957                 : 
     958              38 :         remove_rel_len = 0;
     959 GNC         131 :         for (off = 0; off < subrel_count; off++)
     960                 :         {
     961 CBC          93 :             Oid         relid = subrel_local_oids[off];
     962                 : 
     963 GIC          93 :             if (!bsearch(&relid, pubrel_local_oids,
     964              93 :                          list_length(pubrel_names), sizeof(Oid), oid_cmp))
     965                 :             {
     966                 :                 char        state;
     967                 :                 XLogRecPtr  statelsn;
     968                 : 
     969                 :                 /*
     970                 :                  * Lock pg_subscription_rel with AccessExclusiveLock to
     971                 :                  * prevent any race conditions with the apply worker
     972 ECB             :                  * re-launching workers at the same time this code is trying
     973                 :                  * to remove those tables.
     974                 :                  *
     975                 :                  * Even if new worker for this particular rel is restarted it
     976                 :                  * won't be able to make any progress as we hold exclusive
     977                 :                  * lock on subscription_rel till the transaction end. It will
     978                 :                  * simply exit as there is no corresponding rel entry.
     979 EUB             :                  *
     980                 :                  * This locking also ensures that the state of rels won't
     981                 :                  * change till we are done with this refresh operation.
     982                 :                  */
     983 CBC          30 :                 if (!rel)
     984 GIC          15 :                     rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
     985 ECB             : 
     986                 :                 /* Last known rel state. */
     987 GIC          30 :                 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
     988                 : 
     989 CBC          30 :                 sub_remove_rels[remove_rel_len].relid = relid;
     990 GIC          30 :                 sub_remove_rels[remove_rel_len++].state = state;
     991                 : 
     992 CBC          30 :                 RemoveSubscriptionRel(sub->oid, relid);
     993 ECB             : 
     994 GIC          30 :                 logicalrep_worker_stop(sub->oid, relid);
     995                 : 
     996                 :                 /*
     997                 :                  * For READY state, we would have already dropped the
     998                 :                  * tablesync origin.
     999                 :                  */
    1000 CBC          30 :                 if (state != SUBREL_STATE_READY)
    1001 ECB             :                 {
    1002                 :                     char        originname[NAMEDATALEN];
    1003                 : 
    1004                 :                     /*
    1005                 :                      * Drop the tablesync's origin tracking if exists.
    1006                 :                      *
    1007                 :                      * It is possible that the origin is not yet created for
    1008                 :                      * tablesync worker, this can happen for the states before
    1009                 :                      * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
    1010                 :                      * apply worker can also concurrently try to drop the
    1011                 :                      * origin and by this time the origin might be already
    1012                 :                      * removed. For these reasons, passing missing_ok = true.
    1013                 :                      */
    1014 UNC           0 :                     ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
    1015                 :                                                        sizeof(originname));
    1016 UIC           0 :                     replorigin_drop_by_name(originname, true, false);
    1017                 :                 }
    1018                 : 
    1019 CBC          30 :                 ereport(DEBUG1,
    1020                 :                         (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
    1021                 :                                          get_namespace_name(get_rel_namespace(relid)),
    1022                 :                                          get_rel_name(relid),
    1023                 :                                          sub->name)));
    1024                 :             }
    1025                 :         }
    1026                 : 
    1027                 :         /*
    1028 ECB             :          * Drop the tablesync slots associated with removed tables. This has
    1029                 :          * to be at the end because otherwise if there is an error while doing
    1030                 :          * the database operations we won't be able to rollback dropped slots.
    1031                 :          */
    1032 GIC          68 :         for (off = 0; off < remove_rel_len; off++)
    1033 ECB             :         {
    1034 GIC          30 :             if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
    1035 UIC           0 :                 sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
    1036 ECB             :             {
    1037 UIC           0 :                 char        syncslotname[NAMEDATALEN] = {0};
    1038                 : 
    1039 ECB             :                 /*
    1040                 :                  * For READY/SYNCDONE states we know the tablesync slot has
    1041                 :                  * already been dropped by the tablesync worker.
    1042                 :                  *
    1043                 :                  * For other states, there is no certainty, maybe the slot
    1044                 :                  * does not exist yet. Also, if we fail after removing some of
    1045                 :                  * the slots, next time, it will again try to drop already
    1046                 :                  * dropped slots and fail. For these reasons, we allow
    1047                 :                  * missing_ok = true for the drop.
    1048                 :                  */
    1049 UIC           0 :                 ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
    1050 ECB             :                                                 syncslotname, sizeof(syncslotname));
    1051 UIC           0 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    1052                 :             }
    1053                 :         }
    1054                 :     }
    1055               0 :     PG_FINALLY();
    1056                 :     {
    1057 GIC          38 :         walrcv_disconnect(wrconn);
    1058                 :     }
    1059              38 :     PG_END_TRY();
    1060 ECB             : 
    1061 GIC          38 :     if (rel)
    1062              15 :         table_close(rel, NoLock);
    1063 CBC          38 : }
    1064 ECB             : 
    1065                 : /*
    1066                 :  * Alter the existing subscription.
    1067                 :  */
    1068                 : ObjectAddress
    1069 CBC         198 : AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
    1070                 :                   bool isTopLevel)
    1071                 : {
    1072                 :     Relation    rel;
    1073                 :     ObjectAddress myself;
    1074                 :     bool        nulls[Natts_pg_subscription];
    1075                 :     bool        replaces[Natts_pg_subscription];
    1076                 :     Datum       values[Natts_pg_subscription];
    1077                 :     HeapTuple   tup;
    1078                 :     Oid         subid;
    1079 GIC         198 :     bool        update_tuple = false;
    1080                 :     Subscription *sub;
    1081                 :     Form_pg_subscription form;
    1082                 :     bits32      supported_opts;
    1083             198 :     SubOpts     opts = {0};
    1084                 : 
    1085             198 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1086                 : 
    1087                 :     /* Fetch the existing tuple. */
    1088 CBC         198 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
    1089 ECB             :                               CStringGetDatum(stmt->subname));
    1090                 : 
    1091 GIC         198 :     if (!HeapTupleIsValid(tup))
    1092 CBC           3 :         ereport(ERROR,
    1093                 :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1094 ECB             :                  errmsg("subscription \"%s\" does not exist",
    1095                 :                         stmt->subname)));
    1096                 : 
    1097 CBC         195 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1098 GIC         195 :     subid = form->oid;
    1099 ECB             : 
    1100                 :     /* must be owner */
    1101 GNC         195 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    1102 UIC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    1103               0 :                        stmt->subname);
    1104                 : 
    1105 CBC         195 :     sub = GetSubscription(subid, false);
    1106                 : 
    1107                 :     /*
    1108                 :      * Don't allow non-superuser modification of a subscription with
    1109                 :      * password_required=false.
    1110                 :      */
    1111 GNC         195 :     if (!sub->passwordrequired && !superuser())
    1112 UNC           0 :         ereport(ERROR,
    1113                 :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1114                 :                          errmsg("password_required=false is superuser-only"),
    1115                 :                          errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1116                 : 
    1117                 :     /* Lock the subscription so nobody else can do anything with it. */
    1118 GIC         195 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    1119                 : 
    1120                 :     /* Form a new tuple. */
    1121             195 :     memset(values, 0, sizeof(values));
    1122             195 :     memset(nulls, false, sizeof(nulls));
    1123             195 :     memset(replaces, false, sizeof(replaces));
    1124                 : 
    1125             195 :     switch (stmt->kind)
    1126                 :     {
    1127              78 :         case ALTER_SUBSCRIPTION_OPTIONS:
    1128                 :             {
    1129 GBC          78 :                 supported_opts = (SUBOPT_SLOT_NAME |
    1130                 :                                   SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
    1131                 :                                   SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
    1132                 :                                   SUBOPT_PASSWORD_REQUIRED |
    1133                 :                                   SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
    1134                 : 
    1135 GIC          78 :                 parse_subscription_options(pstate, stmt->options,
    1136 ECB             :                                            supported_opts, &opts);
    1137                 : 
    1138 GIC          66 :                 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
    1139                 :                 {
    1140                 :                     /*
    1141                 :                      * The subscription must be disabled to allow slot_name as
    1142                 :                      * 'none', otherwise, the apply worker will repeatedly try
    1143                 :                      * to stream the data using that slot_name which neither
    1144                 :                      * exists on the publisher nor the user will be allowed to
    1145                 :                      * create it.
    1146                 :                      */
    1147              28 :                     if (sub->enabled && !opts.slot_name)
    1148 UIC           0 :                         ereport(ERROR,
    1149 ECB             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1150                 :                                  errmsg("cannot set %s for enabled subscription",
    1151                 :                                         "slot_name = NONE")));
    1152 EUB             : 
    1153 GIC          28 :                     if (opts.slot_name)
    1154 GBC           3 :                         values[Anum_pg_subscription_subslotname - 1] =
    1155 GIC           3 :                             DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
    1156                 :                     else
    1157              25 :                         nulls[Anum_pg_subscription_subslotname - 1] = true;
    1158              28 :                     replaces[Anum_pg_subscription_subslotname - 1] = true;
    1159                 :                 }
    1160                 : 
    1161              66 :                 if (opts.synchronous_commit)
    1162                 :                 {
    1163               3 :                     values[Anum_pg_subscription_subsynccommit - 1] =
    1164               3 :                         CStringGetTextDatum(opts.synchronous_commit);
    1165               3 :                     replaces[Anum_pg_subscription_subsynccommit - 1] = true;
    1166 EUB             :                 }
    1167                 : 
    1168 GBC          66 :                 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
    1169                 :                 {
    1170 GIC           9 :                     values[Anum_pg_subscription_subbinary - 1] =
    1171               9 :                         BoolGetDatum(opts.binary);
    1172 GBC           9 :                     replaces[Anum_pg_subscription_subbinary - 1] = true;
    1173                 :                 }
    1174 ECB             : 
    1175 GIC          66 :                 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
    1176 ECB             :                 {
    1177 GIC          15 :                     values[Anum_pg_subscription_substream - 1] =
    1178 GNC          15 :                         CharGetDatum(opts.streaming);
    1179 CBC          15 :                     replaces[Anum_pg_subscription_substream - 1] = true;
    1180 ECB             :                 }
    1181                 : 
    1182 GIC          66 :                 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
    1183                 :                 {
    1184                 :                     values[Anum_pg_subscription_subdisableonerr - 1]
    1185               3 :                         = BoolGetDatum(opts.disableonerr);
    1186 ECB             :                     replaces[Anum_pg_subscription_subdisableonerr - 1]
    1187 GIC           3 :                         = true;
    1188                 :                 }
    1189                 : 
    1190 GNC          66 :                 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
    1191                 :                 {
    1192                 :                     /* Non-superuser may not disable password_required. */
    1193               6 :                     if (!opts.passwordrequired && !superuser())
    1194 UNC           0 :                         ereport(ERROR,
    1195                 :                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1196                 :                                  errmsg("password_required=false is superuser-only"),
    1197                 :                                  errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1198                 : 
    1199                 :                     values[Anum_pg_subscription_subpasswordrequired - 1]
    1200 GNC           6 :                         = BoolGetDatum(opts.passwordrequired);
    1201                 :                     replaces[Anum_pg_subscription_subpasswordrequired - 1]
    1202               6 :                         = true;
    1203                 :                 }
    1204                 : 
    1205              66 :                 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
    1206                 :                 {
    1207               3 :                     values[Anum_pg_subscription_suborigin - 1] =
    1208               3 :                         CStringGetTextDatum(opts.origin);
    1209               3 :                     replaces[Anum_pg_subscription_suborigin - 1] = true;
    1210                 :                 }
    1211                 : 
    1212 GIC          66 :                 update_tuple = true;
    1213              66 :                 break;
    1214                 :             }
    1215                 : 
    1216              21 :         case ALTER_SUBSCRIPTION_ENABLED:
    1217                 :             {
    1218 CBC          21 :                 parse_subscription_options(pstate, stmt->options,
    1219                 :                                            SUBOPT_ENABLED, &opts);
    1220 GIC          21 :                 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
    1221                 : 
    1222 CBC          21 :                 if (!sub->slotname && opts.enabled)
    1223 GIC           3 :                     ereport(ERROR,
    1224 ECB             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1225                 :                              errmsg("cannot enable subscription that does not have a slot name")));
    1226                 : 
    1227 CBC          18 :                 values[Anum_pg_subscription_subenabled - 1] =
    1228 GIC          18 :                     BoolGetDatum(opts.enabled);
    1229              18 :                 replaces[Anum_pg_subscription_subenabled - 1] = true;
    1230 ECB             : 
    1231 CBC          18 :                 if (opts.enabled)
    1232 GIC          10 :                     ApplyLauncherWakeupAtCommit();
    1233                 : 
    1234              18 :                 update_tuple = true;
    1235              18 :                 break;
    1236 ECB             :             }
    1237                 : 
    1238 GIC           7 :         case ALTER_SUBSCRIPTION_CONNECTION:
    1239                 :             /* Load the library providing us libpq calls. */
    1240 CBC           7 :             load_file("libpqwalreceiver", false);
    1241 EUB             :             /* Check the connection info string. */
    1242 GNC           7 :             walrcv_check_conninfo(stmt->conninfo,
    1243                 :                                   sub->passwordrequired && !superuser_arg(sub->owner));
    1244                 : 
    1245 CBC           4 :             values[Anum_pg_subscription_subconninfo - 1] =
    1246 GIC           4 :                 CStringGetTextDatum(stmt->conninfo);
    1247               4 :             replaces[Anum_pg_subscription_subconninfo - 1] = true;
    1248               4 :             update_tuple = true;
    1249               4 :             break;
    1250                 : 
    1251 CBC          24 :         case ALTER_SUBSCRIPTION_SET_PUBLICATION:
    1252 EUB             :             {
    1253 GIC          24 :                 supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
    1254              24 :                 parse_subscription_options(pstate, stmt->options,
    1255                 :                                            supported_opts, &opts);
    1256                 : 
    1257              24 :                 values[Anum_pg_subscription_subpublications - 1] =
    1258 CBC          24 :                     publicationListToArray(stmt->publication);
    1259 GIC          24 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1260                 : 
    1261 CBC          24 :                 update_tuple = true;
    1262 ECB             : 
    1263                 :                 /* Refresh if user asked us to. */
    1264 GIC          24 :                 if (opts.refresh)
    1265 ECB             :                 {
    1266 GIC          21 :                     if (!sub->enabled)
    1267 LBC           0 :                         ereport(ERROR,
    1268                 :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1269 ECB             :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1270                 :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
    1271                 : 
    1272                 :                     /*
    1273                 :                      * See ALTER_SUBSCRIPTION_REFRESH for details why this is
    1274                 :                      * not allowed.
    1275                 :                      */
    1276 GIC          21 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1277 UIC           0 :                         ereport(ERROR,
    1278 ECB             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1279                 :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1280                 :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    1281                 : 
    1282 GIC          21 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    1283                 : 
    1284                 :                     /* Make sure refresh sees the new list of publications. */
    1285              15 :                     sub->publications = stmt->publication;
    1286                 : 
    1287 CBC          15 :                     AlterSubscription_refresh(sub, opts.copy_data,
    1288 EUB             :                                               stmt->publication);
    1289                 :                 }
    1290                 : 
    1291 GIC          18 :                 break;
    1292                 :             }
    1293 ECB             : 
    1294 CBC          27 :         case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
    1295 ECB             :         case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
    1296                 :             {
    1297                 :                 List       *publist;
    1298 CBC          27 :                 bool        isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
    1299                 : 
    1300 GIC          27 :                 supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
    1301 CBC          27 :                 parse_subscription_options(pstate, stmt->options,
    1302                 :                                            supported_opts, &opts);
    1303 ECB             : 
    1304 CBC          27 :                 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
    1305               9 :                 values[Anum_pg_subscription_subpublications - 1] =
    1306 GIC           9 :                     publicationListToArray(publist);
    1307               9 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
    1308 ECB             : 
    1309 GIC           9 :                 update_tuple = true;
    1310 ECB             : 
    1311                 :                 /* Refresh if user asked us to. */
    1312 CBC           9 :                 if (opts.refresh)
    1313                 :                 {
    1314                 :                     /* We only need to validate user specified publications. */
    1315               3 :                     List       *validate_publications = (isadd) ? stmt->publication : NULL;
    1316                 : 
    1317               3 :                     if (!sub->enabled)
    1318 LBC           0 :                         ereport(ERROR,
    1319 ECB             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1320                 :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
    1321                 :                         /* translator: %s is an SQL ALTER command */
    1322                 :                                  errhint("Use %s instead.",
    1323                 :                                          isadd ?
    1324                 :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
    1325                 :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
    1326                 : 
    1327                 :                     /*
    1328                 :                      * See ALTER_SUBSCRIPTION_REFRESH for details why this is
    1329                 :                      * not allowed.
    1330                 :                      */
    1331 GIC           3 :                     if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1332 UIC           0 :                         ereport(ERROR,
    1333 ECB             :                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1334 EUB             :                                  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
    1335                 :                         /* translator: %s is an SQL ALTER command */
    1336                 :                                  errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
    1337                 :                                          isadd ?
    1338                 :                                          "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
    1339                 :                                          "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
    1340 ECB             : 
    1341 GIC           3 :                     PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
    1342 ECB             : 
    1343                 :                     /* Refresh the new list of publications. */
    1344 GIC           3 :                     sub->publications = publist;
    1345 ECB             : 
    1346 GIC           3 :                     AlterSubscription_refresh(sub, opts.copy_data,
    1347 ECB             :                                               validate_publications);
    1348                 :                 }
    1349                 : 
    1350 GIC           9 :                 break;
    1351                 :             }
    1352 ECB             : 
    1353 CBC          26 :         case ALTER_SUBSCRIPTION_REFRESH:
    1354                 :             {
    1355 GIC          26 :                 if (!sub->enabled)
    1356 CBC           3 :                     ereport(ERROR,
    1357                 :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1358 ECB             :                              errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
    1359                 : 
    1360 CBC          23 :                 parse_subscription_options(pstate, stmt->options,
    1361                 :                                            SUBOPT_COPY_DATA, &opts);
    1362 ECB             : 
    1363                 :                 /*
    1364                 :                  * The subscription option "two_phase" requires that
    1365                 :                  * replication has passed the initial table synchronization
    1366                 :                  * phase before the two_phase becomes properly enabled.
    1367                 :                  *
    1368                 :                  * But, having reached this two-phase commit "enabled" state
    1369                 :                  * we must not allow any subsequent table initialization to
    1370                 :                  * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
    1371                 :                  * when the user had requested two_phase = on mode.
    1372                 :                  *
    1373                 :                  * The exception to this restriction is when copy_data =
    1374                 :                  * false, because when copy_data is false the tablesync will
    1375                 :                  * start already in READY state and will exit directly without
    1376                 :                  * doing anything.
    1377                 :                  *
    1378                 :                  * For more details see comments atop worker.c.
    1379                 :                  */
    1380 CBC          23 :                 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
    1381 UIC           0 :                     ereport(ERROR,
    1382 ECB             :                             (errcode(ERRCODE_SYNTAX_ERROR),
    1383                 :                              errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
    1384                 :                              errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
    1385                 : 
    1386 CBC          23 :                 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
    1387 ECB             : 
    1388 CBC          20 :                 AlterSubscription_refresh(sub, opts.copy_data, NULL);
    1389 ECB             : 
    1390 GIC          20 :                 break;
    1391 ECB             :             }
    1392                 : 
    1393 CBC          12 :         case ALTER_SUBSCRIPTION_SKIP:
    1394 ECB             :             {
    1395 GIC          12 :                 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
    1396                 : 
    1397 ECB             :                 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
    1398 CBC           9 :                 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
    1399 ECB             : 
    1400                 :                 /*
    1401                 :                  * If the user sets subskiplsn, we do a sanity check to make
    1402 EUB             :                  * sure that the specified LSN is a probable value.
    1403                 :                  */
    1404 GIC           9 :                 if (!XLogRecPtrIsInvalid(opts.lsn))
    1405                 :                 {
    1406                 :                     RepOriginId originid;
    1407                 :                     char        originname[NAMEDATALEN];
    1408                 :                     XLogRecPtr  remote_lsn;
    1409                 : 
    1410 GNC           6 :                     ReplicationOriginNameForLogicalRep(subid, InvalidOid,
    1411                 :                                                        originname, sizeof(originname));
    1412 CBC           6 :                     originid = replorigin_by_name(originname, false);
    1413 GBC           6 :                     remote_lsn = replorigin_get_progress(originid, false);
    1414                 : 
    1415                 :                     /* Check the given LSN is at least a future LSN */
    1416 GIC           6 :                     if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
    1417 UIC           0 :                         ereport(ERROR,
    1418 ECB             :                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1419                 :                                  errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
    1420                 :                                         LSN_FORMAT_ARGS(opts.lsn),
    1421                 :                                         LSN_FORMAT_ARGS(remote_lsn))));
    1422                 :                 }
    1423                 : 
    1424 GIC           9 :                 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
    1425               9 :                 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
    1426                 : 
    1427 CBC           9 :                 update_tuple = true;
    1428 GIC           9 :                 break;
    1429                 :             }
    1430 ECB             : 
    1431 UIC           0 :         default:
    1432               0 :             elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
    1433                 :                  stmt->kind);
    1434 ECB             :     }
    1435                 : 
    1436                 :     /* Update the catalog if needed. */
    1437 CBC         144 :     if (update_tuple)
    1438                 :     {
    1439 GIC         124 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1440 ECB             :                                 replaces);
    1441                 : 
    1442 CBC         124 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    1443 ECB             : 
    1444 GIC         124 :         heap_freetuple(tup);
    1445 ECB             :     }
    1446                 : 
    1447 GIC         144 :     table_close(rel, RowExclusiveLock);
    1448 ECB             : 
    1449 GIC         144 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    1450                 : 
    1451 CBC         144 :     InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
    1452                 : 
    1453                 :     /* Wake up related replication workers to handle this change quickly. */
    1454 GNC         144 :     LogicalRepWorkersWakeupAtCommit(subid);
    1455                 : 
    1456 CBC         144 :     return myself;
    1457 EUB             : }
    1458                 : 
    1459                 : /*
    1460                 :  * Drop a subscription
    1461                 :  */
    1462                 : void
    1463 GIC          78 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    1464                 : {
    1465                 :     Relation    rel;
    1466                 :     ObjectAddress myself;
    1467                 :     HeapTuple   tup;
    1468                 :     Oid         subid;
    1469                 :     Oid         subowner;
    1470                 :     Datum       datum;
    1471 ECB             :     bool        isnull;
    1472 EUB             :     char       *subname;
    1473                 :     char       *conninfo;
    1474                 :     char       *slotname;
    1475                 :     List       *subworkers;
    1476                 :     ListCell   *lc;
    1477                 :     char        originname[NAMEDATALEN];
    1478 GIC          78 :     char       *err = NULL;
    1479                 :     WalReceiverConn *wrconn;
    1480                 :     Form_pg_subscription form;
    1481 ECB             :     List       *rstates;
    1482                 :     bool        must_use_password;
    1483                 : 
    1484                 :     /*
    1485                 :      * Lock pg_subscription with AccessExclusiveLock to ensure that the
    1486                 :      * launcher doesn't restart new worker during dropping the subscription
    1487                 :      */
    1488 GIC          78 :     rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
    1489                 : 
    1490              78 :     tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
    1491 CBC          78 :                           CStringGetDatum(stmt->subname));
    1492                 : 
    1493 GIC          78 :     if (!HeapTupleIsValid(tup))
    1494 ECB             :     {
    1495 GIC           6 :         table_close(rel, NoLock);
    1496 ECB             : 
    1497 CBC           6 :         if (!stmt->missing_ok)
    1498 GIC           3 :             ereport(ERROR,
    1499                 :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1500                 :                      errmsg("subscription \"%s\" does not exist",
    1501 ECB             :                             stmt->subname)));
    1502                 :         else
    1503 GIC           3 :             ereport(NOTICE,
    1504                 :                     (errmsg("subscription \"%s\" does not exist, skipping",
    1505                 :                             stmt->subname)));
    1506                 : 
    1507              38 :         return;
    1508                 :     }
    1509                 : 
    1510              72 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1511              72 :     subid = form->oid;
    1512 GNC          72 :     subowner = form->subowner;
    1513              72 :     must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
    1514                 : 
    1515                 :     /* must be owner */
    1516              72 :     if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
    1517 UIC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    1518               0 :                        stmt->subname);
    1519                 : 
    1520                 :     /* DROP hook for the subscription being removed */
    1521 GIC          72 :     InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
    1522                 : 
    1523 ECB             :     /*
    1524 EUB             :      * Lock the subscription so nobody else can do anything with it (including
    1525                 :      * the replication workers).
    1526                 :      */
    1527 GIC          72 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
    1528                 : 
    1529 ECB             :     /* Get subname */
    1530 GNC          72 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    1531                 :                                    Anum_pg_subscription_subname);
    1532 CBC          72 :     subname = pstrdup(NameStr(*DatumGetName(datum)));
    1533                 : 
    1534                 :     /* Get conninfo */
    1535 GNC          72 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
    1536                 :                                    Anum_pg_subscription_subconninfo);
    1537 GIC          72 :     conninfo = TextDatumGetCString(datum);
    1538                 : 
    1539 ECB             :     /* Get slotname */
    1540 GIC          72 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
    1541                 :                             Anum_pg_subscription_subslotname, &isnull);
    1542              72 :     if (!isnull)
    1543              37 :         slotname = pstrdup(NameStr(*DatumGetName(datum)));
    1544                 :     else
    1545 CBC          35 :         slotname = NULL;
    1546                 : 
    1547                 :     /*
    1548                 :      * Since dropping a replication slot is not transactional, the replication
    1549                 :      * slot stays dropped even if the transaction rolls back.  So we cannot
    1550                 :      * run DROP SUBSCRIPTION inside a transaction block if dropping the
    1551 ECB             :      * replication slot.  Also, in this case, we report a message for dropping
    1552                 :      * the subscription to the cumulative stats system.
    1553                 :      *
    1554                 :      * XXX The command name should really be something like "DROP SUBSCRIPTION
    1555                 :      * of a subscription that is associated with a replication slot", but we
    1556                 :      * don't have the proper facilities for that.
    1557                 :      */
    1558 GBC          72 :     if (slotname)
    1559 GIC          37 :         PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
    1560                 : 
    1561              69 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
    1562              69 :     EventTriggerSQLDropAddObject(&myself, true, true);
    1563                 : 
    1564                 :     /* Remove the tuple from catalog. */
    1565 CBC          69 :     CatalogTupleDelete(rel, &tup->t_self);
    1566 ECB             : 
    1567 GIC          69 :     ReleaseSysCache(tup);
    1568 ECB             : 
    1569                 :     /*
    1570                 :      * Stop all the subscription workers immediately.
    1571                 :      *
    1572 EUB             :      * This is necessary if we are dropping the replication slot, so that the
    1573                 :      * slot becomes accessible.
    1574                 :      *
    1575                 :      * It is also necessary if the subscription is disabled and was disabled
    1576                 :      * in the same transaction.  Then the workers haven't seen the disabling
    1577                 :      * yet and will still be running, leading to hangs later when we want to
    1578 ECB             :      * drop the replication origin.  If the subscription was disabled before
    1579                 :      * this transaction, then there shouldn't be any workers left, so this
    1580                 :      * won't make a difference.
    1581                 :      *
    1582                 :      * New workers won't be started because we hold an exclusive lock on the
    1583                 :      * subscription till the end of the transaction.
    1584                 :      */
    1585 CBC          69 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1586 GIC          69 :     subworkers = logicalrep_workers_find(subid, false);
    1587              69 :     LWLockRelease(LogicalRepWorkerLock);
    1588 CBC         106 :     foreach(lc, subworkers)
    1589                 :     {
    1590              37 :         LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
    1591                 : 
    1592              37 :         logicalrep_worker_stop(w->subid, w->relid);
    1593                 :     }
    1594 GIC          69 :     list_free(subworkers);
    1595 ECB             : 
    1596                 :     /*
    1597                 :      * Remove the no-longer-useful entry in the launcher's table of apply
    1598                 :      * worker start times.
    1599                 :      *
    1600                 :      * If this transaction rolls back, the launcher might restart a failed
    1601                 :      * apply worker before wal_retrieve_retry_interval milliseconds have
    1602                 :      * elapsed, but that's pretty harmless.
    1603                 :      */
    1604 GNC          69 :     ApplyLauncherForgetWorkerStartTime(subid);
    1605                 : 
    1606                 :     /*
    1607 ECB             :      * Cleanup of tablesync replication origins.
    1608                 :      *
    1609                 :      * Any READY-state relations would already have dealt with clean-ups.
    1610                 :      *
    1611                 :      * Note that the state can't change because we have already stopped both
    1612                 :      * the apply and tablesync workers and they can't restart because of
    1613                 :      * exclusive lock on the subscription.
    1614                 :      */
    1615 GNC          69 :     rstates = GetSubscriptionRelations(subid, true);
    1616 GIC          70 :     foreach(lc, rstates)
    1617                 :     {
    1618               1 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    1619               1 :         Oid         relid = rstate->relid;
    1620                 : 
    1621                 :         /* Only cleanup resources of tablesync workers */
    1622               1 :         if (!OidIsValid(relid))
    1623 UIC           0 :             continue;
    1624                 : 
    1625                 :         /*
    1626                 :          * Drop the tablesync's origin tracking if exists.
    1627                 :          *
    1628                 :          * It is possible that the origin is not yet created for tablesync
    1629 ECB             :          * worker so passing missing_ok = true. This can happen for the states
    1630                 :          * before SUBREL_STATE_FINISHEDCOPY.
    1631                 :          */
    1632 GNC           1 :         ReplicationOriginNameForLogicalRep(subid, relid, originname,
    1633                 :                                            sizeof(originname));
    1634 GIC           1 :         replorigin_drop_by_name(originname, true, false);
    1635                 :     }
    1636                 : 
    1637                 :     /* Clean up dependencies */
    1638              69 :     deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
    1639 ECB             : 
    1640                 :     /* Remove any associated relation synchronization states. */
    1641 CBC          69 :     RemoveSubscriptionRel(subid, InvalidOid);
    1642 ECB             : 
    1643                 :     /* Remove the origin tracking if exists. */
    1644 GNC          69 :     ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
    1645 GIC          69 :     replorigin_drop_by_name(originname, true, false);
    1646 ECB             : 
    1647                 :     /*
    1648                 :      * If there is no slot associated with the subscription, we can finish
    1649                 :      * here.
    1650                 :      */
    1651 GIC          69 :     if (!slotname && rstates == NIL)
    1652                 :     {
    1653              35 :         table_close(rel, NoLock);
    1654 CBC          35 :         return;
    1655                 :     }
    1656                 : 
    1657                 :     /*
    1658 ECB             :      * Try to acquire the connection necessary for dropping slots.
    1659                 :      *
    1660                 :      * Note: If the slotname is NONE/NULL then we allow the command to finish
    1661                 :      * and users need to manually cleanup the apply and tablesync worker slots
    1662                 :      * later.
    1663                 :      *
    1664                 :      * This has to be at the end because otherwise if there is an error while
    1665                 :      * doing the database operations we won't be able to rollback dropped
    1666                 :      * slot.
    1667                 :      */
    1668 GBC          34 :     load_file("libpqwalreceiver", false);
    1669 EUB             : 
    1670 GNC          34 :     wrconn = walrcv_connect(conninfo, true, must_use_password,
    1671                 :                             subname, &err);
    1672 GIC          34 :     if (wrconn == NULL)
    1673 ECB             :     {
    1674 UIC           0 :         if (!slotname)
    1675                 :         {
    1676                 :             /* be tidy */
    1677               0 :             list_free(rstates);
    1678               0 :             table_close(rel, NoLock);
    1679 LBC           0 :             return;
    1680                 :         }
    1681                 :         else
    1682 ECB             :         {
    1683 UIC           0 :             ReportSlotConnectionError(rstates, subid, slotname, err);
    1684 ECB             :         }
    1685                 :     }
    1686                 : 
    1687 CBC          34 :     PG_TRY();
    1688                 :     {
    1689              35 :         foreach(lc, rstates)
    1690                 :         {
    1691 GIC           1 :             SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    1692 CBC           1 :             Oid         relid = rstate->relid;
    1693                 : 
    1694 ECB             :             /* Only cleanup resources of tablesync workers */
    1695 CBC           1 :             if (!OidIsValid(relid))
    1696 UIC           0 :                 continue;
    1697 ECB             : 
    1698                 :             /*
    1699                 :              * Drop the tablesync slots associated with removed tables.
    1700                 :              *
    1701                 :              * For SYNCDONE/READY states, the tablesync slot is known to have
    1702                 :              * already been dropped by the tablesync worker.
    1703                 :              *
    1704                 :              * For other states, there is no certainty, maybe the slot does
    1705                 :              * not exist yet. Also, if we fail after removing some of the
    1706                 :              * slots, next time, it will again try to drop already dropped
    1707                 :              * slots and fail. For these reasons, we allow missing_ok = true
    1708                 :              * for the drop.
    1709                 :              */
    1710 CBC           1 :             if (rstate->state != SUBREL_STATE_SYNCDONE)
    1711 ECB             :             {
    1712 GIC           1 :                 char        syncslotname[NAMEDATALEN] = {0};
    1713 ECB             : 
    1714 CBC           1 :                 ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    1715                 :                                                 sizeof(syncslotname));
    1716 GIC           1 :                 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
    1717 ECB             :             }
    1718                 :         }
    1719                 : 
    1720 GIC          34 :         list_free(rstates);
    1721                 : 
    1722                 :         /*
    1723                 :          * If there is a slot associated with the subscription, then drop the
    1724                 :          * replication slot at the publisher.
    1725                 :          */
    1726              34 :         if (slotname)
    1727              34 :             ReplicationSlotDropAtPubNode(wrconn, slotname, false);
    1728                 :     }
    1729 UIC           0 :     PG_FINALLY();
    1730                 :     {
    1731 GIC          34 :         walrcv_disconnect(wrconn);
    1732                 :     }
    1733              34 :     PG_END_TRY();
    1734                 : 
    1735                 :     /*
    1736                 :      * Tell the cumulative stats system that the subscription is getting
    1737 ECB             :      * dropped.
    1738                 :      */
    1739 CBC          34 :     pgstat_drop_subscription(subid);
    1740 ECB             : 
    1741 GIC          34 :     table_close(rel, NoLock);
    1742 ECB             : }
    1743                 : 
    1744                 : /*
    1745                 :  * Drop the replication slot at the publisher node using the replication
    1746                 :  * connection.
    1747                 :  *
    1748                 :  * missing_ok - if true then only issue a LOG message if the slot doesn't
    1749                 :  * exist.
    1750                 :  */
    1751                 : void
    1752 GIC         189 : ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
    1753                 : {
    1754                 :     StringInfoData cmd;
    1755                 : 
    1756 CBC         189 :     Assert(wrconn);
    1757                 : 
    1758 GIC         189 :     load_file("libpqwalreceiver", false);
    1759                 : 
    1760             189 :     initStringInfo(&cmd);
    1761             189 :     appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
    1762                 : 
    1763             189 :     PG_TRY();
    1764                 :     {
    1765                 :         WalRcvExecResult *res;
    1766                 : 
    1767 CBC         189 :         res = walrcv_exec(wrconn, cmd.data, 0, NULL);
    1768 ECB             : 
    1769 GIC         189 :         if (res->status == WALRCV_OK_COMMAND)
    1770 ECB             :         {
    1771                 :             /* NOTICE. Success. */
    1772 GIC         189 :             ereport(NOTICE,
    1773                 :                     (errmsg("dropped replication slot \"%s\" on publisher",
    1774 ECB             :                             slotname)));
    1775 EUB             :         }
    1776 UIC           0 :         else if (res->status == WALRCV_ERROR &&
    1777               0 :                  missing_ok &&
    1778               0 :                  res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
    1779                 :         {
    1780                 :             /* LOG. Error, but missing_ok = true. */
    1781               0 :             ereport(LOG,
    1782                 :                     (errmsg("could not drop replication slot \"%s\" on publisher: %s",
    1783                 :                             slotname, res->err)));
    1784 ECB             :         }
    1785                 :         else
    1786                 :         {
    1787                 :             /* ERROR. */
    1788 UIC           0 :             ereport(ERROR,
    1789                 :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    1790 ECB             :                      errmsg("could not drop replication slot \"%s\" on publisher: %s",
    1791                 :                             slotname, res->err)));
    1792                 :         }
    1793                 : 
    1794 GIC         189 :         walrcv_clear_result(res);
    1795                 :     }
    1796 LBC           0 :     PG_FINALLY();
    1797 ECB             :     {
    1798 GIC         189 :         pfree(cmd.data);
    1799                 :     }
    1800             189 :     PG_END_TRY();
    1801             189 : }
    1802                 : 
    1803 ECB             : /*
    1804                 :  * Internal workhorse for changing a subscription owner
    1805                 :  */
    1806                 : static void
    1807 GIC           6 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    1808                 : {
    1809                 :     Form_pg_subscription form;
    1810                 :     AclResult   aclresult;
    1811                 : 
    1812               6 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1813                 : 
    1814               6 :     if (form->subowner == newOwnerId)
    1815 UIC           0 :         return;
    1816                 : 
    1817 GNC           6 :     if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
    1818 UIC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
    1819               0 :                        NameStr(form->subname));
    1820                 : 
    1821                 :     /*
    1822                 :      * Don't allow non-superuser modification of a subscription with
    1823                 :      * password_required=false.
    1824                 :      */
    1825 GNC           6 :     if (!form->subpasswordrequired && !superuser())
    1826 LBC           0 :         ereport(ERROR,
    1827                 :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1828                 :                          errmsg("password_required=false is superuser-only"),
    1829                 :                          errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
    1830                 : 
    1831                 :     /* Must be able to become new owner */
    1832 GNC           6 :     check_can_set_role(GetUserId(), newOwnerId);
    1833                 : 
    1834                 :     /*
    1835                 :      * current owner must have CREATE on database
    1836                 :      *
    1837                 :      * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
    1838                 :      * other object types behave differently (e.g. you can't give a table to
    1839                 :      * a user who lacks CREATE privileges on a schema).
    1840                 :      */
    1841               3 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
    1842                 :                                 GetUserId(), ACL_CREATE);
    1843               3 :     if (aclresult != ACLCHECK_OK)
    1844 UNC           0 :         aclcheck_error(aclresult, OBJECT_DATABASE,
    1845               0 :                        get_database_name(MyDatabaseId));
    1846                 : 
    1847 GIC           3 :     form->subowner = newOwnerId;
    1848 GBC           3 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1849 EUB             : 
    1850                 :     /* Update owner dependency reference */
    1851 GIC           3 :     changeDependencyOnOwner(SubscriptionRelationId,
    1852                 :                             form->oid,
    1853                 :                             newOwnerId);
    1854 EUB             : 
    1855 GIC           3 :     InvokeObjectPostAlterHook(SubscriptionRelationId,
    1856                 :                               form->oid, 0);
    1857                 : 
    1858                 :     /* Wake up related background processes to handle this change quickly. */
    1859 CBC           3 :     ApplyLauncherWakeupAtCommit();
    1860 GNC           3 :     LogicalRepWorkersWakeupAtCommit(form->oid);
    1861                 : }
    1862 ECB             : 
    1863                 : /*
    1864                 :  * Change subscription owner -- by name
    1865                 :  */
    1866                 : ObjectAddress
    1867 GIC           6 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
    1868 ECB             : {
    1869 EUB             :     Oid         subid;
    1870                 :     HeapTuple   tup;
    1871                 :     Relation    rel;
    1872                 :     ObjectAddress address;
    1873                 :     Form_pg_subscription form;
    1874                 : 
    1875 GIC           6 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1876                 : 
    1877               6 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
    1878                 :                               CStringGetDatum(name));
    1879                 : 
    1880               6 :     if (!HeapTupleIsValid(tup))
    1881 UIC           0 :         ereport(ERROR,
    1882                 :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1883 ECB             :                  errmsg("subscription \"%s\" does not exist", name)));
    1884                 : 
    1885 CBC           6 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1886 GIC           6 :     subid = form->oid;
    1887 ECB             : 
    1888 GIC           6 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    1889 ECB             : 
    1890 GIC           3 :     ObjectAddressSet(address, SubscriptionRelationId, subid);
    1891                 : 
    1892               3 :     heap_freetuple(tup);
    1893 ECB             : 
    1894 GIC           3 :     table_close(rel, RowExclusiveLock);
    1895                 : 
    1896               3 :     return address;
    1897                 : }
    1898                 : 
    1899 ECB             : /*
    1900                 :  * Change subscription owner -- by OID
    1901                 :  */
    1902 EUB             : void
    1903 UIC           0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
    1904 ECB             : {
    1905                 :     HeapTuple   tup;
    1906                 :     Relation    rel;
    1907                 : 
    1908 UIC           0 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    1909                 : 
    1910               0 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
    1911                 : 
    1912 LBC           0 :     if (!HeapTupleIsValid(tup))
    1913 UIC           0 :         ereport(ERROR,
    1914 ECB             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1915                 :                  errmsg("subscription with OID %u does not exist", subid)));
    1916                 : 
    1917 UIC           0 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    1918                 : 
    1919               0 :     heap_freetuple(tup);
    1920                 : 
    1921               0 :     table_close(rel, RowExclusiveLock);
    1922               0 : }
    1923                 : 
    1924                 : /*
    1925                 :  * Check and log a warning if the publisher has subscribed to the same table
    1926                 :  * from some other publisher. This check is required only if "copy_data = true"
    1927                 :  * and "origin = none" for CREATE SUBSCRIPTION and
    1928                 :  * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data
    1929                 :  * having origin might have been copied.
    1930                 :  *
    1931                 :  * This check need not be performed on the tables that are already added
    1932                 :  * because incremental sync for those tables will happen through WAL and the
    1933                 :  * origin of the data can be identified from the WAL records.
    1934                 :  *
    1935                 :  * subrel_local_oids contains the list of relation oids that are already
    1936                 :  * present on the subscriber.
    1937                 :  */
    1938                 : static void
    1939 GNC         109 : check_publications_origin(WalReceiverConn *wrconn, List *publications,
    1940                 :                           bool copydata, char *origin, Oid *subrel_local_oids,
    1941                 :                           int subrel_count, char *subname)
    1942                 : {
    1943                 :     WalRcvExecResult *res;
    1944                 :     StringInfoData cmd;
    1945                 :     TupleTableSlot *slot;
    1946             109 :     Oid         tableRow[1] = {TEXTOID};
    1947             109 :     List       *publist = NIL;
    1948                 :     int         i;
    1949                 : 
    1950             211 :     if (!copydata || !origin ||
    1951             102 :         (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
    1952             103 :         return;
    1953                 : 
    1954               6 :     initStringInfo(&cmd);
    1955               6 :     appendStringInfoString(&cmd,
    1956                 :                            "SELECT DISTINCT P.pubname AS pubname\n"
    1957                 :                            "FROM pg_publication P,\n"
    1958                 :                            "    LATERAL pg_get_publication_tables(P.pubname) GPT\n"
    1959                 :                            "    JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
    1960                 :                            "    pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
    1961                 :                            "WHERE C.oid = GPT.relid AND P.pubname IN (");
    1962               6 :     get_publications_str(publications, &cmd, true);
    1963               6 :     appendStringInfoString(&cmd, ")\n");
    1964                 : 
    1965                 :     /*
    1966                 :      * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
    1967                 :      * the list of relation oids that are already present on the subscriber.
    1968                 :      * This check should be skipped for these tables.
    1969                 :      */
    1970               9 :     for (i = 0; i < subrel_count; i++)
    1971                 :     {
    1972               3 :         Oid         relid = subrel_local_oids[i];
    1973               3 :         char       *schemaname = get_namespace_name(get_rel_namespace(relid));
    1974               3 :         char       *tablename = get_rel_name(relid);
    1975                 : 
    1976               3 :         appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
    1977                 :                          schemaname, tablename);
    1978                 :     }
    1979                 : 
    1980               6 :     res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
    1981               6 :     pfree(cmd.data);
    1982                 : 
    1983               6 :     if (res->status != WALRCV_OK_TUPLES)
    1984 UNC           0 :         ereport(ERROR,
    1985                 :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1986                 :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    1987                 :                         res->err)));
    1988                 : 
    1989                 :     /* Process tables. */
    1990 GNC           6 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    1991               8 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    1992                 :     {
    1993                 :         char       *pubname;
    1994                 :         bool        isnull;
    1995                 : 
    1996               2 :         pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    1997               2 :         Assert(!isnull);
    1998                 : 
    1999               2 :         ExecClearTuple(slot);
    2000               2 :         publist = list_append_unique(publist, makeString(pubname));
    2001                 :     }
    2002                 : 
    2003                 :     /*
    2004                 :      * Log a warning if the publisher has subscribed to the same table from
    2005                 :      * some other publisher. We cannot know the origin of data during the
    2006                 :      * initial sync. Data origins can be found only from the WAL by looking at
    2007                 :      * the origin id.
    2008                 :      *
    2009                 :      * XXX: For simplicity, we don't check whether the table has any data or
    2010                 :      * not. If the table doesn't have any data then we don't need to
    2011                 :      * distinguish between data having origin and data not having origin so we
    2012                 :      * can avoid logging a warning in that case.
    2013                 :      */
    2014               6 :     if (publist)
    2015                 :     {
    2016               2 :         StringInfo  pubnames = makeStringInfo();
    2017                 : 
    2018                 :         /* Prepare the list of publication(s) for warning message. */
    2019               2 :         get_publications_str(publist, pubnames, false);
    2020               2 :         ereport(WARNING,
    2021                 :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2022                 :                 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
    2023                 :                        subname),
    2024                 :                 errdetail_plural("Subscribed publication %s is subscribing to other publications.",
    2025                 :                                  "Subscribed publications %s are subscribing to other publications.",
    2026                 :                                  list_length(publist), pubnames->data),
    2027                 :                 errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
    2028                 :     }
    2029                 : 
    2030               6 :     ExecDropSingleTupleTableSlot(slot);
    2031                 : 
    2032               6 :     walrcv_clear_result(res);
    2033                 : }
    2034                 : 
    2035                 : /*
    2036 ECB             :  * Get the list of tables which belong to specified publications on the
    2037                 :  * publisher connection.
    2038                 :  *
    2039                 :  * Note that we don't support the case where the column list is different for
    2040                 :  * the same table in different publications to avoid sending unwanted column
    2041                 :  * information for some of the rows. This can happen when both the column
    2042                 :  * list and row filter are specified for different publications.
    2043                 :  */
    2044                 : static List *
    2045 CBC         109 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
    2046                 : {
    2047 ECB             :     WalRcvExecResult *res;
    2048                 :     StringInfoData cmd;
    2049                 :     TupleTableSlot *slot;
    2050 GNC         109 :     Oid         tableRow[3] = {TEXTOID, TEXTOID, InvalidOid};
    2051 CBC         109 :     List       *tablelist = NIL;
    2052 GNC         109 :     int         server_version = walrcv_server_version(wrconn);
    2053             109 :     bool        check_columnlist = (server_version >= 150000);
    2054 ECB             : 
    2055 GIC         109 :     initStringInfo(&cmd);
    2056 ECB             : 
    2057                 :     /* Get the list of tables from the publisher. */
    2058 GNC         109 :     if (server_version >= 160000)
    2059                 :     {
    2060                 :         StringInfoData pub_names;
    2061 EUB             : 
    2062 GNC         109 :         tableRow[2] = INT2VECTOROID;
    2063             109 :         initStringInfo(&pub_names);
    2064             109 :         get_publications_str(publications, &pub_names, true);
    2065                 : 
    2066                 :         /*
    2067                 :          * From version 16, we allowed passing multiple publications to the
    2068                 :          * function pg_get_publication_tables. This helped to filter out the
    2069                 :          * partition table whose ancestor is also published in this
    2070                 :          * publication array.
    2071                 :          *
    2072                 :          * Join pg_get_publication_tables with pg_publication to exclude
    2073                 :          * non-existing publications.
    2074                 :          *
    2075                 :          * Note that attrs are always stored in sorted order so we don't need
    2076                 :          * to worry if different publications have specified them in a
    2077                 :          * different order. See publication_translate_columns.
    2078                 :          */
    2079             109 :         appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
    2080                 :                          "       FROM pg_class c\n"
    2081                 :                          "         JOIN pg_namespace n ON n.oid = c.relnamespace\n"
    2082                 :                          "         JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
    2083                 :                          "                FROM pg_publication\n"
    2084                 :                          "                WHERE pubname IN ( %s )) AS gpt\n"
    2085                 :                          "             ON gpt.relid = c.oid\n",
    2086                 :                          pub_names.data);
    2087                 : 
    2088             109 :         pfree(pub_names.data);
    2089                 :     }
    2090                 :     else
    2091                 :     {
    2092 UNC           0 :         tableRow[2] = NAMEARRAYOID;
    2093               0 :         appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
    2094                 : 
    2095                 :         /* Get column lists for each relation if the publisher supports it */
    2096               0 :         if (check_columnlist)
    2097               0 :             appendStringInfoString(&cmd, ", t.attnames\n");
    2098                 : 
    2099               0 :         appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
    2100                 :                                " WHERE t.pubname IN (");
    2101               0 :         get_publications_str(publications, &cmd, true);
    2102               0 :         appendStringInfoChar(&cmd, ')');
    2103                 :     }
    2104 EUB             : 
    2105 GIC         109 :     res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
    2106             109 :     pfree(cmd.data);
    2107                 : 
    2108             109 :     if (res->status != WALRCV_OK_TUPLES)
    2109 UIC           0 :         ereport(ERROR,
    2110                 :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    2111 EUB             :                  errmsg("could not receive list of replicated tables from the publisher: %s",
    2112                 :                         res->err)));
    2113                 : 
    2114                 :     /* Process tables. */
    2115 GIC         109 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    2116             330 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    2117 ECB             :     {
    2118                 :         char       *nspname;
    2119 EUB             :         char       *relname;
    2120                 :         bool        isnull;
    2121 ECB             :         RangeVar   *rv;
    2122                 : 
    2123 CBC         222 :         nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    2124             222 :         Assert(!isnull);
    2125 GIC         222 :         relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
    2126             222 :         Assert(!isnull);
    2127                 : 
    2128             222 :         rv = makeRangeVar(nspname, relname, -1);
    2129                 : 
    2130 CBC         222 :         if (check_columnlist && list_member(tablelist, rv))
    2131 GIC           1 :             ereport(ERROR,
    2132                 :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2133                 :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    2134                 :                            nspname, relname));
    2135 ECB             :         else
    2136 GIC         221 :             tablelist = lappend(tablelist, rv);
    2137 ECB             : 
    2138 GBC         221 :         ExecClearTuple(slot);
    2139                 :     }
    2140 CBC         108 :     ExecDropSingleTupleTableSlot(slot);
    2141 EUB             : 
    2142 GBC         108 :     walrcv_clear_result(res);
    2143                 : 
    2144 GIC         108 :     return tablelist;
    2145                 : }
    2146                 : 
    2147                 : /*
    2148 ECB             :  * This is to report the connection failure while dropping replication slots.
    2149 EUB             :  * Here, we report the WARNING for all tablesync slots so that user can drop
    2150                 :  * them manually, if required.
    2151                 :  */
    2152                 : static void
    2153 UIC           0 : ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
    2154                 : {
    2155 ECB             :     ListCell   *lc;
    2156                 : 
    2157 UIC           0 :     foreach(lc, rstates)
    2158                 :     {
    2159               0 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
    2160               0 :         Oid         relid = rstate->relid;
    2161                 : 
    2162                 :         /* Only cleanup resources of tablesync workers */
    2163               0 :         if (!OidIsValid(relid))
    2164 LBC           0 :             continue;
    2165                 : 
    2166 ECB             :         /*
    2167 EUB             :          * Caller needs to ensure that relstate doesn't change underneath us.
    2168                 :          * See DropSubscription where we get the relstates.
    2169                 :          */
    2170 LBC           0 :         if (rstate->state != SUBREL_STATE_SYNCDONE)
    2171 ECB             :         {
    2172 UIC           0 :             char        syncslotname[NAMEDATALEN] = {0};
    2173                 : 
    2174 LBC           0 :             ReplicationSlotNameForTablesync(subid, relid, syncslotname,
    2175                 :                                             sizeof(syncslotname));
    2176 UIC           0 :             elog(WARNING, "could not drop tablesync replication slot \"%s\"",
    2177                 :                  syncslotname);
    2178 ECB             :         }
    2179                 :     }
    2180                 : 
    2181 UIC           0 :     ereport(ERROR,
    2182 ECB             :             (errcode(ERRCODE_CONNECTION_FAILURE),
    2183                 :              errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
    2184                 :                     slotname, err),
    2185                 :     /* translator: %s is an SQL ALTER command */
    2186                 :              errhint("Use %s to disassociate the subscription from the slot.",
    2187                 :                      "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
    2188                 : }
    2189                 : 
    2190                 : /*
    2191                 :  * Check for duplicates in the given list of publications and error out if
    2192                 :  * found one.  Add publications to datums as text datums, if datums is not
    2193                 :  * NULL.
    2194                 :  */
    2195                 : static void
    2196 GIC         174 : check_duplicates_in_publist(List *publist, Datum *datums)
    2197                 : {
    2198 ECB             :     ListCell   *cell;
    2199 GIC         174 :     int         j = 0;
    2200 ECB             : 
    2201 GIC         409 :     foreach(cell, publist)
    2202                 :     {
    2203 CBC         244 :         char       *name = strVal(lfirst(cell));
    2204 EUB             :         ListCell   *pcell;
    2205                 : 
    2206 GIC         380 :         foreach(pcell, publist)
    2207                 :         {
    2208 CBC         380 :             char       *pname = strVal(lfirst(pcell));
    2209 ECB             : 
    2210 GIC         380 :             if (pcell == cell)
    2211 CBC         235 :                 break;
    2212                 : 
    2213             145 :             if (strcmp(name, pname) == 0)
    2214 GIC           9 :                 ereport(ERROR,
    2215 ECB             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    2216                 :                          errmsg("publication name \"%s\" used more than once",
    2217                 :                                 pname)));
    2218                 :         }
    2219                 : 
    2220 GIC         235 :         if (datums)
    2221             192 :             datums[j++] = CStringGetTextDatum(name);
    2222                 :     }
    2223             165 : }
    2224                 : 
    2225                 : /*
    2226 EUB             :  * Merge current subscription's publications and user-specified publications
    2227                 :  * from ADD/DROP PUBLICATIONS.
    2228                 :  *
    2229                 :  * If addpub is true, we will add the list of publications into oldpublist.
    2230                 :  * Otherwise, we will delete the list of publications from oldpublist.  The
    2231                 :  * returned list is a copy, oldpublist itself is not changed.
    2232                 :  *
    2233                 :  * subname is the subscription name, for error messages.
    2234                 :  */
    2235                 : static List *
    2236 GBC          27 : merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
    2237                 : {
    2238                 :     ListCell   *lc;
    2239                 : 
    2240              27 :     oldpublist = list_copy(oldpublist);
    2241                 : 
    2242              27 :     check_duplicates_in_publist(newpublist, NULL);
    2243                 : 
    2244              46 :     foreach(lc, newpublist)
    2245 EUB             :     {
    2246 GIC          34 :         char       *name = strVal(lfirst(lc));
    2247                 :         ListCell   *lc2;
    2248              34 :         bool        found = false;
    2249                 : 
    2250              67 :         foreach(lc2, oldpublist)
    2251                 :         {
    2252              55 :             char       *pubname = strVal(lfirst(lc2));
    2253                 : 
    2254              55 :             if (strcmp(name, pubname) == 0)
    2255                 :             {
    2256              22 :                 found = true;
    2257              22 :                 if (addpub)
    2258               6 :                     ereport(ERROR,
    2259                 :                             (errcode(ERRCODE_DUPLICATE_OBJECT),
    2260                 :                              errmsg("publication \"%s\" is already in subscription \"%s\"",
    2261                 :                                     name, subname)));
    2262 ECB             :                 else
    2263 GIC          16 :                     oldpublist = foreach_delete_current(oldpublist, lc2);
    2264                 : 
    2265              16 :                 break;
    2266                 :             }
    2267                 :         }
    2268                 : 
    2269 CBC          28 :         if (addpub && !found)
    2270               9 :             oldpublist = lappend(oldpublist, makeString(name));
    2271 GIC          19 :         else if (!addpub && !found)
    2272               3 :             ereport(ERROR,
    2273 ECB             :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2274                 :                      errmsg("publication \"%s\" is not in subscription \"%s\"",
    2275                 :                             name, subname)));
    2276                 :     }
    2277                 : 
    2278                 :     /*
    2279                 :      * XXX Probably no strong reason for this, but for now it's to make ALTER
    2280                 :      * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
    2281                 :      */
    2282 GIC          12 :     if (!oldpublist)
    2283               3 :         ereport(ERROR,
    2284                 :                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    2285 ECB             :                  errmsg("cannot drop all the publications from a subscription")));
    2286                 : 
    2287 GIC           9 :     return oldpublist;
    2288                 : }
    2289                 : 
    2290                 : /*
    2291                 :  * Extract the streaming mode value from a DefElem.  This is like
    2292                 :  * defGetBoolean() but also accepts the special value of "parallel".
    2293                 :  */
    2294                 : char
    2295 GNC          65 : defGetStreamingMode(DefElem *def)
    2296                 : {
    2297                 :     /*
    2298                 :      * If no parameter value given, assume "true" is meant.
    2299                 :      */
    2300              65 :     if (!def->arg)
    2301 UNC           0 :         return LOGICALREP_STREAM_ON;
    2302                 : 
    2303                 :     /*
    2304                 :      * Allow 0, 1, "false", "true", "off", "on" or "parallel".
    2305                 :      */
    2306 GNC          65 :     switch (nodeTag(def->arg))
    2307                 :     {
    2308 UNC           0 :         case T_Integer:
    2309               0 :             switch (intVal(def->arg))
    2310                 :             {
    2311               0 :                 case 0:
    2312               0 :                     return LOGICALREP_STREAM_OFF;
    2313               0 :                 case 1:
    2314               0 :                     return LOGICALREP_STREAM_ON;
    2315               0 :                 default:
    2316                 :                     /* otherwise, error out below */
    2317               0 :                     break;
    2318                 :             }
    2319               0 :             break;
    2320 GNC          65 :         default:
    2321                 :             {
    2322              65 :                 char       *sval = defGetString(def);
    2323                 : 
    2324                 :                 /*
    2325                 :                  * The set of strings accepted here should match up with the
    2326                 :                  * grammar's opt_boolean_or_string production.
    2327                 :                  */
    2328             127 :                 if (pg_strcasecmp(sval, "false") == 0 ||
    2329              62 :                     pg_strcasecmp(sval, "off") == 0)
    2330               3 :                     return LOGICALREP_STREAM_OFF;
    2331             115 :                 if (pg_strcasecmp(sval, "true") == 0 ||
    2332              53 :                     pg_strcasecmp(sval, "on") == 0)
    2333              44 :                     return LOGICALREP_STREAM_ON;
    2334              18 :                 if (pg_strcasecmp(sval, "parallel") == 0)
    2335              15 :                     return LOGICALREP_STREAM_PARALLEL;
    2336                 :             }
    2337               3 :             break;
    2338                 :     }
    2339                 : 
    2340               3 :     ereport(ERROR,
    2341                 :             (errcode(ERRCODE_SYNTAX_ERROR),
    2342                 :              errmsg("%s requires a Boolean value or \"parallel\"",
    2343                 :                     def->defname)));
    2344                 :     return LOGICALREP_STREAM_OFF;   /* keep compiler quiet */
    2345                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a