LCOV - differential code coverage report
Current view: top level - src/backend/commands - dbcommands.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 80.1 % 1041 834 12 38 144 13 53 430 82 269 134 499 7 28
Current Date: 2023-04-08 15:15:32 Functions: 85.7 % 28 24 4 24 4 23 1
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * dbcommands.c
       4                 :  *      Database management commands (create/drop database).
       5                 :  *
       6                 :  * Note: database creation/destruction commands use exclusive locks on
       7                 :  * the database objects (as expressed by LockSharedObject()) to avoid
       8                 :  * stepping on each other's toes.  Formerly we used table-level locks
       9                 :  * on pg_database, but that's too coarse-grained.
      10                 :  *
      11                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      12                 :  * Portions Copyright (c) 1994, Regents of the University of California
      13                 :  *
      14                 :  *
      15                 :  * IDENTIFICATION
      16                 :  *    src/backend/commands/dbcommands.c
      17                 :  *
      18                 :  *-------------------------------------------------------------------------
      19                 :  */
      20                 : #include "postgres.h"
      21                 : 
      22                 : #include <fcntl.h>
      23                 : #include <unistd.h>
      24                 : #include <sys/stat.h>
      25                 : 
      26                 : #include "access/genam.h"
      27                 : #include "access/heapam.h"
      28                 : #include "access/htup_details.h"
      29                 : #include "access/multixact.h"
      30                 : #include "access/tableam.h"
      31                 : #include "access/xact.h"
      32                 : #include "access/xloginsert.h"
      33                 : #include "access/xlogrecovery.h"
      34                 : #include "access/xlogutils.h"
      35                 : #include "catalog/catalog.h"
      36                 : #include "catalog/dependency.h"
      37                 : #include "catalog/indexing.h"
      38                 : #include "catalog/objectaccess.h"
      39                 : #include "catalog/pg_authid.h"
      40                 : #include "catalog/pg_collation.h"
      41                 : #include "catalog/pg_database.h"
      42                 : #include "catalog/pg_db_role_setting.h"
      43                 : #include "catalog/pg_subscription.h"
      44                 : #include "catalog/pg_tablespace.h"
      45                 : #include "commands/comment.h"
      46                 : #include "commands/dbcommands.h"
      47                 : #include "commands/dbcommands_xlog.h"
      48                 : #include "commands/defrem.h"
      49                 : #include "commands/seclabel.h"
      50                 : #include "commands/tablespace.h"
      51                 : #include "common/file_perm.h"
      52                 : #include "mb/pg_wchar.h"
      53                 : #include "miscadmin.h"
      54                 : #include "pgstat.h"
      55                 : #include "postmaster/bgwriter.h"
      56                 : #include "replication/slot.h"
      57                 : #include "storage/copydir.h"
      58                 : #include "storage/fd.h"
      59                 : #include "storage/ipc.h"
      60                 : #include "storage/lmgr.h"
      61                 : #include "storage/md.h"
      62                 : #include "storage/procarray.h"
      63                 : #include "storage/smgr.h"
      64                 : #include "utils/acl.h"
      65                 : #include "utils/builtins.h"
      66                 : #include "utils/fmgroids.h"
      67                 : #include "utils/guc.h"
      68                 : #include "utils/pg_locale.h"
      69                 : #include "utils/relmapper.h"
      70                 : #include "utils/snapmgr.h"
      71                 : #include "utils/syscache.h"
      72                 : 
      73                 : /*
      74                 :  * Strategy used to copy the template when creating a database.
      75                 :  *
      76                 :  * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
      77                 :  * copied block.
      78                 :  *
      79                 :  * CREATEDB_FILE_COPY will simply perform a file system level copy of the
      80                 :  * database and log a single record for each tablespace copied. To make this
      81                 :  * safe, it also triggers checkpoints before and after the operation.
      82                 :  */
      83                 : typedef enum CreateDBStrategy
      84                 : {
      85                 :     CREATEDB_WAL_LOG,           /* block-level copy, every block WAL-logged */
      86                 :     CREATEDB_FILE_COPY          /* file-level copy bracketed by checkpoints */
      87                 : } CreateDBStrategy;
      88                 : 
      89                 : typedef struct
      90                 : {
      91                 :     Oid         src_dboid;      /* OID of the source (template) database */
      92                 :     Oid         dest_dboid;     /* OID of the database we are trying to create */
      93                 :     CreateDBStrategy strategy;  /* copy strategy chosen for this creation */
      94                 : } createdb_failure_params;      /* NOTE(review): presumably the argument of createdb_failure_callback() -- confirm */
      95                 : 
      96                 : typedef struct
      97                 : {
      98                 :     Oid         dest_dboid;     /* OID of the database we are trying to move */
      99                 :     Oid         dest_tsoid;     /* OID of the tablespace we are trying to move it to */
     100                 : } movedb_failure_params;        /* NOTE(review): presumably the argument of movedb_failure_callback() -- confirm */
     101                 : 
     102                 : /*
     103                 :  * Information about a relation to be copied when creating a database.
                         :  *
                         :  * One entry is built per qualifying pg_class tuple by
                         :  * ScanSourceDatabasePgClassTuple() and consumed by
                         :  * CreateDatabaseUsingWalLog().
     104                 :  */
     105                 : typedef struct CreateDBRelInfo
     106                 : {
     107                 :     RelFileLocator rlocator;    /* physical relation identifier in the source DB */
     108                 :     Oid         reloid;         /* relation OID (pg_class.oid) */
     109                 :     bool        permanent;      /* true if permanent, false if unlogged */
     110                 : } CreateDBRelInfo;
     111                 : 
     112                 : 
     113                 : /* Prototypes for functions private to this file (all declared static). */
     114                 : static void createdb_failure_callback(int code, Datum arg);
     115                 : static void movedb(const char *dbname, const char *tblspcname);
     116                 : static void movedb_failure_callback(int code, Datum arg);
     117                 : static bool get_db_info(const char *name, LOCKMODE lockmode,
     118                 :                         Oid *dbIdP, Oid *ownerIdP,
     119                 :                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
     120                 :                         TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
     121                 :                         Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbIculocale,
     122                 :                         char **dbIcurules,
     123                 :                         char *dbLocProvider,
     124                 :                         char **dbCollversion);
     125                 : static void remove_dbtablespaces(Oid db_id);
     126                 : static bool check_db_file_conflict(Oid db_id);
     127                 : static int  errdetail_busy_db(int notherbackends, int npreparedxacts);
     128                 : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
     129                 :                                       Oid dst_tsid);
     130                 : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
     131                 : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
     132                 :                                            Oid dbid, char *srcpath,
     133                 :                                            List *rlocatorlist, Snapshot snapshot);
     134                 : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
     135                 :                                                        Oid tbid, Oid dbid,
     136                 :                                                        char *srcpath);
     137                 : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
     138                 :                                     bool isRedo);
     139                 : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
     140                 :                                         Oid src_tsid, Oid dst_tsid);
     141                 : static void recovery_create_dbdir(char *path, bool only_tblspc);
     142                 : 
     143                 : /*
     144                 :  * Create a new database using the WAL_LOG strategy.
     145                 :  *
     146                 :  * Each copied block is separately written to the write-ahead log.
                         :  *
                         :  * src_dboid and dst_dboid are the OIDs of the source and destination
                         :  * databases; src_tsid and dst_tsid are their default tablespaces.
                         :  *
                         :  * The destination directory and its PG_VERSION file are created first, then
                         :  * the relmap file is copied, and finally every relation reported by
                         :  * ScanSourceDatabasePgClass() is copied while AccessShareLock is held on
                         :  * both the source and the destination relation.
     147                 :  */
     148                 : static void
     149 GIC         184 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
     150 ECB             :                           Oid src_tsid, Oid dst_tsid)
     151                 : {
     152                 :     char       *srcpath;
     153                 :     char       *dstpath;
     154 GNC         184 :     List       *rlocatorlist = NULL;
     155 ECB             :     ListCell   *cell;
     156                 :     LockRelId   srcrelid;
     157                 :     LockRelId   dstrelid;
     158                 :     RelFileLocator srcrlocator;
     159                 :     RelFileLocator dstrlocator;
     160                 :     CreateDBRelInfo *relinfo;
     161                 : 
     162                 :     /* Get source and destination database paths. */
     163 GIC         184 :     srcpath = GetDatabasePath(src_dboid, src_tsid);
     164 CBC         184 :     dstpath = GetDatabasePath(dst_dboid, dst_tsid);
     165 ECB             : 
     166                 :     /* Create database directory and write PG_VERSION file (not in redo). */
     167 GIC         184 :     CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
     168 ECB             : 
     169                 :     /* Copy relmap file from source database to the destination database. */
     170 GIC         184 :     RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
     171 ECB             : 
     172                 :     /* Get list of relfilelocators to copy from the source database. */
     173 GNC         184 :     rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
     174             184 :     Assert(rlocatorlist != NIL);
     175 ECB             : 
     176                 :     /*
     177                 :      * Database IDs will be the same for all relations so set them before
     178                 :      * entering the loop.
     179                 :      */
     180 GIC         184 :     srcrelid.dbId = src_dboid;
     181 CBC         184 :     dstrelid.dbId = dst_dboid;
     182 ECB             : 
     183                 :     /* Loop over our list of relfilelocators and copy each one. */
     184 GNC       41045 :     foreach(cell, rlocatorlist)
     185 ECB             :     {
     186 GIC       40861 :         relinfo = lfirst(cell);
     187 GNC       40861 :         srcrlocator = relinfo->rlocator;
     188 ECB             : 
     189                 :         /*
     190                 :          * If the relation is from the source db's default tablespace then we
     191                 :          * need to create it in the destination db's default tablespace.
     192                 :          * Otherwise, we need to create in the same tablespace as it is in the
     193                 :          * source database.
     194                 :          */
     195 GNC       40861 :         if (srcrlocator.spcOid == src_tsid)
     196           40861 :             dstrlocator.spcOid = dst_tsid;
     197 ECB             :         else
     198 UNC           0 :             dstrlocator.spcOid = srcrlocator.spcOid;
     199 EUB             : 
     200 GNC       40861 :         dstrlocator.dbOid = dst_dboid;
     201           40861 :         dstrlocator.relNumber = srcrlocator.relNumber;
     202 ECB             : 
     203                 :         /*
     204                 :          * Acquire locks on source and target relations before copying.
     205                 :          *
     206                 :          * We typically do not read relation data into shared_buffers without
     207                 :          * holding a relation lock. It's unclear what could go wrong if we
     208                 :          * skipped it in this case, because nobody can be modifying either the
     209                 :          * source or destination database at this point, and we have locks on
     210                 :          * both databases, too, but let's take the conservative route.
     211                 :          */
     212 GIC       40861 :         dstrelid.relId = srcrelid.relId = relinfo->reloid;
     213 CBC       40861 :         LockRelationId(&srcrelid, AccessShareLock);
     214           40861 :         LockRelationId(&dstrelid, AccessShareLock);
     215 ECB             : 
     216                 :         /* Copy relation storage from source to the destination. */
     217 GNC       40861 :         CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
     218 ECB             : 
     219                 :         /* Release the relation locks. */
     220 GIC       40861 :         UnlockRelationId(&srcrelid, AccessShareLock);
     221 CBC       40861 :         UnlockRelationId(&dstrelid, AccessShareLock);
     222 ECB             :     }
     223                 : 
     224 GIC         184 :     pfree(srcpath);
     225 CBC         184 :     pfree(dstpath);
     226 GNC         184 :     list_free_deep(rlocatorlist);   /* also frees each CreateDBRelInfo */
     227 CBC         184 : }
     228 ECB             : 
     229                 : /*
     230                 :  * Scan the pg_class table in the source database to identify the relations
     231                 :  * that need to be copied to the destination database.
     232                 :  *
     233                 :  * This is an exception to the usual rule that cross-database access is
     234                 :  * not possible. We can make it work here because we know that there are no
     235                 :  * connections to the source database and (since there can't be prepared
     236                 :  * transactions touching that database) no in-doubt tuples either. This
     237                 :  * means that we don't need to worry about pruning removing anything from
     238                 :  * under us, and we don't need to be too picky about our snapshot either.
     239                 :  * As long as it sees all previously-committed XIDs as committed and all
     240                 :  * aborted XIDs as aborted, we should be fine: nothing else is possible
     241                 :  * here.
     242                 :  *
     243                 :  * We can't rely on the relcache for anything here, because that only knows
     244                 :  * about the database to which we are connected, and can't handle access to
     245                 :  * other databases. That also means we can't rely on the heap scan
     246                 :  * infrastructure, which would be a bad idea anyway since it might try
     247                 :  * to do things like HOT pruning which we definitely can't do safely in
     248                 :  * a database to which we're not even connected.
                         :  *
                         :  * tbid and dbid identify the tablespace and database that hold the source
                         :  * pg_class; srcpath is the source database directory, used to look up
                         :  * relfilenumbers in that database's relmap.
                         :  *
                         :  * Returns a list of CreateDBRelInfo pointers, one per relation to copy
                         :  * (NIL if no tuple qualifies).
     249                 :  */
     250                 : static List *
     251 GIC         184 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
     252 ECB             : {
     253                 :     RelFileLocator rlocator;
     254                 :     BlockNumber nblocks;
     255                 :     BlockNumber blkno;
     256                 :     Buffer      buf;
     257                 :     RelFileNumber relfilenumber;
     258                 :     Page        page;
     259 GNC         184 :     List       *rlocatorlist = NIL;
     260 ECB             :     LockRelId   relid;
     261                 :     Snapshot    snapshot;
     262                 :     SMgrRelation    smgr;
     263                 :     BufferAccessStrategy bstrategy;
     264                 : 
     265                 :     /* Get pg_class relfilenumber from the source database's relmap. */
     266 GNC         184 :     relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
     267                 :                                                           RelationRelationId);
     268                 : 
     269                 :     /* Don't read data into shared_buffers without holding a relation lock. */
     270 GIC         184 :     relid.dbId = dbid;
     271 CBC         184 :     relid.relId = RelationRelationId;
     272             184 :     LockRelationId(&relid, AccessShareLock);
     273 ECB             : 
     274                 :     /* Prepare a RelFileLocator for the pg_class relation. */
     275 GNC         184 :     rlocator.spcOid = tbid;
     276             184 :     rlocator.dbOid = dbid;
     277             184 :     rlocator.relNumber = relfilenumber;
     278 ECB             : 
     279 GNC         184 :     smgr = smgropen(rlocator, InvalidBackendId);
     280 CBC         184 :     nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
     281             184 :     smgrclose(smgr);                /* smgr was only needed for the block count */
     282 ECB             : 
     283                 :     /* Use a buffer access strategy since this is a bulk read operation. */
     284 GIC         184 :     bstrategy = GetAccessStrategy(BAS_BULKREAD);
     285 ECB             : 
     286                 :     /*
     287                 :      * As explained in the function header comments, we need a snapshot that
     288                 :      * will see all committed transactions as committed, and our transaction
     289                 :      * snapshot - or the active snapshot - might not be new enough for that,
     290                 :      * but the return value of GetLatestSnapshot() should work fine.
     291                 :      */
     292 GIC         184 :     snapshot = GetLatestSnapshot();
     293 ECB             : 
     294                 :     /* Process the relation block by block. */
     295 GIC        2772 :     for (blkno = 0; blkno < nblocks; blkno++)
     296 ECB             :     {
     297 GIC        2588 :         CHECK_FOR_INTERRUPTS();
     298 ECB             : 
     299 GNC        2588 :         buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
     300 ECB             :                                         RBM_NORMAL, bstrategy, true);
     301                 : 
     302 GIC        2588 :         LockBuffer(buf, BUFFER_LOCK_SHARE);
     303 CBC        2588 :         page = BufferGetPage(buf);
     304            2588 :         if (PageIsNew(page) || PageIsEmpty(page))   /* no tuples to examine */
     305 ECB             :         {
     306 UIC           0 :             UnlockReleaseBuffer(buf);
     307 UBC           0 :             continue;
     308 EUB             :         }
     309                 : 
     310                 :         /* Append relevant pg_class tuples for current page to rlocatorlist. */
     311 GNC        2588 :         rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
     312                 :                                                      srcpath, rlocatorlist,
     313                 :                                                      snapshot);
     314                 : 
     315 GIC        2588 :         UnlockReleaseBuffer(buf);
     316 ECB             :     }
     317                 : 
     318                 :     /* Release relation lock. */
     319 GIC         184 :     UnlockRelationId(&relid, AccessShareLock);
     320 ECB             : 
     321 GNC         184 :     return rlocatorlist;
     322 ECB             : }
     323                 : 
     324                 : /*
     325                 :  * Scan one page of the source database's pg_class relation and add relevant
     326                 :  * entries to rlocatorlist. The return value is the updated list.
                         :  *
                         :  * page is the content of buffer buf; ScanSourceDatabasePgClass() calls this
                         :  * with a share lock held on the buffer.  tbid, dbid and srcpath are passed
                         :  * through to ScanSourceDatabasePgClassTuple().  snapshot determines which
                         :  * tuples are considered visible.
     327                 :  */
     328                 : static List *
     329 GIC        2588 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
     330                 :                               char *srcpath, List *rlocatorlist,
     331                 :                               Snapshot snapshot)
     332                 : {
     333            2588 :     BlockNumber blkno = BufferGetBlockNumber(buf);
     334 ECB             :     OffsetNumber offnum;
     335                 :     OffsetNumber maxoff;
     336                 :     HeapTupleData tuple;
     337                 : 
     338 GIC        2588 :     maxoff = PageGetMaxOffsetNumber(page);
     339 ECB             : 
     340                 :     /* Loop over every line pointer offset on the page. */
     341 GIC        2588 :     for (offnum = FirstOffsetNumber;
     342 CBC      131298 :          offnum <= maxoff;
     343          128710 :          offnum = OffsetNumberNext(offnum))
     344 ECB             :     {
     345                 :         ItemId      itemid;
     346                 : 
     347 GIC      128710 :         itemid = PageGetItemId(page, offnum);
     348 ECB             : 
     349                 :         /* Nothing to do if the slot is unused, dead, or a redirect. */
     350 GIC      128710 :         if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
     351 CBC       92258 :             ItemIdIsRedirected(itemid))
     352           52052 :             continue;
     353 ECB             : 
     354 GIC       76658 :         Assert(ItemIdIsNormal(itemid));
     355 CBC       76658 :         ItemPointerSet(&(tuple.t_self), blkno, offnum);
     356 ECB             : 
     357                 :         /* Initialize a HeapTupleData structure. */
     358 GIC       76658 :         tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
     359 CBC       76658 :         tuple.t_len = ItemIdGetLength(itemid);
     360           76658 :         tuple.t_tableOid = RelationRelationId;
     361 ECB             : 
     362                 :         /* Skip tuples that are not visible to this snapshot. */
     363 GIC       76658 :         if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
     364 ECB             :         {
     365                 :             CreateDBRelInfo *relinfo;
     366                 : 
     367                 :             /*
     368                 :              * ScanSourceDatabasePgClassTuple is in charge of constructing a
     369                 :              * CreateDBRelInfo object for this tuple, but can also decide that
     370                 :              * this tuple isn't something we need to copy. If we do need to
     371                 :              * copy the relation, add it to the list.
     372                 :              */
     373 GIC       76005 :             relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
     374 ECB             :                                                      srcpath);
     375 GIC       76005 :             if (relinfo != NULL)
     376 GNC       40861 :                 rlocatorlist = lappend(rlocatorlist, relinfo);
     377 ECB             :         }
     378                 :     }
     379                 : 
     380 GNC        2588 :     return rlocatorlist;
     381 ECB             : }
     382                 : 
     383                 : /*
     384                 :  * Decide whether a certain pg_class tuple represents something that
     385                 :  * needs to be copied from the source database to the destination database,
     386                 :  * and if so, construct a CreateDBRelInfo for it.
     387                 :  *
     388                 :  * Visibility checks are handled by the caller, so our job here is just
     389                 :  * to assess the data stored in the tuple.
                         :  *
                         :  * Returns a palloc'd CreateDBRelInfo, or NULL if the relation does not need
                         :  * to be copied.  tbid is used as the relation's tablespace when the tuple's
                         :  * reltablespace is not a valid OID; srcpath is used to consult the source
                         :  * database's relmap when the tuple's relfilenode is not valid.
                         :  *
                         :  * Note: "static" is not repeated on this definition, but the file-level
                         :  * prototype declares the function static, so it has internal linkage.
     390                 :  */
     391                 : CreateDBRelInfo *
     392 GIC       76005 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
     393 ECB             :                                char *srcpath)
     394                 : {
     395                 :     CreateDBRelInfo *relinfo;
     396                 :     Form_pg_class classForm;
     397 GNC       76005 :     RelFileNumber relfilenumber = InvalidRelFileNumber;
     398 ECB             : 
     399 GIC       76005 :     classForm = (Form_pg_class) GETSTRUCT(tuple);
     400 ECB             : 
     401                 :     /*
     402                 :      * Return NULL if this object does not need to be copied.
     403                 :      *
     404                 :      * Shared objects don't need to be copied, because they are shared.
     405                 :      * Objects without storage can't be copied, because there's nothing to
     406                 :      * copy. Temporary relations don't need to be copied either, because they
     407                 :      * are inaccessible outside of the session that created them, which must
     408                 :      * be gone already, and couldn't connect to a different database if it
     409                 :      * still existed. autovacuum will eventually remove the pg_class entries
     410                 :      * as well.
     411                 :      */
     412 GIC       76005 :     if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
     413 CBC       66805 :         !RELKIND_HAS_STORAGE(classForm->relkind) ||
     414           40861 :         classForm->relpersistence == RELPERSISTENCE_TEMP)
     415           35144 :         return NULL;
     416 ECB             : 
     417                 :     /*
     418                 :      * If the tuple's relfilenode is valid then directly use it.  Otherwise,
     419                 :      * consult the relmap of the source database.
     420                 :      */
     421 GNC       40861 :     if (RelFileNumberIsValid(classForm->relfilenode))
     422           37733 :         relfilenumber = classForm->relfilenode;
     423 ECB             :     else
     424 GNC        3128 :         relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
     425                 :                                                               classForm->oid);
     426                 : 
     427                 :     /* We must have a valid relfilenumber. */
     428           40861 :     if (!RelFileNumberIsValid(relfilenumber))
     429 UNC           0 :         elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
     430 EUB             :              classForm->oid);
     431                 : 
     432                 :     /* Build the rel info element; the caller appends it to its list. */
     433 GIC       40861 :     relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
     434 CBC       40861 :     if (OidIsValid(classForm->reltablespace))
     435 UNC           0 :         relinfo->rlocator.spcOid = classForm->reltablespace;
     436 EUB             :     else
     437 GNC       40861 :         relinfo->rlocator.spcOid = tbid;
     438 ECB             : 
     439 GNC       40861 :     relinfo->rlocator.dbOid = dbid;
     440           40861 :     relinfo->rlocator.relNumber = relfilenumber;
     441 CBC       40861 :     relinfo->reloid = classForm->oid;
     442 ECB             : 
     443                 :     /* Temporary relations were rejected above. */
     444 GIC       40861 :     Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
     445 CBC       40861 :     relinfo->permanent =
     446           40861 :         (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
     447 ECB             : 
     448 GIC       40861 :     return relinfo;
     449 ECB             : }
     450                 : 
     451                 : /*
     452                 :  * Create database directory and write out the PG_VERSION file in the database
     453                 :  * path.  If isRedo is true, it's okay for the database directory to exist
     454                 :  * already.
     455                 :  */
     456                 : static void
     457 GIC         202 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
     458 ECB             : {
     459                 :     int         fd;
     460                 :     int         nbytes;
     461                 :     char        versionfile[MAXPGPATH];
     462                 :     char        buf[16];
     463                 : 
     464                 :     /*
     465                 :      * Prepare version data before starting a critical section.
     466                 :      *
     467                 :      * Note that we don't have to copy this from the source database; there's
     468                 :      * only one legal value.
     469                 :      */
     470 GIC         202 :     sprintf(buf, "%s\n", PG_MAJORVERSION);
     471 CBC         202 :     nbytes = strlen(PG_MAJORVERSION) + 1;
     472 ECB             : 
     473                 :     /* If we are not in WAL replay then write the WAL. */
     474 GIC         202 :     if (!isRedo)
     475 ECB             :     {
     476                 :         xl_dbase_create_wal_log_rec xlrec;
     477                 :         XLogRecPtr  lsn;
     478                 : 
     479 GIC         184 :         START_CRIT_SECTION();
     480 ECB             : 
     481 GIC         184 :         xlrec.db_id = dbid;
     482 CBC         184 :         xlrec.tablespace_id = tsid;
     483 ECB             : 
     484 GIC         184 :         XLogBeginInsert();
     485 CBC         184 :         XLogRegisterData((char *) (&xlrec),
     486 ECB             :                          sizeof(xl_dbase_create_wal_log_rec));
     487                 : 
     488 GIC         184 :         lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
     489 ECB             : 
     490                 :         /* As always, WAL must hit the disk before the data update does. */
     491 GIC         184 :         XLogFlush(lsn);
     492 ECB             :     }
     493                 : 
     494                 :     /* Create database directory. */
     495 GIC         202 :     if (MakePGDirectory(dbpath) < 0)
     496 ECB             :     {
     497                 :         /* Failure other than already exists or not in WAL replay? */
     498 GIC           8 :         if (errno != EEXIST || !isRedo)
     499 LBC           0 :             ereport(ERROR,
     500 EUB             :                     (errcode_for_file_access(),
     501                 :                      errmsg("could not create directory \"%s\": %m", dbpath)));
     502                 :     }
     503                 : 
     504                 :     /*
     505                 :      * Create PG_VERSION file in the database path.  If the file already
     506                 :      * exists and we are in WAL replay then try again to open it in write
     507                 :      * mode.
     508                 :      */
     509 GIC         202 :     snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
     510 ECB             : 
     511 GIC         202 :     fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
     512 CBC         202 :     if (fd < 0 && errno == EEXIST && isRedo)
     513               8 :         fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
     514 ECB             : 
     515 GIC         202 :     if (fd < 0)
     516 LBC           0 :         ereport(ERROR,
     517 EUB             :                 (errcode_for_file_access(),
     518                 :                  errmsg("could not create file \"%s\": %m", versionfile)));
     519                 : 
     520                 :     /* Write PG_MAJORVERSION in the PG_VERSION file. */
     521 GIC         202 :     pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
     522 CBC         202 :     errno = 0;
     523             202 :     if ((int) write(fd, buf, nbytes) != nbytes)
     524 ECB             :     {
     525                 :         /* If write didn't set errno, assume problem is no disk space. */
     526 UIC           0 :         if (errno == 0)
     527 UBC           0 :             errno = ENOSPC;
     528               0 :         ereport(ERROR,
     529 EUB             :                 (errcode_for_file_access(),
     530                 :                  errmsg("could not write to file \"%s\": %m", versionfile)));
     531                 :     }
     532 GIC         202 :     pgstat_report_wait_end();
     533 ECB             : 
     534                 :     /* Close the version file. */
     535 GIC         202 :     CloseTransientFile(fd);
     536 ECB             : 
     537                 :     /* Critical section done. */
     538 GIC         202 :     if (!isRedo)
     539 CBC         184 :         END_CRIT_SECTION();
     540             202 : }
     541 ECB             : 
     542                 : /*
     543                 :  * Create a new database using the FILE_COPY strategy.
     544                 :  *
     545                 :  * Copy each tablespace at the filesystem level, and log a single WAL record
     546                 :  * for each tablespace copied.  This requires a checkpoint before and after the
     547                 :  * copy, which may be expensive, but it does greatly reduce WAL generation
     548                 :  * if the copied database is large.  Tablespace src_tsid is mapped to dst_tsid.
     549                 :  */
     550                 : static void
     551 GIC         613 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
     552 ECB             :                             Oid dst_tsid)
     553                 : {
     554                 :     TableScanDesc scan;         /* scan over pg_tablespace */
     555                 :     Relation    rel;            /* pg_tablespace, opened with AccessShareLock */
     556                 :     HeapTuple   tuple;          /* pg_tablespace row currently being processed */
     557                 : 
     558                 :     /*
     559                 :      * Force a checkpoint before starting the copy. This will force all dirty
     560                 :      * buffers, including those of unlogged tables, out to disk, to ensure
     561                 :      * the source database is up-to-date on disk for the copy.
     562                 :      * FlushDatabaseBuffers() would suffice for that, but we also want to
     563                 :      * process any pending unlink requests. Otherwise, if a checkpoint
     564                 :      * happened while we're copying files, a file might be deleted just when
     565                 :      * we're about to copy it, causing the lstat() call in copydir() to fail
     566                 :      * with ENOENT.
     567                 :      */
     568 GIC         613 :     RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
     569 ECB             :                       CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
     570                 : 
     571                 :     /*
     572                 :      * Iterate through all tablespaces of the template database, and copy each
     573                 :      * one to the new database.
     574                 :      */
     575 GIC         613 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
     576 CBC         613 :     scan = table_beginscan_catalog(rel, 0, NULL);
     577            1857 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     578 ECB             :     {
     579 GIC        1244 :         Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
     580 CBC        1244 :         Oid         srctablespace = spaceform->oid;
     581 ECB             :         Oid         dsttablespace;  /* tablespace the copy is placed in */
     582                 :         char       *srcpath;    /* path of src_dboid within srctablespace */
     583                 :         char       *dstpath;    /* path of dst_dboid within dsttablespace */
     584                 :         struct stat st;
     585                 : 
     586                 :         /* No need to copy global tablespace */
     587 GIC        1244 :         if (srctablespace == GLOBALTABLESPACE_OID)
     588 CBC         631 :             continue;
     589 ECB             : 
     590 GIC         631 :         srcpath = GetDatabasePath(src_dboid, srctablespace);
     591 ECB             : 
     592 GIC        1244 :         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
     593 CBC         613 :             directory_is_empty(srcpath))
     594 ECB             :         {
     595                 :             /* stat() failed, not a directory, or empty: assume we can ignore it */
     596 GIC          18 :             pfree(srcpath);
     597 CBC          18 :             continue;
     598 ECB             :         }
     599                 : 
     600 GIC         613 :         if (srctablespace == src_tsid)
     601 CBC         613 :             dsttablespace = dst_tsid;   /* redirect src_tsid to dst_tsid */
     602 ECB             :         else
     603 UIC           0 :             dsttablespace = srctablespace;  /* otherwise keep the same one */
     604 EUB             : 
     605 GIC         613 :         dstpath = GetDatabasePath(dst_dboid, dsttablespace);
     606 ECB             : 
     607                 :         /*
     608                 :          * Copy this subdirectory to the new location
     609                 :          *
     610                 :          * We don't need to copy subdirectories
     611                 :          */
     612 GIC         613 :         copydir(srcpath, dstpath, false);
     613 ECB             : 
     614                 :         /* Record the filesystem change in XLOG; the record's LSN is not needed */
     615                 :         {
     616                 :             xl_dbase_create_file_copy_rec xlrec;
     617                 : 
     618 GIC         613 :             xlrec.db_id = dst_dboid;
     619 CBC         613 :             xlrec.tablespace_id = dsttablespace;
     620             613 :             xlrec.src_db_id = src_dboid;
     621             613 :             xlrec.src_tablespace_id = srctablespace;
     622 ECB             : 
     623 GIC         613 :             XLogBeginInsert();
     624 CBC         613 :             XLogRegisterData((char *) &xlrec,
     625 ECB             :                              sizeof(xl_dbase_create_file_copy_rec));
     626                 : 
     627 GIC         613 :             (void) XLogInsert(RM_DBASE_ID,
     628 ECB             :                               XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
     629                 :         }
     630 GIC         613 :         pfree(srcpath);
     631 CBC         613 :         pfree(dstpath);
     632 ECB             :     }
     633 GIC         613 :     table_endscan(scan);
     634 CBC         613 :     table_close(rel, AccessShareLock);
     635 ECB             : 
     636                 :     /*
     637                 :      * We force a checkpoint before committing.  This effectively means that
     638                 :      * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
     639                 :      * replayed (at least not in ordinary crash recovery; we still have to
     640                 :      * make the XLOG entry for the benefit of PITR operations). This avoids
     641                 :      * two nasty scenarios:
     642                 :      *
     643                 :      * #1: When PITR is off, we don't XLOG the contents of newly created
     644                 :      * indexes; therefore the drop-and-recreate-whole-directory behavior of
     645                 :      * DBASE_CREATE replay would lose such indexes.
     646                 :      *
     647                 :      * #2: Since we have to recopy the source database during DBASE_CREATE
     648                 :      * replay, we run the risk of copying changes in it that were committed
     649                 :      * after the original CREATE DATABASE command but before the system crash
     650                 :      * that led to the replay.  This is at least unexpected and at worst could
     651                 :      * lead to inconsistencies, eg duplicate table names.
     652                 :      *
     653                 :      * (Both of these were real bugs in releases 8.0 through 8.0.3.)
     654                 :      *
     655                 :      * In PITR replay, the first of these isn't an issue, and the second is
     656                 :      * only a risk if the CREATE DATABASE and subsequent template database
     657                 :      * change both occur while a base backup is being taken. There doesn't
     658                 :      * seem to be much we can do about that except document it as a
     659                 :      * limitation.
     660                 :      *
     661                 :      * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
     662                 :      * strategy that avoids these problems.
     663                 :      */
     664 GIC         613 :     RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
     665 CBC         613 : }
     666 ECB             : 
     667                 : /*
     668                 :  * CREATE DATABASE
     669                 :  */
     670                 : Oid
     671 GIC         808 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
     672 ECB             : {
     673                 :     Oid         src_dboid;
     674                 :     Oid         src_owner;
     675 GIC         808 :     int         src_encoding = -1;
     676 CBC         808 :     char       *src_collate = NULL;
     677             808 :     char       *src_ctype = NULL;
     678             808 :     char       *src_iculocale = NULL;
     679 GNC         808 :     char       *src_icurules = NULL;
     680 CBC         808 :     char        src_locprovider = '\0';
     681             808 :     char       *src_collversion = NULL;
     682 ECB             :     bool        src_istemplate;
     683                 :     bool        src_allowconn;
     684 GIC         808 :     TransactionId src_frozenxid = InvalidTransactionId;
     685             808 :     MultiXactId src_minmxid = InvalidMultiXactId;
     686 ECB             :     Oid         src_deftablespace;
     687                 :     volatile Oid dst_deftablespace;
     688                 :     Relation    pg_database_rel;
     689                 :     HeapTuple   tuple;
     690 GNC         808 :     Datum       new_record[Natts_pg_database] = {0};
     691             808 :     bool        new_record_nulls[Natts_pg_database] = {0};
     692 CBC         808 :     Oid         dboid = InvalidOid;
     693 ECB             :     Oid         datdba;
     694                 :     ListCell   *option;
     695 GIC         808 :     DefElem    *dtablespacename = NULL;
     696             808 :     DefElem    *downer = NULL;
     697 CBC         808 :     DefElem    *dtemplate = NULL;
     698             808 :     DefElem    *dencoding = NULL;
     699             808 :     DefElem    *dlocale = NULL;
     700             808 :     DefElem    *dcollate = NULL;
     701             808 :     DefElem    *dctype = NULL;
     702             808 :     DefElem    *diculocale = NULL;
     703 GNC         808 :     DefElem    *dicurules = NULL;
     704 CBC         808 :     DefElem    *dlocprovider = NULL;
     705             808 :     DefElem    *distemplate = NULL;
     706             808 :     DefElem    *dallowconnections = NULL;
     707             808 :     DefElem    *dconnlimit = NULL;
     708             808 :     DefElem    *dcollversion = NULL;
     709             808 :     DefElem    *dstrategy = NULL;
     710             808 :     char       *dbname = stmt->dbname;
     711             808 :     char       *dbowner = NULL;
     712             808 :     const char *dbtemplate = NULL;
     713             808 :     char       *dbcollate = NULL;
     714             808 :     char       *dbctype = NULL;
     715             808 :     char       *dbiculocale = NULL;
     716 GNC         808 :     char       *dbicurules = NULL;
     717 CBC         808 :     char        dblocprovider = '\0';
     718 ECB             :     char       *canonname;
     719 CBC         808 :     int         encoding = -1;
     720             808 :     bool        dbistemplate = false;
     721             808 :     bool        dballowconnections = true;
     722 GIC         808 :     int         dbconnlimit = -1;
     723 CBC         808 :     char       *dbcollversion = NULL;
     724 ECB             :     int         notherbackends;
     725                 :     int         npreparedxacts;
     726 CBC         808 :     CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
     727 ECB             :     createdb_failure_params fparms;
     728                 : 
     729                 :     /* Extract options from the statement node tree */
     730 CBC        2880 :     foreach(option, stmt->options)
     731                 :     {
     732 GIC        2072 :         DefElem    *defel = (DefElem *) lfirst(option);
     733                 : 
     734 CBC        2072 :         if (strcmp(defel->defname, "tablespace") == 0)
     735                 :         {
     736               8 :             if (dtablespacename)
     737 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     738 CBC           8 :             dtablespacename = defel;
     739                 :         }
     740            2064 :         else if (strcmp(defel->defname, "owner") == 0)
     741 EUB             :         {
     742 CBC           1 :             if (downer)
     743 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     744 CBC           1 :             downer = defel;
     745                 :         }
     746            2063 :         else if (strcmp(defel->defname, "template") == 0)
     747 EUB             :         {
     748 CBC         119 :             if (dtemplate)
     749 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     750 CBC         119 :             dtemplate = defel;
     751                 :         }
     752            1944 :         else if (strcmp(defel->defname, "encoding") == 0)
     753 EUB             :         {
     754 CBC          23 :             if (dencoding)
     755 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     756 CBC          23 :             dencoding = defel;
     757                 :         }
     758            1921 :         else if (strcmp(defel->defname, "locale") == 0)
     759 EUB             :         {
     760 CBC          17 :             if (dlocale)
     761 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     762 CBC          17 :             dlocale = defel;
     763                 :         }
     764            1904 :         else if (strcmp(defel->defname, "lc_collate") == 0)
     765 EUB             :         {
     766 CBC           7 :             if (dcollate)
     767 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     768 CBC           7 :             dcollate = defel;
     769                 :         }
     770            1897 :         else if (strcmp(defel->defname, "lc_ctype") == 0)
     771 EUB             :         {
     772 CBC           7 :             if (dctype)
     773 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     774 CBC           7 :             dctype = defel;
     775                 :         }
     776            1890 :         else if (strcmp(defel->defname, "icu_locale") == 0)
     777 EUB             :         {
     778 CBC          19 :             if (diculocale)
     779 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     780 CBC          19 :             diculocale = defel;
     781                 :         }
     782 GNC        1871 :         else if (strcmp(defel->defname, "icu_rules") == 0)
     783                 :         {
     784 UNC           0 :             if (dicurules)
     785               0 :                 errorConflictingDefElem(defel, pstate);
     786               0 :             dicurules = defel;
     787                 :         }
     788 CBC        1871 :         else if (strcmp(defel->defname, "locale_provider") == 0)
     789 EUB             :         {
     790 CBC          22 :             if (dlocprovider)
     791 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     792 CBC          22 :             dlocprovider = defel;
     793                 :         }
     794 GBC        1849 :         else if (strcmp(defel->defname, "is_template") == 0)
     795 EUB             :         {
     796 GBC         305 :             if (distemplate)
     797 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     798 CBC         305 :             distemplate = defel;
     799                 :         }
     800            1544 :         else if (strcmp(defel->defname, "allow_connections") == 0)
     801 EUB             :         {
     802 CBC         303 :             if (dallowconnections)
     803 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     804 CBC         303 :             dallowconnections = defel;
     805                 :         }
     806            1241 :         else if (strcmp(defel->defname, "connection_limit") == 0)
     807 EUB             :         {
     808 LBC           0 :             if (dconnlimit)
     809 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     810 LBC           0 :             dconnlimit = defel;
     811                 :         }
     812 CBC        1241 :         else if (strcmp(defel->defname, "collation_version") == 0)
     813 EUB             :         {
     814 CBC           6 :             if (dcollversion)
     815 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     816 CBC           6 :             dcollversion = defel;
     817                 :         }
     818 GBC        1235 :         else if (strcmp(defel->defname, "location") == 0)
     819 EUB             :         {
     820 UBC           0 :             ereport(WARNING,
     821                 :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     822 ECB             :                      errmsg("LOCATION is not supported anymore"),
     823                 :                      errhint("Consider using tablespaces instead."),
     824                 :                      parser_errposition(pstate, defel->location)));
     825 EUB             :         }
     826 CBC        1235 :         else if (strcmp(defel->defname, "oid") == 0)
     827                 :         {
     828             615 :             dboid = defGetObjectId(defel);
     829                 : 
     830 EUB             :             /*
     831                 :              * We don't normally permit new databases to be created with
     832                 :              * system-assigned OIDs. pg_upgrade tries to preserve database
     833                 :              * OIDs, so we can't allow any database to be created with an OID
     834                 :              * that might be in use in a freshly-initialized cluster created
     835                 :              * by some future version. We assume all such OIDs will be from
     836 ECB             :              * the system-managed OID range.
     837                 :              *
     838                 :              * As an exception, however, we permit any OID to be assigned when
     839                 :              * allow_system_table_mods=on (so that initdb can assign system
     840                 :              * OIDs to template0 and postgres) or when performing a binary
     841                 :              * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
     842                 :              * in the source cluster).
     843                 :              */
     844 GIC         615 :             if (dboid < FirstNormalObjectId &&
     845             608 :                 !allowSystemTableMods && !IsBinaryUpgrade)
     846 UIC           0 :                 ereport(ERROR,
     847                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
     848                 :                         errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
     849                 :         }
     850 GIC         620 :         else if (strcmp(defel->defname, "strategy") == 0)
     851                 :         {
     852             620 :             if (dstrategy)
     853 UIC           0 :                 errorConflictingDefElem(defel, pstate);
     854 CBC         620 :             dstrategy = defel;
     855 ECB             :         }
     856 EUB             :         else
     857 UIC           0 :             ereport(ERROR,
     858                 :                     (errcode(ERRCODE_SYNTAX_ERROR),
     859                 :                      errmsg("option \"%s\" not recognized", defel->defname),
     860 ECB             :                      parser_errposition(pstate, defel->location)));
     861                 :     }
     862                 : 
     863 GBC         808 :     if (downer && downer->arg)
     864 CBC           1 :         dbowner = defGetString(downer);
     865 GIC         808 :     if (dtemplate && dtemplate->arg)
     866             119 :         dbtemplate = defGetString(dtemplate);
     867 GBC         808 :     if (dencoding && dencoding->arg)
     868                 :     {
     869                 :         const char *encoding_name;
     870                 : 
     871 GIC          23 :         if (IsA(dencoding->arg, Integer))
     872                 :         {
     873 LBC           0 :             encoding = defGetInt32(dencoding);
     874               0 :             encoding_name = pg_encoding_to_char(encoding);
     875               0 :             if (strcmp(encoding_name, "") == 0 ||
     876               0 :                 pg_valid_server_encoding(encoding_name) < 0)
     877               0 :                 ereport(ERROR,
     878                 :                         (errcode(ERRCODE_UNDEFINED_OBJECT),
     879                 :                          errmsg("%d is not a valid encoding code",
     880                 :                                 encoding),
     881 ECB             :                          parser_errposition(pstate, dencoding->location)));
     882                 :         }
     883 EUB             :         else
     884                 :         {
     885 GBC          23 :             encoding_name = defGetString(dencoding);
     886              23 :             encoding = pg_valid_server_encoding(encoding_name);
     887              23 :             if (encoding < 0)
     888 UIC           0 :                 ereport(ERROR,
     889                 :                         (errcode(ERRCODE_UNDEFINED_OBJECT),
     890                 :                          errmsg("%s is not a valid encoding name",
     891                 :                                 encoding_name),
     892                 :                          parser_errposition(pstate, dencoding->location)));
     893                 :         }
     894                 :     }
     895 CBC         808 :     if (dlocale && dlocale->arg)
     896 ECB             :     {
     897 CBC          17 :         dbcollate = defGetString(dlocale);
     898 GBC          17 :         dbctype = defGetString(dlocale);
     899                 :     }
     900 GIC         808 :     if (dcollate && dcollate->arg)
     901               7 :         dbcollate = defGetString(dcollate);
     902             808 :     if (dctype && dctype->arg)
     903               7 :         dbctype = defGetString(dctype);
     904             808 :     if (diculocale && diculocale->arg)
     905 CBC          19 :         dbiculocale = defGetString(diculocale);
     906 GNC         808 :     if (dicurules && dicurules->arg)
     907 UNC           0 :         dbicurules = defGetString(dicurules);
     908 GIC         808 :     if (dlocprovider && dlocprovider->arg)
     909 ECB             :     {
     910 CBC          22 :         char       *locproviderstr = defGetString(dlocprovider);
     911                 : 
     912              22 :         if (pg_strcasecmp(locproviderstr, "icu") == 0)
     913              20 :             dblocprovider = COLLPROVIDER_ICU;
     914               2 :         else if (pg_strcasecmp(locproviderstr, "libc") == 0)
     915               1 :             dblocprovider = COLLPROVIDER_LIBC;
     916 ECB             :         else
     917 CBC           1 :             ereport(ERROR,
     918 ECB             :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
     919 EUB             :                      errmsg("unrecognized locale provider: %s",
     920 ECB             :                             locproviderstr)));
     921                 :     }
     922 CBC         807 :     if (distemplate && distemplate->arg)
     923 GIC         305 :         dbistemplate = defGetBoolean(distemplate);
     924 CBC         807 :     if (dallowconnections && dallowconnections->arg)
     925             303 :         dballowconnections = defGetBoolean(dallowconnections);
     926             807 :     if (dconnlimit && dconnlimit->arg)
     927 ECB             :     {
     928 UIC           0 :         dbconnlimit = defGetInt32(dconnlimit);
     929 LBC           0 :         if (dbconnlimit < -1)
     930 UIC           0 :             ereport(ERROR,
     931                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     932                 :                      errmsg("invalid connection limit: %d", dbconnlimit)));
     933                 :     }
     934 CBC         807 :     if (dcollversion)
     935               6 :         dbcollversion = defGetString(dcollversion);
     936 ECB             : 
     937                 :     /* obtain OID of proposed owner */
     938 CBC         807 :     if (dbowner)
     939 GIC           1 :         datdba = get_role_oid(dbowner, false);
     940 EUB             :     else
     941 GBC         806 :         datdba = GetUserId();
     942 EUB             : 
     943                 :     /*
     944                 :      * To create a database, must have createdb privilege and must be able to
     945                 :      * become the target role (this does not imply that the target role itself
     946 ECB             :      * must have createdb privilege).  The latter provision guards against
     947                 :      * "giveaway" attacks.  Note that a superuser will always have both of
     948                 :      * these privileges a fortiori.
     949                 :      */
     950 CBC         807 :     if (!have_createdb_privilege())
     951               3 :         ereport(ERROR,
     952                 :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     953 ECB             :                  errmsg("permission denied to create database")));
     954                 : 
     955 GNC         804 :     check_can_set_role(GetUserId(), datdba);
     956                 : 
     957                 :     /*
     958                 :      * Lookup database (template) to be cloned, and obtain share lock on it.
     959                 :      * ShareLock allows two CREATE DATABASEs to work from the same template
     960                 :      * concurrently, while ensuring no one is busy dropping it in parallel
     961                 :      * (which would be Very Bad since we'd likely get an incomplete copy
     962 ECB             :      * without knowing it).  This also prevents any new connections from being
     963                 :      * made to the source until we finish copying it, so we can be sure it
     964                 :      * won't change underneath us.
     965                 :      */
     966 GIC         804 :     if (!dbtemplate)
     967 CBC         686 :         dbtemplate = "template1"; /* Default template database name */
     968                 : 
     969 GIC         804 :     if (!get_db_info(dbtemplate, ShareLock,
     970                 :                      &src_dboid, &src_owner, &src_encoding,
     971                 :                      &src_istemplate, &src_allowconn,
     972                 :                      &src_frozenxid, &src_minmxid, &src_deftablespace,
     973                 :                      &src_collate, &src_ctype, &src_iculocale, &src_icurules, &src_locprovider,
     974                 :                      &src_collversion))
     975 UIC           0 :         ereport(ERROR,
     976                 :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
     977                 :                  errmsg("template database \"%s\" does not exist",
     978 ECB             :                         dbtemplate)));
     979                 : 
     980                 :     /*
     981                 :      * Permission check: to copy a DB that's not marked datistemplate, you
     982                 :      * must be superuser or the owner thereof.
     983                 :      */
     984 GIC         804 :     if (!src_istemplate)
     985                 :     {
     986 GNC           6 :         if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
     987 UBC           0 :             ereport(ERROR,
     988                 :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     989                 :                      errmsg("permission denied to copy database \"%s\"",
     990                 :                             dbtemplate)));
     991                 :     }
     992                 : 
     993                 :     /* Validate the database creation strategy. */
     994 GIC         804 :     if (dstrategy && dstrategy->arg)
     995                 :     {
     996 ECB             :         char       *strategy;
     997                 : 
     998 CBC         620 :         strategy = defGetString(dstrategy);
     999 GBC         620 :         if (strcmp(strategy, "wal_log") == 0)
    1000 GIC           6 :             dbstrategy = CREATEDB_WAL_LOG;
    1001             614 :         else if (strcmp(strategy, "file_copy") == 0)
    1002             613 :             dbstrategy = CREATEDB_FILE_COPY;
    1003                 :         else
    1004               1 :             ereport(ERROR,
    1005                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1006 ECB             :                      errmsg("invalid create database strategy \"%s\"", strategy),
    1007                 :                      errhint("Valid strategies are \"wal_log\", and \"file_copy\".")));
    1008                 :     }
    1009                 : 
    1010                 :     /* If encoding or locales are defaulted, use source's setting */
    1011 CBC         803 :     if (encoding < 0)
    1012             780 :         encoding = src_encoding;
    1013             803 :     if (dbcollate == NULL)
    1014             779 :         dbcollate = src_collate;
    1015 GIC         803 :     if (dbctype == NULL)
    1016 CBC         779 :         dbctype = src_ctype;
    1017 GIC         803 :     if (dblocprovider == '\0')
    1018             782 :         dblocprovider = src_locprovider;
    1019             803 :     if (dbiculocale == NULL && dblocprovider == COLLPROVIDER_ICU)
    1020             768 :         dbiculocale = src_iculocale;
    1021 GNC         803 :     if (dbicurules == NULL && dblocprovider == COLLPROVIDER_ICU)
    1022             787 :         dbicurules = src_icurules;
    1023                 : 
    1024                 :     /* Some encodings are client only */
    1025 CBC         803 :     if (!PG_VALID_BE_ENCODING(encoding))
    1026 LBC           0 :         ereport(ERROR,
    1027 ECB             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1028                 :                  errmsg("invalid server encoding %d", encoding)));
    1029                 : 
    1030                 :     /* Check that the chosen locales are valid, and get canonical spellings */
    1031 CBC         803 :     if (!check_locale(LC_COLLATE, dbcollate, &canonname))
    1032               1 :         ereport(ERROR,
    1033 ECB             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1034                 :                  errmsg("invalid locale name: \"%s\"", dbcollate)));
    1035 CBC         802 :     dbcollate = canonname;
    1036             802 :     if (!check_locale(LC_CTYPE, dbctype, &canonname))
    1037 GIC           1 :         ereport(ERROR,
    1038                 :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1039 ECB             :                  errmsg("invalid locale name: \"%s\"", dbctype)));
    1040 GBC         801 :     dbctype = canonname;
    1041                 : 
    1042 GIC         801 :     check_encoding_locale_matches(encoding, dbcollate, dbctype);
    1043                 : 
    1044             801 :     if (dblocprovider == COLLPROVIDER_ICU)
    1045 ECB             :     {
    1046 CBC         787 :         if (!(is_encoding_supported_by_icu(encoding)))
    1047 GIC           1 :             ereport(ERROR,
    1048                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1049 ECB             :                      errmsg("encoding \"%s\" is not supported with ICU provider",
    1050                 :                             pg_encoding_to_char(encoding))));
    1051                 : 
    1052                 :         /*
    1053                 :          * This would happen if template0 uses the libc provider but the new
    1054                 :          * database uses icu.
    1055                 :          */
    1056 CBC         786 :         if (!dbiculocale)
    1057 GIC           1 :             ereport(ERROR,
    1058 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1059                 :                      errmsg("ICU locale must be specified")));
    1060                 : 
    1061                 :         /*
    1062                 :          * During binary upgrade, or when the locale came from the template
    1063                 :          * database, preserve locale string. Otherwise, canonicalize to a
    1064                 :          * language tag.
    1065                 :          */
    1066 GNC         785 :         if (!IsBinaryUpgrade && dbiculocale != src_iculocale)
    1067                 :         {
    1068              13 :             char *langtag = icu_language_tag(dbiculocale,
    1069                 :                                              icu_validation_level);
    1070                 : 
    1071              13 :             if (langtag && strcmp(dbiculocale, langtag) != 0)
    1072                 :             {
    1073               2 :                 ereport(NOTICE,
    1074                 :                         (errmsg("using standard form \"%s\" for locale \"%s\"",
    1075                 :                                 langtag, dbiculocale)));
    1076                 : 
    1077               2 :                 dbiculocale = langtag;
    1078                 :             }
    1079                 :         }
    1080                 : 
    1081             785 :         icu_validate_locale(dbiculocale);
    1082                 :     }
    1083                 :     else
    1084                 :     {
    1085 GIC          14 :         if (dbiculocale)
    1086 UIC           0 :             ereport(ERROR,
    1087                 :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1088                 :                      errmsg("ICU locale cannot be specified unless locale provider is ICU")));
    1089                 : 
    1090 GNC          14 :         if (dbicurules)
    1091 UNC           0 :             ereport(ERROR,
    1092                 :                     (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    1093                 :                      errmsg("ICU rules cannot be specified unless locale provider is ICU")));
    1094                 :     }
    1095 ECB             : 
    1096                 :     /*
    1097                 :      * Check that the new encoding and locale settings match the source
    1098                 :      * database.  We insist on this because we simply copy the source data ---
    1099                 :      * any non-ASCII data would be wrongly encoded, and any indexes sorted
    1100                 :      * according to the source locale would be wrong.
    1101                 :      *
    1102                 :      * However, we assume that template0 doesn't contain any non-ASCII data
    1103                 :      * nor any indexes that depend on collation or ctype, so template0 can be
    1104                 :      * used as template for creating a database with any encoding or locale.
    1105                 :      */
    1106 GIC         798 :     if (strcmp(dbtemplate, "template0") != 0)
    1107 ECB             :     {
    1108 GIC         692 :         if (encoding != src_encoding)
    1109 UIC           0 :             ereport(ERROR,
    1110 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1111                 :                      errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
    1112                 :                             pg_encoding_to_char(encoding),
    1113                 :                             pg_encoding_to_char(src_encoding)),
    1114                 :                      errhint("Use the same encoding as in the template database, or use template0 as template.")));
    1115                 : 
    1116 CBC         692 :         if (strcmp(dbcollate, src_collate) != 0)
    1117 UIC           0 :             ereport(ERROR,
    1118                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1119                 :                      errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
    1120 ECB             :                             dbcollate, src_collate),
    1121                 :                      errhint("Use the same collation as in the template database, or use template0 as template.")));
    1122                 : 
    1123 GIC         692 :         if (strcmp(dbctype, src_ctype) != 0)
    1124 LBC           0 :             ereport(ERROR,
    1125 EUB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1126                 :                      errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
    1127                 :                             dbctype, src_ctype),
    1128                 :                      errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
    1129 ECB             : 
    1130 GBC         692 :         if (dblocprovider != src_locprovider)
    1131 UIC           0 :             ereport(ERROR,
    1132                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1133                 :                      errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
    1134                 :                             collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
    1135                 :                      errhint("Use the same locale provider as in the template database, or use template0 as template.")));
    1136                 : 
    1137 GIC         692 :         if (dblocprovider == COLLPROVIDER_ICU)
    1138                 :         {
    1139                 :             char       *val1;
    1140                 :             char       *val2;
    1141                 : 
    1142             680 :             Assert(dbiculocale);
    1143             680 :             Assert(src_iculocale);
    1144             680 :             if (strcmp(dbiculocale, src_iculocale) != 0)
    1145 UIC           0 :                 ereport(ERROR,
    1146                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1147                 :                          errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
    1148 ECB             :                                 dbiculocale, src_iculocale),
    1149                 :                          errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
    1150                 : 
    1151 GNC         680 :             val1 = dbicurules;
    1152             680 :             if (!val1)
    1153             680 :                 val1 = "";
    1154             680 :             val2 = src_icurules;
    1155             680 :             if (!val2)
    1156             680 :                 val2 = "";
    1157             680 :             if (strcmp(val1, val2) != 0)
    1158 UNC           0 :                 ereport(ERROR,
    1159                 :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1160                 :                          errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
    1161                 :                                 val1, val2),
    1162                 :                          errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
    1163 ECB             :         }
    1164 EUB             :     }
    1165                 : 
    1166                 :     /*
    1167                 :      * If we got a collation version for the template database, check that it
    1168                 :      * matches the actual OS collation version.  Otherwise error; the user
    1169                 :      * needs to fix the template database first.  Don't complain if a
    1170                 :      * collation version was specified explicitly as a statement option; that
    1171 ECB             :      * is used by pg_upgrade to reproduce the old state exactly.
    1172 EUB             :      *
    1173                 :      * (If the template database has no collation version, then either the
    1174                 :      * platform/provider does not support collation versioning, or it's
    1175                 :      * template0, for which we stipulate that it does not contain
    1176                 :      * collation-using objects.)
    1177                 :      */
    1178 CBC         798 :     if (src_collversion && !dcollversion)
    1179 EUB             :     {
    1180                 :         char       *actual_versionstr;
    1181                 : 
    1182 GIC         385 :         actual_versionstr = get_collation_actual_version(dblocprovider, dblocprovider == COLLPROVIDER_ICU ? dbiculocale : dbcollate);
    1183             385 :         if (!actual_versionstr)
    1184 UIC           0 :             ereport(ERROR,
    1185 ECB             :                     (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
    1186 EUB             :                             dbtemplate)));
    1187                 : 
    1188 GIC         385 :         if (strcmp(actual_versionstr, src_collversion) != 0)
    1189 UIC           0 :             ereport(ERROR,
    1190                 :                     (errmsg("template database \"%s\" has a collation version mismatch",
    1191                 :                             dbtemplate),
    1192 ECB             :                      errdetail("The template database was created using collation version %s, "
    1193                 :                                "but the operating system provides version %s.",
    1194                 :                                src_collversion, actual_versionstr),
    1195                 :                      errhint("Rebuild all objects in the template database that use the default collation and run "
    1196                 :                              "ALTER DATABASE %s REFRESH COLLATION VERSION, "
    1197                 :                              "or build PostgreSQL with the right library version.",
    1198                 :                              quote_identifier(dbtemplate))));
    1199                 :     }
    1200 EUB             : 
    1201 GIC         798 :     if (dbcollversion == NULL)
    1202             792 :         dbcollversion = src_collversion;
    1203                 : 
    1204                 :     /*
    1205                 :      * Normally, we copy the collation version from the template database.
    1206 ECB             :      * This last resort only applies if the template database does not have a
    1207                 :      * collation version, which is normally only the case for template0.
    1208                 :      */
    1209 CBC         798 :     if (dbcollversion == NULL)
    1210             407 :         dbcollversion = get_collation_actual_version(dblocprovider, dblocprovider == COLLPROVIDER_ICU ? dbiculocale : dbcollate);
    1211 ECB             : 
    1212                 :     /* Resolve default tablespace for new database */
    1213 GBC         798 :     if (dtablespacename && dtablespacename->arg)
    1214 GIC           8 :     {
    1215                 :         char       *tablespacename;
    1216                 :         AclResult   aclresult;
    1217                 : 
    1218               8 :         tablespacename = defGetString(dtablespacename);
    1219               8 :         dst_deftablespace = get_tablespace_oid(tablespacename, false);
    1220                 :         /* check permissions */
    1221 GNC           8 :         aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
    1222                 :                                            ACL_CREATE);
    1223 GIC           8 :         if (aclresult != ACLCHECK_OK)
    1224 UIC           0 :             aclcheck_error(aclresult, OBJECT_TABLESPACE,
    1225                 :                            tablespacename);
    1226                 : 
    1227                 :         /* pg_global must never be the default tablespace */
    1228 GIC           8 :         if (dst_deftablespace == GLOBALTABLESPACE_OID)
    1229 UIC           0 :             ereport(ERROR,
    1230                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1231                 :                      errmsg("pg_global cannot be used as default tablespace")));
    1232                 : 
    1233 ECB             :         /*
    1234                 :          * If we are trying to change the default tablespace of the template,
    1235                 :          * we require that the template not have any files in the new default
    1236                 :          * tablespace.  This is necessary because otherwise the copied
    1237                 :          * database would contain pg_class rows that refer to its default
    1238                 :          * tablespace both explicitly (by OID) and implicitly (as zero), which
    1239 EUB             :          * would cause problems.  For example another CREATE DATABASE using
    1240                 :          * the copied database as template, and trying to change its default
    1241                 :          * tablespace again, would yield outright incorrect results (it would
    1242                 :          * improperly move tables to the new default tablespace that should
    1243 ECB             :          * stay in the same tablespace).
    1244 EUB             :          */
    1245 GIC           8 :         if (dst_deftablespace != src_deftablespace)
    1246                 :         {
    1247                 :             char       *srcpath;
    1248                 :             struct stat st;
    1249                 : 
    1250               8 :             srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
    1251                 : 
    1252               8 :             if (stat(srcpath, &st) == 0 &&
    1253 UIC           0 :                 S_ISDIR(st.st_mode) &&
    1254               0 :                 !directory_is_empty(srcpath))
    1255               0 :                 ereport(ERROR,
    1256 ECB             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1257                 :                          errmsg("cannot assign new default tablespace \"%s\"",
    1258                 :                                 tablespacename),
    1259                 :                          errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
    1260                 :                                    dbtemplate)));
    1261 GIC           8 :             pfree(srcpath);
    1262                 :         }
    1263                 :     }
    1264 ECB             :     else
    1265                 :     {
    1266                 :         /* Use template database's default tablespace */
    1267 GIC         790 :         dst_deftablespace = src_deftablespace;
    1268 ECB             :         /* Note there is no additional permission check in this path */
    1269                 :     }
    1270                 : 
    1271                 :     /*
    1272                 :      * If built with appropriate switch, whine when regression-testing
    1273                 :      * conventions for database names are violated.  But don't complain during
    1274                 :      * initdb.
    1275                 :      */
    1276                 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1277                 :     if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
    1278                 :         elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
    1279 EUB             : #endif
    1280                 : 
    1281                 :     /*
    1282                 :      * Check for db name conflict.  This is just to give a more friendly error
    1283 ECB             :      * message than "unique index violation".  There's a race condition but
    1284 EUB             :      * we're willing to accept the less friendly message in that case.
    1285                 :      */
    1286 GIC         798 :     if (OidIsValid(get_database_oid(dbname, true)))
    1287               1 :         ereport(ERROR,
    1288                 :                 (errcode(ERRCODE_DUPLICATE_DATABASE),
    1289                 :                  errmsg("database \"%s\" already exists", dbname)));
    1290                 : 
    1291                 :     /*
    1292                 :      * The source DB can't have any active backends, except this one
    1293                 :      * (exception is to allow CREATE DB while connected to template1).
    1294                 :      * Otherwise we might copy inconsistent data.
    1295                 :      *
    1296                 :      * This should be last among the basic error checks, because it involves
    1297                 :      * potential waiting; we may as well throw an error first if we're gonna
    1298                 :      * throw one.
    1299                 :      */
    1300 CBC         797 :     if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
    1301 UIC           0 :         ereport(ERROR,
    1302                 :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1303                 :                  errmsg("source database \"%s\" is being accessed by other users",
    1304                 :                         dbtemplate),
    1305 ECB             :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1306                 : 
    1307                 :     /*
    1308 EUB             :      * Select an OID for the new database, checking that it doesn't have a
    1309                 :      * filename conflict with anything already existing in the tablespace
    1310                 :      * directories.
    1311                 :      */
    1312 GIC         797 :     pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
    1313                 : 
    1314                 :     /*
    1315                 :      * If database OID is configured, check if the OID is already in use or
    1316 ECB             :      * data directory already exists.
    1317                 :      */
    1318 GIC         797 :     if (OidIsValid(dboid))
    1319                 :     {
    1320             615 :         char       *existing_dbname = get_database_name(dboid);
    1321                 : 
    1322 CBC         615 :         if (existing_dbname != NULL)
    1323 UIC           0 :             ereport(ERROR,
    1324                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
    1325                 :                     errmsg("database OID %u is already in use by database \"%s\"",
    1326                 :                            dboid, existing_dbname));
    1327                 : 
    1328 GIC         615 :         if (check_db_file_conflict(dboid))
    1329 UIC           0 :             ereport(ERROR,
    1330                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
    1331                 :                     errmsg("data directory with the specified OID %u already exists", dboid));
    1332                 :     }
    1333                 :     else
    1334                 :     {
    1335                 :         /* Select an OID for the new database if it is not explicitly configured. */
    1336                 :         do
    1337                 :         {
    1338 GIC         182 :             dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
    1339                 :                                        Anum_pg_database_oid);
    1340             182 :         } while (check_db_file_conflict(dboid));
    1341 ECB             :     }
    1342                 : 
    1343                 :     /*
    1344                 :      * Insert a new tuple into pg_database.  This establishes our ownership of
    1345                 :      * the new database name (anyone else trying to insert the same name will
    1346                 :      * block on the unique index, and fail after we commit).
    1347                 :      */
    1348                 : 
    1349 GIC         797 :     Assert((dblocprovider == COLLPROVIDER_ICU && dbiculocale) ||
    1350                 :            (dblocprovider != COLLPROVIDER_ICU && !dbiculocale));
    1351                 : 
    1352                 :     /* Form tuple */
    1353 GBC         797 :     new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
    1354 GIC         797 :     new_record[Anum_pg_database_datname - 1] =
    1355             797 :         DirectFunctionCall1(namein, CStringGetDatum(dbname));
    1356             797 :     new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
    1357             797 :     new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
    1358             797 :     new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
    1359             797 :     new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
    1360             797 :     new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
    1361             797 :     new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
    1362             797 :     new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
    1363             797 :     new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
    1364 CBC         797 :     new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
    1365 GIC         797 :     new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
    1366             797 :     new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
    1367             797 :     if (dbiculocale)
    1368             784 :         new_record[Anum_pg_database_daticulocale - 1] = CStringGetTextDatum(dbiculocale);
    1369                 :     else
    1370 CBC          13 :         new_record_nulls[Anum_pg_database_daticulocale - 1] = true;
    1371 GNC         797 :     if (dbicurules)
    1372 UNC           0 :         new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
    1373                 :     else
    1374 GNC         797 :         new_record_nulls[Anum_pg_database_daticurules - 1] = true;
    1375 GIC         797 :     if (dbcollversion)
    1376 CBC         792 :         new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
    1377                 :     else
    1378               5 :         new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
    1379 EUB             : 
    1380                 :     /*
    1381                 :      * We deliberately set datacl to default (NULL), rather than copying it
    1382                 :      * from the template database.  Copying it would be a bad idea when the
    1383                 :      * owner is not the same as the template's owner.
    1384 ECB             :      */
    1385 GBC         797 :     new_record_nulls[Anum_pg_database_datacl - 1] = true;
    1386                 : 
    1387 GIC         797 :     tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
    1388                 :                             new_record, new_record_nulls);
    1389                 : 
    1390             797 :     CatalogTupleInsert(pg_database_rel, tuple);
    1391                 : 
    1392                 :     /*
    1393                 :      * Now generate additional catalog entries associated with the new DB
    1394 ECB             :      */
    1395                 : 
    1396                 :     /* Register owner dependency */
    1397 GIC         797 :     recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
    1398                 : 
    1399                 :     /* Create pg_shdepend entries for objects within database */
    1400             797 :     copyTemplateDependencies(src_dboid, dboid);
    1401                 : 
    1402                 :     /* Post creation hook for new database */
    1403             797 :     InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
    1404                 : 
    1405 ECB             :     /*
    1406                 :      * If we're going to be reading data for the to-be-created database into
    1407                 :      * shared_buffers, take a lock on it. Nobody should know that this
    1408                 :      * database exists yet, but it's good to maintain the invariant that an
    1409                 :      * AccessExclusiveLock on the database is sufficient to drop all
    1410                 :      * of its buffers without worrying about more being read later.
    1411                 :      *
    1412                 :      * Note that we need to do this before entering the
    1413                 :      * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
    1414                 :      * expects this lock to be held already.
    1415                 :      */
    1416 CBC         797 :     if (dbstrategy == CREATEDB_WAL_LOG)
    1417             184 :         LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
    1418 ECB             : 
    1419                 :     /*
    1420                 :      * Once we start copying subdirectories, we need to be able to clean 'em
    1421                 :      * up if we fail.  Use an ENSURE block to make sure this happens.  (This
    1422                 :      * is not a 100% solution, because of the possibility of failure during
    1423                 :      * transaction commit after we leave this routine, but it should handle
    1424                 :      * most scenarios.)
    1425                 :      */
    1426 CBC         797 :     fparms.src_dboid = src_dboid;
    1427             797 :     fparms.dest_dboid = dboid;
    1428 GBC         797 :     fparms.strategy = dbstrategy;
    1429                 : 
    1430 CBC         797 :     PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
    1431 ECB             :                             PointerGetDatum(&fparms));
    1432                 :     {
    1433                 :         /*
    1434                 :          * If the user has asked to create a database with WAL_LOG strategy
    1435                 :          * then call CreateDatabaseUsingWalLog, which will copy the database
    1436                 :          * at the block level and it will WAL log each copied block.
    1437                 :          * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
    1438                 :          * database file by file.
    1439                 :          */
    1440 GIC         797 :         if (dbstrategy == CREATEDB_WAL_LOG)
    1441 CBC         184 :             CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
    1442                 :                                       dst_deftablespace);
    1443 ECB             :         else
    1444 GIC         613 :             CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
    1445                 :                                         dst_deftablespace);
    1446 ECB             : 
    1447                 :         /*
    1448                 :          * Close pg_database, but keep lock till commit.
    1449                 :          */
    1450 GIC         797 :         table_close(pg_database_rel, NoLock);
    1451                 : 
    1452                 :         /*
    1453 ECB             :          * Force synchronous commit, thus minimizing the window between
    1454                 :          * creation of the database files and committal of the transaction. If
    1455                 :          * we crash before committing, we'll have a DB that's taking up disk
    1456                 :          * space but is not in pg_database, which is not good.
    1457                 :          */
    1458 GIC         797 :         ForceSyncCommit();
    1459 ECB             :     }
    1460 GIC         797 :     PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
    1461                 :                                 PointerGetDatum(&fparms));
    1462                 : 
    1463             797 :     return dboid;
    1464                 : }
    1465                 : 
    1466                 : /*
    1467                 :  * Check whether chosen encoding matches chosen locale settings.  This
    1468                 :  * restriction is necessary because libc's locale-specific code usually
    1469                 :  * fails when presented with data in an encoding it's not expecting. We
    1470                 :  * allow mismatch in four cases:
    1471                 :  *
    1472 ECB             :  * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
    1473                 :  * which works with any encoding.
    1474                 :  *
    1475                 :  * 2. locale encoding = -1, which means that we couldn't determine the
    1476                 :  * locale's encoding and have to trust the user to get it right.
    1477                 :  *
    1478                 :  * 3. selected encoding is UTF8 and platform is win32. This is because
    1479                 :  * UTF8 is a pseudo codepage that is supported in all locales since it's
    1480                 :  * converted to UTF16 before being used.
    1481                 :  *
    1482                 :  * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
    1483                 :  * is risky but we have historically allowed it --- notably, the
    1484                 :  * regression tests require it.
    1485                 :  *
    1486                 :  * Note: if you change this policy, fix initdb to match.
                         :  *
                         :  * Parameters:
                         :  *   encoding - ID of the selected encoding, compared against the
                         :  *              encoding derived from each locale.
                         :  *   collate  - LC_COLLATE locale name.
                         :  *   ctype    - LC_CTYPE locale name.
                         :  *
                         :  * The two locales are checked independently, LC_CTYPE first.  A mismatch
                         :  * that is not covered by one of the cases above is reported with
                         :  * ereport(ERROR) using ERRCODE_INVALID_PARAMETER_VALUE; the errdetail
                         :  * names the encoding the offending locale requires.  On success the
                         :  * function simply returns.
    1487                 :  */
    1488                 : void
    1489 GIC         830 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
    1490                 : {
    1491             830 :     int         ctype_encoding = pg_get_encoding_from_locale(ctype, true);
    1492             830 :     int         collate_encoding = pg_get_encoding_from_locale(collate, true);
    1493                 :     /*
                         :      * LC_CTYPE check.  superuser() is the last operand of the || chain and
                         :      * sits behind "encoding == PG_SQL_ASCII &&", so it is only consulted
                         :      * when no other exemption applies and the encoding is SQL_ASCII.
                         :      */
    1494             833 :     if (!(ctype_encoding == encoding ||
    1495               9 :           ctype_encoding == PG_SQL_ASCII ||
    1496 ECB             :           ctype_encoding == -1 ||
    1497                 : #ifdef WIN32
    1498                 :           encoding == PG_UTF8 ||
    1499                 : #endif
    1500 CBC           3 :           (encoding == PG_SQL_ASCII && superuser())))
    1501 UIC           0 :         ereport(ERROR,
    1502                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1503                 :                  errmsg("encoding \"%s\" does not match locale \"%s\"",
    1504                 :                         pg_encoding_to_char(encoding),
    1505                 :                         ctype),
    1506 ECB             :                  errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
    1507                 :                            pg_encoding_to_char(ctype_encoding))));
    1508                 :     /* Same set of exemptions, applied to LC_COLLATE. */
    1509 GIC         833 :     if (!(collate_encoding == encoding ||
    1510               9 :           collate_encoding == PG_SQL_ASCII ||
    1511                 :           collate_encoding == -1 ||
    1512                 : #ifdef WIN32
    1513                 :           encoding == PG_UTF8 ||
    1514 ECB             : #endif
    1515 GIC           3 :           (encoding == PG_SQL_ASCII && superuser())))
    1516 LBC           0 :         ereport(ERROR,
    1517                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1518                 :                  errmsg("encoding \"%s\" does not match locale \"%s\"",
    1519 ECB             :                         pg_encoding_to_char(encoding),
    1520                 :                         collate),
    1521                 :                  errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
    1522                 :                            pg_encoding_to_char(collate_encoding))));
    1523 GIC         830 : }
    1524                 : 
    1525                 : /*
                         :  * Error cleanup callback for createdb
                         :  *
                         :  * arg is a Datum wrapping a pointer to a createdb_failure_params struct;
                         :  * its src_dboid, dest_dboid and strategy fields are read here.  code is
                         :  * not used.
                         :  *
                         :  * For the WAL_LOG strategy only, the destination database's shared
                         :  * buffers and pending sync requests are discarded and its AccessShareLock
                         :  * is released.  In all cases the ShareLock on the source database is
                         :  * released and the destination database's tablespace subdirectories are
                         :  * removed.
                         :  */
    1526                 : static void
    1527 UIC           0 : createdb_failure_callback(int code, Datum arg)
    1528                 : {
    1529               0 :     createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
    1530                 : 
    1531                 :     /*
    1532                 :      * If we were copying the database at block level, drop pages for the
    1533                 :      * destination database that are in the shared buffer cache, and tell
    1534                 :      * the checkpointer to forget any pending fsync and unlink requests for
    1535                 :      * files in the database.  The reasoning is the same as explained in
    1536                 :      * dropdb().  Unlike dropdb(), we don't need to call
    1537                 :      * pgstat_drop_database, because the database has not been created yet
    1538                 :      * and so there should not be any stats for it.
    1539                 :      */
    1540               0 :     if (fparms->strategy == CREATEDB_WAL_LOG)
    1541                 :     {
    1542               0 :         DropDatabaseBuffers(fparms->dest_dboid);
    1543               0 :         ForgetDatabaseSyncRequests(fparms->dest_dboid);
    1544                 : 
    1545 ECB             :         /* Release lock on the target database. */
    1546 UIC           0 :         UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
    1547 ECB             :                            AccessShareLock);
    1548                 :     }
    1549                 : 
    1550                 :     /*
    1551                 :      * Release lock on source database before doing recursive remove. This is
    1552                 :      * not essential but it seems desirable to release the lock as soon as
    1553                 :      * possible.
    1554                 :      */
    1555 UIC           0 :     UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
    1556 ECB             : 
    1557 EUB             :     /* Throw away any successfully copied subdirectories */
    1558 UIC           0 :     remove_dbtablespaces(fparms->dest_dboid);
    1559               0 : }
    1560                 : 
    1561                 : 
    1562                 : /*
    1563                 :  * DROP DATABASE
                         :  *
                         :  * Parameters:
                         :  *   dbname     - name of the database to drop.
                         :  *   missing_ok - if the database does not exist: when true, emit a NOTICE
                         :  *                and return without changing anything; when false,
                         :  *                raise ERROR (ERRCODE_UNDEFINED_DATABASE).
                         :  *   force      - when true, TerminateOtherDBBackends() is called for the
                         :  *                database before the check for other backends.
                         :  *
                         :  * The checks run in this order, each raising ERROR on failure: ownership,
                         :  * template flag, "is it my own database", active logical replication
                         :  * slots, subscriptions, and finally other backends / prepared
                         :  * transactions.  Only after all of them pass is the pg_database row
                         :  * deleted and the on-disk and in-memory state removed.
    1564                 :  */
    1565 ECB             : void
    1566 CBC          35 : dropdb(const char *dbname, bool missing_ok, bool force)
    1567                 : {
    1568                 :     Oid         db_id;
    1569                 :     bool        db_istemplate;
    1570                 :     Relation    pgdbrel;
    1571 ECB             :     HeapTuple   tup;
    1572 EUB             :     int         notherbackends;
    1573                 :     int         npreparedxacts;
    1574                 :     int         nslots,
    1575                 :                 nslots_active;
    1576                 :     int         nsubscriptions;
    1577                 : 
    1578                 :     /*
    1579 ECB             :      * Look up the target database's OID, and get exclusive lock on it. We
    1580                 :      * need this to ensure that no new backend starts up in the target
    1581                 :      * database while we are deleting it (see postinit.c), and that no one is
    1582                 :      * using it as a CREATE DATABASE template or trying to delete it for
    1583 EUB             :      * themselves.
    1584                 :      */
    1585 GBC          35 :     pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
    1586                 : 
    1587 GIC          35 :     if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
    1588                 :                      &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
    1589                 :     {
    1590              14 :         if (!missing_ok)
    1591                 :         {
    1592               6 :             ereport(ERROR,
    1593                 :                     (errcode(ERRCODE_UNDEFINED_DATABASE),
    1594                 :                      errmsg("database \"%s\" does not exist", dbname)));
    1595                 :         }
    1596 EUB             :         else
    1597                 :         {
    1598                 :             /* Close pg_database, release the lock, since we changed nothing */
    1599 GBC           8 :             table_close(pgdbrel, RowExclusiveLock);
    1600 GIC           8 :             ereport(NOTICE,
    1601                 :                     (errmsg("database \"%s\" does not exist, skipping",
    1602 EUB             :                             dbname)));
    1603 GIC           8 :             return;
    1604                 :         }
    1605                 :     }
    1606                 : 
    1607                 :     /*
    1608                 :      * Permission checks
    1609                 :      */
    1610 GNC          21 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    1611 UBC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1612                 :                        dbname);
    1613                 : 
    1614 EUB             :     /* DROP hook for the database being removed */
    1615 GBC          21 :     InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
    1616                 : 
    1617                 :     /*
    1618                 :      * Disallow dropping a DB that is marked istemplate.  This is just to
    1619                 :      * prevent people from accidentally dropping template0 or template1; they
    1620                 :      * can do so if they're really determined ...
    1621                 :      */
    1622 CBC          21 :     if (db_istemplate)
    1623 UIC           0 :         ereport(ERROR,
    1624                 :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1625                 :                  errmsg("cannot drop a template database")));
    1626                 : 
    1627                 :     /* Obviously can't drop my own database */
    1628 GIC          21 :     if (db_id == MyDatabaseId)
    1629 UIC           0 :         ereport(ERROR,
    1630                 :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1631                 :                  errmsg("cannot drop the currently open database")));
    1632                 : 
    1633                 :     /*
    1634                 :      * Check whether there are active logical slots that refer to the
    1635                 :      * to-be-dropped database. The database lock we are holding prevents the
    1636                 :      * creation of new slots using the database or existing slots becoming
    1637                 :      * active.
    1638                 :      */
    1639 GIC          21 :     (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
    1640              21 :     if (nslots_active)
    1641 ECB             :     {
    1642 GIC           1 :         ereport(ERROR,
    1643 ECB             :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1644                 :                  errmsg("database \"%s\" is used by an active logical replication slot",
    1645                 :                         dbname),
    1646                 :                  errdetail_plural("There is %d active slot.",
    1647                 :                                   "There are %d active slots.",
    1648                 :                                   nslots_active, nslots_active)));
    1649                 :     }
    1650                 : 
    1651                 :     /*
    1652                 :      * Check if there are subscriptions defined in the target database.
    1653                 :      *
    1654                 :      * We can't drop them automatically because they might be holding
    1655                 :      * resources in other databases/instances.
    1656                 :      */
    1657 GIC          20 :     if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
    1658 UIC           0 :         ereport(ERROR,
    1659 ECB             :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1660                 :                  errmsg("database \"%s\" is being used by logical replication subscription",
    1661                 :                         dbname),
    1662                 :                  errdetail_plural("There is %d subscription.",
    1663                 :                                   "There are %d subscriptions.",
    1664                 :                                   nsubscriptions, nsubscriptions)));
    1665                 : 
    1666                 : 
    1667 EUB             :     /*
    1668                 :      * Attempt to terminate all existing connections to the target database if
    1669                 :      * the user has requested to do so.
    1670                 :      */
    1671 CBC          20 :     if (force)
    1672 GIC           1 :         TerminateOtherDBBackends(db_id);
    1673                 : 
    1674                 :     /*
    1675                 :      * Check for other backends in the target database.  (Because we hold the
    1676                 :      * database lock, no new ones can start after this.)
    1677                 :      *
    1678 ECB             :      * As in CREATE DATABASE, check this after other error conditions.
    1679 EUB             :      */
    1680 GIC          20 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    1681 UIC           0 :         ereport(ERROR,
    1682                 :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1683                 :                  errmsg("database \"%s\" is being accessed by other users",
    1684 ECB             :                         dbname),
    1685 EUB             :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1686                 : 
    1687                 :     /*
    1688                 :      * Remove the database's tuple from pg_database.
    1689                 :      */
    1690 GIC          20 :     tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
    1691              20 :     if (!HeapTupleIsValid(tup))
    1692 UIC           0 :         elog(ERROR, "cache lookup failed for database %u", db_id);
    1693                 : 
    1694 GIC          20 :     CatalogTupleDelete(pgdbrel, &tup->t_self);
    1695 ECB             : 
    1696 CBC          20 :     ReleaseSysCache(tup);
    1697                 : 
    1698 ECB             :     /*
    1699                 :      * Delete any comments or security labels associated with the database.
    1700                 :      */
    1701 GIC          20 :     DeleteSharedComments(db_id, DatabaseRelationId);
    1702              20 :     DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
    1703                 : 
    1704                 :     /*
    1705                 :      * Remove settings associated with this database
    1706                 :      */
    1707              20 :     DropSetting(db_id, InvalidOid);
    1708                 : 
    1709                 :     /*
    1710                 :      * Remove shared dependency references for the database.
    1711                 :      */
    1712              20 :     dropDatabaseDependencies(db_id);
    1713 ECB             : 
    1714 EUB             :     /*
    1715                 :      * Drop db-specific replication slots.
    1716                 :      */
    1717 GIC          20 :     ReplicationSlotsDropDBSlots(db_id);
    1718                 : 
    1719                 :     /*
    1720                 :      * Drop pages for this database that are in the shared buffer cache. This
    1721                 :      * is important to ensure that no remaining backend tries to write out a
    1722                 :      * dirty buffer to the dead database later...
    1723                 :      */
    1724              20 :     DropDatabaseBuffers(db_id);
    1725                 : 
    1726                 :     /*
    1727 ECB             :      * Tell the cumulative stats system to forget it immediately, too.
    1728                 :      */
    1729 GIC          20 :     pgstat_drop_database(db_id);
    1730                 : 
    1731                 :     /*
    1732                 :      * Tell checkpointer to forget any pending fsync and unlink requests for
    1733                 :      * files in the database; else the fsyncs will fail at next checkpoint, or
    1734                 :      * worse, it will delete files that belong to a newly created database
    1735                 :      * with the same OID.
    1736 ECB             :      */
    1737 GBC          20 :     ForgetDatabaseSyncRequests(db_id);
    1738                 : 
    1739                 :     /*
    1740                 :      * Force a checkpoint to make sure the checkpointer has received the
    1741                 :      * message sent by ForgetDatabaseSyncRequests.
    1742                 :      */
    1743 GIC          20 :     RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
    1744                 : 
    1745                 :     /* Close all smgr fds in all backends. */
    1746 CBC          20 :     WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    1747 ECB             : 
    1748 EUB             :     /*
    1749                 :      * Remove all tablespace subdirs belonging to the database.
    1750 ECB             :      */
    1751 GIC          20 :     remove_dbtablespaces(db_id);
    1752 ECB             : 
    1753                 :     /*
    1754                 :      * Close pg_database, but keep lock till commit.
    1755                 :      */
    1756 GIC          20 :     table_close(pgdbrel, NoLock);
    1757 ECB             : 
    1758                 :     /*
    1759                 :      * Force synchronous commit, thus minimizing the window between removal of
    1760                 :      * the database files and committal of the transaction. If we crash before
    1761                 :      * committing, we'll have a DB that's gone on disk but still there
    1762                 :      * according to pg_database, which is not good.
    1763                 :      */
    1764 GIC          20 :     ForceSyncCommit();
    1765                 : }
    1766                 : 
    1767                 : 
    1768 ECB             : /*
    1769                 :  * Rename database
                         :  *
                         :  * Renames the database called oldname to newname by updating datname in
                         :  * its pg_database row, and returns the ObjectAddress of that database.
                         :  *
                         :  * Raises ERROR if oldname does not exist, the caller is not its owner,
                         :  * the caller lacks createdb privilege, newname is already in use, the
                         :  * database is the one this session is connected to, or other sessions or
                         :  * prepared transactions are using it.  The checks are made in that order.
    1770                 :  */
    1771                 : ObjectAddress
    1772 UIC           0 : RenameDatabase(const char *oldname, const char *newname)
    1773 ECB             : {
    1774                 :     Oid         db_id;
    1775                 :     HeapTuple   newtup;
    1776                 :     Relation    rel;
    1777                 :     int         notherbackends;
    1778                 :     int         npreparedxacts;
    1779                 :     ObjectAddress address;
    1780                 : 
    1781                 :     /*
    1782                 :      * Look up the target database's OID, and get exclusive lock on it. We
    1783                 :      * need this for the same reasons as DROP DATABASE.
    1784                 :      */
    1785 LBC           0 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    1786                 : 
    1787 UIC           0 :     if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
    1788                 :                      NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
    1789               0 :         ereport(ERROR,
    1790                 :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1791                 :                  errmsg("database \"%s\" does not exist", oldname)));
    1792                 : 
    1793 ECB             :     /* must be owner */
    1794 UNC           0 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    1795 UIC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1796                 :                        oldname);
    1797                 : 
    1798                 :     /* must have createdb rights */
    1799 LBC           0 :     if (!have_createdb_privilege())
    1800 UIC           0 :         ereport(ERROR,
    1801                 :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1802 ECB             :                  errmsg("permission denied to rename database")));
    1803                 : 
    1804                 :     /*
    1805                 :      * If built with appropriate switch, whine when regression-testing
    1806                 :      * conventions for database names are violated.
    1807                 :      */
    1808                 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1809                 :     if (strstr(newname, "regression") == NULL)
    1810                 :         elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
    1811                 : #endif
    1812                 : 
    1813                 :     /*
    1814                 :      * Make sure the new name doesn't exist.  See notes for same error in
    1815                 :      * CREATE DATABASE.
    1816                 :      */
    1817 UIC           0 :     if (OidIsValid(get_database_oid(newname, true)))
    1818               0 :         ereport(ERROR,
    1819                 :                 (errcode(ERRCODE_DUPLICATE_DATABASE),
    1820 ECB             :                  errmsg("database \"%s\" already exists", newname)));
    1821                 : 
    1822                 :     /*
    1823                 :      * XXX Client applications probably store the current database somewhere,
    1824                 :      * so renaming it could cause confusion.  On the other hand, there may not
    1825                 :      * be an actual problem besides a little confusion, so think about this
    1826                 :      * and decide.
    1827                 :      */
    1828 UBC           0 :     if (db_id == MyDatabaseId)
    1829 UIC           0 :         ereport(ERROR,
    1830                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1831                 :                  errmsg("current database cannot be renamed")));
    1832                 : 
    1833                 :     /*
    1834                 :      * Make sure the database does not have active sessions.  This is the same
    1835                 :      * concern as above, but applied to other sessions.
    1836                 :      *
    1837                 :      * As in CREATE DATABASE, check this after other error conditions.
    1838                 :      */
    1839               0 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    1840               0 :         ereport(ERROR,
    1841 EUB             :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1842                 :                  errmsg("database \"%s\" is being accessed by other users",
    1843                 :                         oldname),
    1844                 :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1845                 : 
    1846                 :     /* rename: overwrite datname in a copy of the tuple, then update the catalog */
    1847 UIC           0 :     newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
    1848               0 :     if (!HeapTupleIsValid(newtup))
    1849               0 :         elog(ERROR, "cache lookup failed for database %u", db_id);
    1850 UBC           0 :     namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
    1851               0 :     CatalogTupleUpdate(rel, &newtup->t_self, newtup);
    1852                 : 
    1853 UIC           0 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    1854                 : 
    1855 UBC           0 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    1856 EUB             : 
    1857                 :     /*
    1858                 :      * Close pg_database, but keep lock till commit.
    1859                 :      */
    1860 UIC           0 :     table_close(rel, NoLock);
    1861                 : 
    1862               0 :     return address;
    1863                 : }
    1864                 : 
    1865                 : 
    1866                 : /*
    1867                 :  * ALTER DATABASE SET TABLESPACE
    1868                 :  */
    1869                 : static void
    1870 GIC           5 : movedb(const char *dbname, const char *tblspcname)
    1871                 : {
    1872                 :     Oid         db_id;
    1873 EUB             :     Relation    pgdbrel;
    1874                 :     int         notherbackends;
    1875                 :     int         npreparedxacts;
    1876                 :     HeapTuple   oldtuple,
    1877                 :                 newtuple;
    1878                 :     Oid         src_tblspcoid,
    1879                 :                 dst_tblspcoid;
    1880                 :     ScanKeyData scankey;
    1881                 :     SysScanDesc sysscan;
    1882                 :     AclResult   aclresult;
    1883                 :     char       *src_dbpath;
    1884                 :     char       *dst_dbpath;
    1885                 :     DIR        *dstdir;
    1886                 :     struct dirent *xlde;
    1887                 :     movedb_failure_params fparms;
    1888                 : 
    1889                 :     /*
    1890                 :      * Look up the target database's OID, and get exclusive lock on it. We
    1891                 :      * need this to ensure that no new backend starts up in the database while
    1892                 :      * we are moving it, and that no one is using it as a CREATE DATABASE
    1893                 :      * template or trying to delete it.
    1894                 :      */
    1895 GIC           5 :     pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
    1896                 : 
    1897               5 :     if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
    1898                 :                      NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
    1899 UIC           0 :         ereport(ERROR,
    1900 EUB             :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    1901                 :                  errmsg("database \"%s\" does not exist", dbname)));
    1902                 : 
    1903                 :     /*
    1904                 :      * We actually need a session lock, so that the lock will persist across
    1905                 :      * the commit/restart below.  (We could almost get away with letting the
    1906                 :      * lock be released at commit, except that someone could try to move
    1907                 :      * relations of the DB back into the old directory while we rmtree() it.)
    1908                 :      */
    1909 GIC           5 :     LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    1910                 :                                AccessExclusiveLock);
    1911                 : 
    1912                 :     /*
    1913 EUB             :      * Permission checks
    1914                 :      */
    1915 GNC           5 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    1916 UIC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    1917                 :                        dbname);
    1918                 : 
    1919                 :     /*
    1920                 :      * Obviously can't move the tables of my own database
    1921                 :      */
    1922 GIC           5 :     if (db_id == MyDatabaseId)
    1923 LBC           0 :         ereport(ERROR,
    1924                 :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1925                 :                  errmsg("cannot change the tablespace of the currently open database")));
    1926                 : 
    1927                 :     /*
    1928                 :      * Get tablespace's oid
    1929                 :      */
    1930 GIC           5 :     dst_tblspcoid = get_tablespace_oid(tblspcname, false);
    1931                 : 
    1932                 :     /*
    1933                 :      * Permission checks
    1934                 :      */
    1935 GNC           5 :     aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
    1936                 :                                        ACL_CREATE);
    1937 GIC           5 :     if (aclresult != ACLCHECK_OK)
    1938 UIC           0 :         aclcheck_error(aclresult, OBJECT_TABLESPACE,
    1939                 :                        tblspcname);
    1940                 : 
    1941                 :     /*
    1942                 :      * pg_global must never be the default tablespace
    1943                 :      */
    1944 GIC           5 :     if (dst_tblspcoid == GLOBALTABLESPACE_OID)
    1945 UIC           0 :         ereport(ERROR,
    1946                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1947                 :                  errmsg("pg_global cannot be used as default tablespace")));
    1948 ECB             : 
    1949                 :     /*
    1950                 :      * No-op if same tablespace
    1951                 :      */
    1952 GBC           5 :     if (src_tblspcoid == dst_tblspcoid)
    1953                 :     {
    1954 UIC           0 :         table_close(pgdbrel, NoLock);
    1955               0 :         UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    1956                 :                                      AccessExclusiveLock);
    1957               0 :         return;
    1958                 :     }
    1959                 : 
    1960                 :     /*
    1961                 :      * Check for other backends in the target database.  (Because we hold the
    1962 ECB             :      * database lock, no new ones can start after this.)
    1963                 :      *
    1964                 :      * As in CREATE DATABASE, check this after other error conditions.
    1965                 :      */
    1966 GIC           5 :     if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
    1967 UIC           0 :         ereport(ERROR,
    1968 ECB             :                 (errcode(ERRCODE_OBJECT_IN_USE),
    1969 EUB             :                  errmsg("database \"%s\" is being accessed by other users",
    1970                 :                         dbname),
    1971                 :                  errdetail_busy_db(notherbackends, npreparedxacts)));
    1972                 : 
    1973                 :     /*
    1974                 :      * Get old and new database paths
    1975 ECB             :      */
    1976 GBC           5 :     src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
    1977 GIC           5 :     dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
    1978                 : 
    1979                 :     /*
    1980                 :      * Force a checkpoint before proceeding. This will force all dirty
    1981                 :      * buffers, including those of unlogged tables, out to disk, to ensure
    1982                 :      * source database is up-to-date on disk for the copy.
    1983 ECB             :      * FlushDatabaseBuffers() would suffice for that, but we also want to
    1984                 :      * process any pending unlink requests. Otherwise, the check for existing
    1985                 :      * files in the target directory might fail unnecessarily, not to mention
    1986                 :      * that the copy might fail due to source files getting deleted under it.
    1987                 :      * On Windows, this also ensures that background procs don't hold any open
    1988                 :      * files, which would cause rmdir() to fail.
    1989                 :      */
    1990 CBC           5 :     RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
    1991 EUB             :                       | CHECKPOINT_FLUSH_ALL);
    1992                 : 
    1993                 :     /* Close all smgr fds in all backends. */
    1994 GIC           5 :     WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    1995                 : 
    1996                 :     /*
    1997 ECB             :      * Now drop all buffers holding data of the target database; they should
    1998 EUB             :      * no longer be dirty so DropDatabaseBuffers is safe.
    1999                 :      *
    2000                 :      * It might seem that we could just let these buffers age out of shared
    2001                 :      * buffers naturally, since they should not get referenced anymore.  The
    2002                 :      * problem with that is that if the user later moves the database back to
    2003                 :      * its original tablespace, any still-surviving buffers would appear to
    2004                 :      * contain valid data again --- but they'd be missing any changes made in
    2005 ECB             :      * the database while it was in the new tablespace.  In any case, freeing
    2006                 :      * buffers that should never be used again seems worth the cycles.
    2007 EUB             :      *
    2008                 :      * Note: it'd be sufficient to get rid of buffers matching db_id and
    2009                 :      * src_tblspcoid, but bufmgr.c presently provides no API for that.
    2010                 :      */
    2011 GIC           5 :     DropDatabaseBuffers(db_id);
    2012                 : 
    2013                 :     /*
    2014                 :      * Check for existence of files in the target directory, i.e., objects of
    2015                 :      * this database that are already in the target tablespace.  We can't
    2016                 :      * allow the move in such a case, because we would need to change those
    2017                 :      * relations' pg_class.reltablespace entries to zero, and we don't have
    2018                 :      * access to the DB's pg_class to do so.
    2019 ECB             :      */
    2020 GBC           5 :     dstdir = AllocateDir(dst_dbpath);
    2021 GIC           5 :     if (dstdir != NULL)
    2022                 :     {
    2023 UIC           0 :         while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
    2024                 :         {
    2025               0 :             if (strcmp(xlde->d_name, ".") == 0 ||
    2026               0 :                 strcmp(xlde->d_name, "..") == 0)
    2027               0 :                 continue;
    2028                 : 
    2029 LBC           0 :             ereport(ERROR,
    2030 ECB             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2031                 :                      errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
    2032                 :                             dbname, tblspcname),
    2033                 :                      errhint("You must move them back to the database's default tablespace before using this command.")));
    2034                 :         }
    2035                 : 
    2036 UIC           0 :         FreeDir(dstdir);
    2037                 : 
    2038                 :         /*
    2039                 :          * The directory exists but is empty. We must remove it before using
    2040                 :          * the copydir function.
    2041                 :          */
    2042               0 :         if (rmdir(dst_dbpath) != 0)
    2043 LBC           0 :             elog(ERROR, "could not remove directory \"%s\": %m",
    2044                 :                  dst_dbpath);
    2045                 :     }
    2046                 : 
    2047 ECB             :     /*
    2048                 :      * Use an ENSURE block to make sure we remove the debris if the copy fails
    2049                 :      * (eg, due to out-of-disk-space).  This is not a 100% solution, because
    2050                 :      * of the possibility of failure during transaction commit, but it should
    2051                 :      * handle most scenarios.
    2052                 :      */
    2053 GIC           5 :     fparms.dest_dboid = db_id;
    2054               5 :     fparms.dest_tsoid = dst_tblspcoid;
    2055               5 :     PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
    2056                 :                             PointerGetDatum(&fparms));
    2057                 :     {
    2058 GNC           5 :         Datum       new_record[Natts_pg_database] = {0};
    2059               5 :         bool        new_record_nulls[Natts_pg_database] = {0};
    2060               5 :         bool        new_record_repl[Natts_pg_database] = {0};
    2061                 : 
    2062                 :         /*
    2063                 :          * Copy files from the old tablespace to the new one
    2064                 :          */
    2065 GIC           5 :         copydir(src_dbpath, dst_dbpath, false);
    2066                 : 
    2067                 :         /*
    2068 ECB             :          * Record the filesystem change in XLOG
    2069                 :          */
    2070                 :         {
    2071                 :             xl_dbase_create_file_copy_rec xlrec;
    2072                 : 
    2073 GIC           5 :             xlrec.db_id = db_id;
    2074               5 :             xlrec.tablespace_id = dst_tblspcoid;
    2075               5 :             xlrec.src_db_id = db_id;
    2076               5 :             xlrec.src_tablespace_id = src_tblspcoid;
    2077 ECB             : 
    2078 CBC           5 :             XLogBeginInsert();
    2079 GIC           5 :             XLogRegisterData((char *) &xlrec,
    2080 EUB             :                              sizeof(xl_dbase_create_file_copy_rec));
    2081                 : 
    2082 GBC           5 :             (void) XLogInsert(RM_DBASE_ID,
    2083 EUB             :                               XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
    2084                 :         }
    2085                 : 
    2086                 :         /*
    2087                 :          * Update the database's pg_database tuple
    2088                 :          */
    2089 GIC           5 :         ScanKeyInit(&scankey,
    2090                 :                     Anum_pg_database_datname,
    2091                 :                     BTEqualStrategyNumber, F_NAMEEQ,
    2092                 :                     CStringGetDatum(dbname));
    2093 GBC           5 :         sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
    2094                 :                                      NULL, 1, &scankey);
    2095 GIC           5 :         oldtuple = systable_getnext(sysscan);
    2096               5 :         if (!HeapTupleIsValid(oldtuple))    /* shouldn't happen... */
    2097 UIC           0 :             ereport(ERROR,
    2098                 :                     (errcode(ERRCODE_UNDEFINED_DATABASE),
    2099 EUB             :                      errmsg("database \"%s\" does not exist", dbname)));
    2100                 : 
    2101 GIC           5 :         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
    2102               5 :         new_record_repl[Anum_pg_database_dattablespace - 1] = true;
    2103                 : 
    2104               5 :         newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
    2105                 :                                      new_record,
    2106 ECB             :                                      new_record_nulls, new_record_repl);
    2107 CBC           5 :         CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
    2108 ECB             : 
    2109 GIC           5 :         InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2110                 : 
    2111 CBC           5 :         systable_endscan(sysscan);
    2112 ECB             : 
    2113                 :         /*
    2114                 :          * Force another checkpoint here.  As in CREATE DATABASE, this is to
    2115                 :          * ensure that we don't have to replay a committed
    2116                 :          * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
    2117                 :          * any unlogged operations done in the new DB tablespace before the
    2118                 :          * next checkpoint.
    2119                 :          */
    2120 GIC           5 :         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
    2121                 : 
    2122                 :         /*
    2123                 :          * Force synchronous commit, thus minimizing the window between
    2124                 :          * copying the database files and committal of the transaction. If we
    2125                 :          * crash before committing, we'll leave an orphaned set of files on
    2126 ECB             :          * disk, which is not fatal but not good either.
    2127                 :          */
    2128 CBC           5 :         ForceSyncCommit();
    2129 ECB             : 
    2130                 :         /*
    2131                 :          * Close pg_database, but keep lock till commit.
    2132                 :          */
    2133 GIC           5 :         table_close(pgdbrel, NoLock);
    2134                 :     }
    2135 CBC           5 :     PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
    2136                 :                                 PointerGetDatum(&fparms));
    2137                 : 
    2138                 :     /*
    2139                 :      * Commit the transaction so that the pg_database update is committed. If
    2140                 :      * we crash while removing files, the database won't be corrupt, we'll
    2141                 :      * just leave some orphaned files in the old directory.
    2142 ECB             :      *
    2143                 :      * (This is OK because we know we aren't inside a transaction block.)
    2144                 :      *
    2145                 :      * XXX would it be safe/better to do this inside the ensure block?  Not
    2146                 :      * convinced it's a good idea; consider elog just after the transaction
    2147                 :      * really commits.
    2148                 :      */
    2149 CBC           5 :     PopActiveSnapshot();
    2150 GBC           5 :     CommitTransactionCommand();
    2151                 : 
    2152                 :     /* Start new transaction for the remaining work; don't need a snapshot */
    2153 GIC           5 :     StartTransactionCommand();
    2154 ECB             : 
    2155                 :     /*
    2156                 :      * Remove files from the old tablespace
    2157                 :      */
    2158 GIC           5 :     if (!rmtree(src_dbpath, true))
    2159 UIC           0 :         ereport(WARNING,
    2160 ECB             :                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
    2161                 :                         src_dbpath)));
    2162                 : 
    2163                 :     /*
    2164                 :      * Record the filesystem change in XLOG
    2165                 :      */
    2166                 :     {
    2167                 :         xl_dbase_drop_rec xlrec;
    2168                 : 
    2169 GIC           5 :         xlrec.db_id = db_id;
    2170               5 :         xlrec.ntablespaces = 1;
    2171                 : 
    2172               5 :         XLogBeginInsert();
    2173 CBC           5 :         XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
    2174 GIC           5 :         XLogRegisterData((char *) &src_tblspcoid, sizeof(Oid));
    2175                 : 
    2176               5 :         (void) XLogInsert(RM_DBASE_ID,
    2177                 :                           XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
    2178                 :     }
    2179                 : 
    2180                 :     /* Now it's safe to release the database lock */
    2181 CBC           5 :     UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
    2182                 :                                  AccessExclusiveLock);
    2183                 : 
    2184 GIC           5 :     pfree(src_dbpath);
    2185               5 :     pfree(dst_dbpath);
    2186 ECB             : }
    2187                 : 
    2188                 : /* Error cleanup callback for movedb */
    2189                 : static void
    2190 UIC           0 : movedb_failure_callback(int code, Datum arg)
    2191                 : {
    2192               0 :     movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
    2193                 :     char       *dstpath;
    2194                 : 
    2195                 :     /* Get rid of anything we managed to copy to the target directory */
    2196               0 :     dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
    2197                 : 
    2198               0 :     (void) rmtree(dstpath, true);
    2199                 : 
    2200               0 :     pfree(dstpath);
    2201               0 : }
    2202 ECB             : 
    2203                 : /*
    2204                 :  * Process options and call dropdb function.
    2205                 :  */
    2206                 : void
    2207 GIC          35 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
    2208                 : {
    2209              35 :     bool        force = false;
    2210                 :     ListCell   *lc;
    2211 ECB             : 
    2212 GBC          48 :     foreach(lc, stmt->options)
    2213                 :     {
    2214 GIC          13 :         DefElem    *opt = (DefElem *) lfirst(lc);
    2215                 : 
    2216              13 :         if (strcmp(opt->defname, "force") == 0)
    2217              13 :             force = true;
    2218                 :         else
    2219 UIC           0 :             ereport(ERROR,
    2220                 :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2221                 :                      errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
    2222 ECB             :                      parser_errposition(pstate, opt->location)));
    2223                 :     }
    2224                 : 
    2225 CBC          35 :     dropdb(stmt->dbname, stmt->missing_ok, force);
    2226              28 : }
    2227 ECB             : 
    2228                 : /*
    2229                 :  * ALTER DATABASE name ...
    2230                 :  */
    2231                 : Oid
    2232 GIC          10 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
    2233                 : {
    2234 ECB             :     Relation    rel;
    2235                 :     Oid         dboid;
    2236                 :     HeapTuple   tuple,
    2237                 :                 newtuple;
    2238                 :     Form_pg_database datform;
    2239                 :     ScanKeyData scankey;
    2240                 :     SysScanDesc scan;
    2241                 :     ListCell   *option;
    2242 GIC          10 :     bool        dbistemplate = false;
    2243 GBC          10 :     bool        dballowconnections = true;
    2244 GIC          10 :     int         dbconnlimit = -1;
    2245 GBC          10 :     DefElem    *distemplate = NULL;
    2246 GIC          10 :     DefElem    *dallowconnections = NULL;
    2247              10 :     DefElem    *dconnlimit = NULL;
    2248              10 :     DefElem    *dtablespace = NULL;
    2249 GNC          10 :     Datum       new_record[Natts_pg_database] = {0};
    2250              10 :     bool        new_record_nulls[Natts_pg_database] = {0};
    2251              10 :     bool        new_record_repl[Natts_pg_database] = {0};
    2252                 : 
    2253 EUB             :     /* Extract options from the statement node tree */
    2254 GBC          20 :     foreach(option, stmt->options)
    2255                 :     {
    2256 GIC          10 :         DefElem    *defel = (DefElem *) lfirst(option);
    2257                 : 
    2258              10 :         if (strcmp(defel->defname, "is_template") == 0)
    2259                 :         {
    2260 CBC           3 :             if (distemplate)
    2261 UIC           0 :                 errorConflictingDefElem(defel, pstate);
    2262 CBC           3 :             distemplate = defel;
    2263                 :         }
    2264 GIC           7 :         else if (strcmp(defel->defname, "allow_connections") == 0)
    2265 ECB             :         {
    2266 GIC           2 :             if (dallowconnections)
    2267 LBC           0 :                 errorConflictingDefElem(defel, pstate);
    2268 GIC           2 :             dallowconnections = defel;
    2269 ECB             :         }
    2270 CBC           5 :         else if (strcmp(defel->defname, "connection_limit") == 0)
    2271                 :         {
    2272 UBC           0 :             if (dconnlimit)
    2273 UIC           0 :                 errorConflictingDefElem(defel, pstate);
    2274               0 :             dconnlimit = defel;
    2275                 :         }
    2276 GIC           5 :         else if (strcmp(defel->defname, "tablespace") == 0)
    2277                 :         {
    2278 CBC           5 :             if (dtablespace)
    2279 LBC           0 :                 errorConflictingDefElem(defel, pstate);
    2280 GIC           5 :             dtablespace = defel;
    2281                 :         }
    2282                 :         else
    2283 UIC           0 :             ereport(ERROR,
    2284                 :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2285 ECB             :                      errmsg("option \"%s\" not recognized", defel->defname),
    2286                 :                      parser_errposition(pstate, defel->location)));
    2287                 :     }
    2288                 : 
    2289 GIC          10 :     if (dtablespace)
    2290                 :     {
    2291                 :         /*
    2292                 :          * While the SET TABLESPACE syntax doesn't allow any other options,
    2293                 :          * somebody could write "WITH TABLESPACE ...".  Forbid any other
    2294                 :          * options from being specified in that case.
    2295 ECB             :          */
    2296 CBC           5 :         if (list_length(stmt->options) != 1)
    2297 LBC           0 :             ereport(ERROR,
    2298 ECB             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2299                 :                      errmsg("option \"%s\" cannot be specified with other options",
    2300                 :                             dtablespace->defname),
    2301                 :                      parser_errposition(pstate, dtablespace->location)));
    2302                 :         /* this case isn't allowed within a transaction block */
    2303 CBC           5 :         PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
    2304               5 :         movedb(stmt->dbname, defGetString(dtablespace));
    2305 GIC           5 :         return InvalidOid;
    2306                 :     }
    2307 ECB             : 
    2308 GIC           5 :     if (distemplate && distemplate->arg)
    2309 CBC           3 :         dbistemplate = defGetBoolean(distemplate);
    2310 GIC           5 :     if (dallowconnections && dallowconnections->arg)
    2311 CBC           2 :         dballowconnections = defGetBoolean(dallowconnections);
    2312 GIC           5 :     if (dconnlimit && dconnlimit->arg)
    2313 ECB             :     {
    2314 UBC           0 :         dbconnlimit = defGetInt32(dconnlimit);
    2315 LBC           0 :         if (dbconnlimit < -1)
    2316 UIC           0 :             ereport(ERROR,
    2317 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2318                 :                      errmsg("invalid connection limit: %d", dbconnlimit)));
    2319                 :     }
    2320 EUB             : 
    2321 ECB             :     /*
    2322                 :      * Get the old tuple.  We don't need a lock on the database per se,
    2323                 :      * because we're not going to do anything that would mess up incoming
    2324                 :      * connections.
    2325 EUB             :      */
    2326 GBC           5 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2327               5 :     ScanKeyInit(&scankey,
    2328                 :                 Anum_pg_database_datname,
    2329 ECB             :                 BTEqualStrategyNumber, F_NAMEEQ,
    2330 GIC           5 :                 CStringGetDatum(stmt->dbname));
    2331 CBC           5 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2332 EUB             :                               NULL, 1, &scankey);
    2333 CBC           5 :     tuple = systable_getnext(scan);
    2334 GIC           5 :     if (!HeapTupleIsValid(tuple))
    2335 UIC           0 :         ereport(ERROR,
    2336 EUB             :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2337                 :                  errmsg("database \"%s\" does not exist", stmt->dbname)));
    2338                 : 
    2339 GIC           5 :     datform = (Form_pg_database) GETSTRUCT(tuple);
    2340               5 :     dboid = datform->oid;
    2341                 : 
    2342 GNC           5 :     if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
    2343 UIC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2344               0 :                        stmt->dbname);
    2345                 : 
    2346                 :     /*
    2347                 :      * In order to avoid getting locked out and having to go through
    2348                 :      * standalone mode, we refuse to disallow connections to the database
    2349 ECB             :      * we're currently connected to.  Lockout can still happen with concurrent
    2350 EUB             :      * sessions but the likeliness of that is not high enough to worry about.
    2351                 :      */
    2352 GIC           5 :     if (!dballowconnections && dboid == MyDatabaseId)
    2353 UIC           0 :         ereport(ERROR,
    2354                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2355                 :                  errmsg("cannot disallow connections for current database")));
    2356 ECB             : 
    2357                 :     /*
    2358                 :      * Build an updated tuple, perusing the information just obtained
    2359                 :      */
    2360 CBC           5 :     if (distemplate)
    2361 ECB             :     {
    2362 GIC           3 :         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
    2363 GBC           3 :         new_record_repl[Anum_pg_database_datistemplate - 1] = true;
    2364 EUB             :     }
    2365 GBC           5 :     if (dallowconnections)
    2366                 :     {
    2367 GIC           2 :         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
    2368               2 :         new_record_repl[Anum_pg_database_datallowconn - 1] = true;
    2369                 :     }
    2370               5 :     if (dconnlimit)
    2371                 :     {
    2372 UIC           0 :         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
    2373               0 :         new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
    2374                 :     }
    2375 ECB             : 
    2376 CBC           5 :     newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
    2377                 :                                  new_record_nulls, new_record_repl);
    2378 GIC           5 :     CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
    2379 ECB             : 
    2380 CBC           5 :     InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
    2381                 : 
    2382               5 :     systable_endscan(scan);
    2383 ECB             : 
    2384 EUB             :     /* Close pg_database, but keep lock till commit */
    2385 GIC           5 :     table_close(rel, NoLock);
    2386                 : 
    2387               5 :     return dboid;
    2388 ECB             : }
    2389                 : 
    2390                 : 
    2391                 : /*
    2392 EUB             :  * ALTER DATABASE name REFRESH COLLATION VERSION
    2393                 :  */
    2394                 : ObjectAddress
    2395 GIC           6 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
    2396                 : {
    2397                 :     Relation    rel;
    2398                 :     ScanKeyData scankey;
    2399                 :     SysScanDesc scan;
    2400                 :     Oid         db_id;
    2401 ECB             :     HeapTuple   tuple;
    2402 EUB             :     Form_pg_database datForm;
    2403                 :     ObjectAddress address;
    2404                 :     Datum       datum;
    2405                 :     bool        isnull;
    2406                 :     char       *oldversion;
    2407                 :     char       *newversion;
    2408                 : 
    2409 CBC           6 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2410 GIC           6 :     ScanKeyInit(&scankey,
    2411 ECB             :                 Anum_pg_database_datname,
    2412                 :                 BTEqualStrategyNumber, F_NAMEEQ,
    2413 GIC           6 :                 CStringGetDatum(stmt->dbname));
    2414 CBC           6 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2415                 :                               NULL, 1, &scankey);
    2416               6 :     tuple = systable_getnext(scan);
    2417               6 :     if (!HeapTupleIsValid(tuple))
    2418 UIC           0 :         ereport(ERROR,
    2419 ECB             :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2420                 :                  errmsg("database \"%s\" does not exist", stmt->dbname)));
    2421 EUB             : 
    2422 GBC           6 :     datForm = (Form_pg_database) GETSTRUCT(tuple);
    2423 GIC           6 :     db_id = datForm->oid;
    2424                 : 
    2425 GNC           6 :     if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    2426 UIC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2427 LBC           0 :                        stmt->dbname);
    2428                 : 
    2429 CBC           6 :     datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
    2430 GIC           6 :     oldversion = isnull ? NULL : TextDatumGetCString(datum);
    2431 ECB             : 
    2432 GIC           6 :     datum = heap_getattr(tuple, datForm->datlocprovider == COLLPROVIDER_ICU ? Anum_pg_database_daticulocale : Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
    2433               6 :     if (isnull)
    2434 LBC           0 :         elog(ERROR, "unexpected null in pg_database");
    2435 GIC           6 :     newversion = get_collation_actual_version(datForm->datlocprovider, TextDatumGetCString(datum));
    2436 ECB             : 
    2437                 :     /* cannot change from NULL to non-NULL or vice versa */
    2438 GIC           6 :     if ((!oldversion && newversion) || (oldversion && !newversion))
    2439 UIC           0 :         elog(ERROR, "invalid collation version change");
    2440 GIC           6 :     else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
    2441 UIC           0 :     {
    2442               0 :         bool        nulls[Natts_pg_database] = {0};
    2443               0 :         bool        replaces[Natts_pg_database] = {0};
    2444 LBC           0 :         Datum       values[Natts_pg_database] = {0};
    2445                 : 
    2446 UIC           0 :         ereport(NOTICE,
    2447                 :                 (errmsg("changing version from %s to %s",
    2448                 :                         oldversion, newversion)));
    2449                 : 
    2450               0 :         values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
    2451               0 :         replaces[Anum_pg_database_datcollversion - 1] = true;
    2452                 : 
    2453               0 :         tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
    2454                 :                                   values, nulls, replaces);
    2455               0 :         CatalogTupleUpdate(rel, &tuple->t_self, tuple);
    2456               0 :         heap_freetuple(tuple);
    2457                 :     }
    2458 ECB             :     else
    2459 CBC           6 :         ereport(NOTICE,
    2460                 :                 (errmsg("version has not changed")));
    2461                 : 
    2462               6 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2463 ECB             : 
    2464 GIC           6 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    2465 ECB             : 
    2466 CBC           6 :     systable_endscan(scan);
    2467 EUB             : 
    2468 GIC           6 :     table_close(rel, NoLock);
    2469                 : 
    2470               6 :     return address;
    2471 ECB             : }
    2472                 : 
    2473                 : 
    2474                 : /*
    2475 EUB             :  * ALTER DATABASE name SET ...
    2476                 :  */
    2477                 : Oid
    2478 CBC         516 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
    2479 ECB             : {
    2480 GIC         516 :     Oid         datid = get_database_oid(stmt->dbname, false);
    2481 ECB             : 
    2482                 :     /*
    2483 EUB             :      * Obtain a lock on the database and make sure it didn't go away in the
    2484 ECB             :      * meantime.
    2485                 :      */
    2486 GIC         516 :     shdepLockAndCheckObject(DatabaseRelationId, datid);
    2487 ECB             : 
    2488 GNC         516 :     if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
    2489 LBC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2490 UBC           0 :                        stmt->dbname);
    2491 EUB             : 
    2492 GBC         516 :     AlterSetting(datid, InvalidOid, stmt->setstmt);
    2493 EUB             : 
    2494 GIC         516 :     UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
    2495 EUB             : 
    2496 GIC         516 :     return datid;
    2497                 : }
    2498                 : 
    2499 EUB             : 
    2500                 : /*
    2501                 :  * ALTER DATABASE name OWNER TO newowner
    2502                 :  */
    2503                 : ObjectAddress
    2504 GBC          18 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
    2505 EUB             : {
    2506                 :     Oid         db_id;
    2507                 :     HeapTuple   tuple;
    2508 ECB             :     Relation    rel;
    2509                 :     ScanKeyData scankey;
    2510                 :     SysScanDesc scan;
    2511                 :     Form_pg_database datForm;
    2512                 :     ObjectAddress address;
    2513                 : 
    2514                 :     /*
    2515                 :      * Get the old tuple.  We don't need a lock on the database per se,
    2516                 :      * because we're not going to do anything that would mess up incoming
    2517                 :      * connections.
    2518                 :      */
    2519 CBC          18 :     rel = table_open(DatabaseRelationId, RowExclusiveLock);
    2520 GIC          18 :     ScanKeyInit(&scankey,
    2521                 :                 Anum_pg_database_datname,
    2522                 :                 BTEqualStrategyNumber, F_NAMEEQ,
    2523                 :                 CStringGetDatum(dbname));
    2524              18 :     scan = systable_beginscan(rel, DatabaseNameIndexId, true,
    2525                 :                               NULL, 1, &scankey);
    2526              18 :     tuple = systable_getnext(scan);
    2527 CBC          18 :     if (!HeapTupleIsValid(tuple))
    2528 UIC           0 :         ereport(ERROR,
    2529 ECB             :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    2530                 :                  errmsg("database \"%s\" does not exist", dbname)));
    2531                 : 
    2532 GIC          18 :     datForm = (Form_pg_database) GETSTRUCT(tuple);
    2533              18 :     db_id = datForm->oid;
    2534                 : 
    2535 ECB             :     /*
    2536                 :      * If the new owner is the same as the existing owner, consider the
    2537                 :      * command to have succeeded.  This is to be consistent with other
    2538 EUB             :      * objects.
    2539                 :      */
    2540 GIC          18 :     if (datForm->datdba != newOwnerId)
    2541 ECB             :     {
    2542                 :         Datum       repl_val[Natts_pg_database];
    2543 GNC          12 :         bool        repl_null[Natts_pg_database] = {0};
    2544              12 :         bool        repl_repl[Natts_pg_database] = {0};
    2545 ECB             :         Acl        *newAcl;
    2546                 :         Datum       aclDatum;
    2547                 :         bool        isNull;
    2548                 :         HeapTuple   newtuple;
    2549                 : 
    2550                 :         /* Otherwise, must be owner of the existing object */
    2551 GNC          12 :         if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
    2552 UIC           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
    2553 ECB             :                            dbname);
    2554                 : 
    2555                 :         /* Must be able to become new owner */
    2556 GNC          12 :         check_can_set_role(GetUserId(), newOwnerId);
    2557                 : 
    2558                 :         /*
    2559                 :          * must have createdb rights
    2560                 :          *
    2561                 :          * NOTE: This is different from other alter-owner checks in that the
    2562                 :          * current user is checked for createdb privileges instead of the
    2563                 :          * destination owner.  This is consistent with the CREATE case for
    2564                 :          * databases.  Because superusers will always have this right, we need
    2565                 :          * no special case for them.
    2566                 :          */
    2567 GIC          12 :         if (!have_createdb_privilege())
    2568 LBC           0 :             ereport(ERROR,
    2569 ECB             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2570                 :                      errmsg("permission denied to change owner of database")));
    2571                 : 
    2572 CBC          12 :         repl_repl[Anum_pg_database_datdba - 1] = true;
    2573              12 :         repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
    2574 EUB             : 
    2575                 :         /*
    2576                 :          * Determine the modified ACL for the new owner.  This is only
    2577                 :          * necessary when the ACL is non-null.
    2578 ECB             :          */
    2579 CBC          12 :         aclDatum = heap_getattr(tuple,
    2580                 :                                 Anum_pg_database_datacl,
    2581                 :                                 RelationGetDescr(rel),
    2582                 :                                 &isNull);
    2583 GIC          12 :         if (!isNull)
    2584                 :         {
    2585 UIC           0 :             newAcl = aclnewowner(DatumGetAclP(aclDatum),
    2586 ECB             :                                  datForm->datdba, newOwnerId);
    2587 UIC           0 :             repl_repl[Anum_pg_database_datacl - 1] = true;
    2588               0 :             repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
    2589 ECB             :         }
    2590                 : 
    2591 GIC          12 :         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
    2592              12 :         CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
    2593                 : 
    2594              12 :         heap_freetuple(newtuple);
    2595                 : 
    2596                 :         /* Update owner dependency reference */
    2597 CBC          12 :         changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
    2598 EUB             :     }
    2599                 : 
    2600 GIC          18 :     InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
    2601                 : 
    2602 CBC          18 :     ObjectAddressSet(address, DatabaseRelationId, db_id);
    2603                 : 
    2604 GIC          18 :     systable_endscan(scan);
    2605                 : 
    2606                 :     /* Close pg_database, but keep lock till commit */
    2607              18 :     table_close(rel, NoLock);
    2608                 : 
    2609              18 :     return address;
    2610                 : }
    2611                 : 
    2612                 : 
    2613 ECB             : Datum
    2614 GBC         303 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
    2615                 : {
    2616 GIC         303 :     Oid         dbid = PG_GETARG_OID(0);
    2617                 :     HeapTuple   tp;
    2618 ECB             :     char        datlocprovider;
    2619                 :     Datum       datum;
    2620                 :     char       *version;
    2621                 : 
    2622 GIC         303 :     tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
    2623             303 :     if (!HeapTupleIsValid(tp))
    2624 LBC           0 :         ereport(ERROR,
    2625                 :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2626                 :                  errmsg("database with OID %u does not exist", dbid)));
    2627                 : 
    2628 CBC         303 :     datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
    2629                 : 
    2630 GNC         303 :     datum = SysCacheGetAttrNotNull(DATABASEOID, tp, datlocprovider == COLLPROVIDER_ICU ? Anum_pg_database_daticulocale : Anum_pg_database_datcollate);
    2631 GBC         303 :     version = get_collation_actual_version(datlocprovider, TextDatumGetCString(datum));
    2632                 : 
    2633 GIC         303 :     ReleaseSysCache(tp);
    2634 ECB             : 
    2635 CBC         303 :     if (version)
    2636 GIC         303 :         PG_RETURN_TEXT_P(cstring_to_text(version));
    2637 ECB             :     else
    2638 UIC           0 :         PG_RETURN_NULL();
    2639                 : }
    2640 ECB             : 
    2641                 : 
    2642                 : /*
    2643                 :  * Helper functions
    2644                 :  */
    2645                 : 
    2646                 : /*
    2647                 :  * Look up info about the database named "name".  If the database exists,
    2648                 :  * obtain the specified lock type on it, fill in any of the remaining
    2649                 :  * parameters that aren't NULL, and return true.  If no such database,
    2650                 :  * return false.
    2651                 :  */
    2652                 : static bool
    2653 GIC         844 : get_db_info(const char *name, LOCKMODE lockmode,
    2654                 :             Oid *dbIdP, Oid *ownerIdP,
    2655                 :             int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
    2656                 :             TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
    2657 ECB             :             Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbIculocale,
    2658                 :             char **dbIcurules,
    2659                 :             char *dbLocProvider,
    2660                 :             char **dbCollversion)
    2661                 : {
    2662 GIC         844 :     bool        result = false;
    2663                 :     Relation    relation;
    2664                 : 
    2665 GNC         844 :     Assert(name);
    2666 ECB             : 
    2667                 :     /* Caller may wish to grab a better lock on pg_database beforehand... */
    2668 GBC         844 :     relation = table_open(DatabaseRelationId, AccessShareLock);
    2669                 : 
    2670                 :     /*
    2671                 :      * Loop covers the rare case where the database is renamed before we can
    2672 ECB             :      * lock it.  We try again just in case we can find a new one of the same
    2673                 :      * name.
    2674                 :      */
    2675                 :     for (;;)
    2676 UIC           0 :     {
    2677 ECB             :         ScanKeyData scanKey;
    2678                 :         SysScanDesc scan;
    2679                 :         HeapTuple   tuple;
    2680                 :         Oid         dbOid;
    2681                 : 
    2682 EUB             :         /*
    2683                 :          * there's no syscache for database-indexed-by-name, so must do it the
    2684                 :          * hard way
    2685                 :          */
    2686 GIC         844 :         ScanKeyInit(&scanKey,
    2687                 :                     Anum_pg_database_datname,
    2688                 :                     BTEqualStrategyNumber, F_NAMEEQ,
    2689                 :                     CStringGetDatum(name));
    2690                 : 
    2691             844 :         scan = systable_beginscan(relation, DatabaseNameIndexId, true,
    2692                 :                                   NULL, 1, &scanKey);
    2693                 : 
    2694             844 :         tuple = systable_getnext(scan);
    2695                 : 
    2696             844 :         if (!HeapTupleIsValid(tuple))
    2697 ECB             :         {
    2698                 :             /* definitely no database of that name */
    2699 GIC          14 :             systable_endscan(scan);
    2700              14 :             break;
    2701                 :         }
    2702                 : 
    2703             830 :         dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
    2704                 : 
    2705             830 :         systable_endscan(scan);
    2706 ECB             : 
    2707                 :         /*
    2708                 :          * Now that we have a database OID, we can try to lock the DB.
    2709                 :          */
    2710 GIC         830 :         if (lockmode != NoLock)
    2711             830 :             LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
    2712 ECB             : 
    2713                 :         /*
    2714                 :          * And now, re-fetch the tuple by OID.  If it's still there and still
    2715                 :          * the same name, we win; else, drop the lock and loop back to try
    2716                 :          * again.
    2717                 :          */
    2718 GIC         830 :         tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
    2719             830 :         if (HeapTupleIsValid(tuple))
    2720 EUB             :         {
    2721 GIC         830 :             Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
    2722                 : 
    2723             830 :             if (strcmp(name, NameStr(dbform->datname)) == 0)
    2724                 :             {
    2725                 :                 Datum       datum;
    2726                 :                 bool        isnull;
    2727                 : 
    2728                 :                 /* oid of the database */
    2729             830 :                 if (dbIdP)
    2730 CBC         830 :                     *dbIdP = dbOid;
    2731                 :                 /* oid of the owner */
    2732 GIC         830 :                 if (ownerIdP)
    2733             804 :                     *ownerIdP = dbform->datdba;
    2734                 :                 /* character encoding */
    2735 CBC         830 :                 if (encodingP)
    2736 GIC         804 :                     *encodingP = dbform->encoding;
    2737                 :                 /* allowed as template? */
    2738 CBC         830 :                 if (dbIsTemplateP)
    2739 GIC         825 :                     *dbIsTemplateP = dbform->datistemplate;
    2740 ECB             :                 /* allowing connections? */
    2741 GIC         830 :                 if (dbAllowConnP)
    2742             804 :                     *dbAllowConnP = dbform->datallowconn;
    2743 ECB             :                 /* limit of frozen XIDs */
    2744 CBC         830 :                 if (dbFrozenXidP)
    2745 GIC         804 :                     *dbFrozenXidP = dbform->datfrozenxid;
    2746                 :                 /* minimum MultiXactId */
    2747 CBC         830 :                 if (dbMinMultiP)
    2748 GIC         804 :                     *dbMinMultiP = dbform->datminmxid;
    2749 ECB             :                 /* default tablespace for this database */
    2750 GIC         830 :                 if (dbTablespace)
    2751             809 :                     *dbTablespace = dbform->dattablespace;
    2752                 :                 /* default locale settings for this database */
    2753             830 :                 if (dbLocProvider)
    2754 CBC         804 :                     *dbLocProvider = dbform->datlocprovider;
    2755             830 :                 if (dbCollate)
    2756                 :                 {
    2757 GNC         804 :                     datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
    2758 GIC         804 :                     *dbCollate = TextDatumGetCString(datum);
    2759                 :                 }
    2760             830 :                 if (dbCtype)
    2761 ECB             :                 {
    2762 GNC         804 :                     datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
    2763 CBC         804 :                     *dbCtype = TextDatumGetCString(datum);
    2764                 :                 }
    2765             830 :                 if (dbIculocale)
    2766                 :                 {
    2767 GIC         804 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticulocale, &isnull);
    2768             804 :                     if (isnull)
    2769              21 :                         *dbIculocale = NULL;
    2770                 :                     else
    2771 CBC         783 :                         *dbIculocale = TextDatumGetCString(datum);
    2772 ECB             :                 }
    2773 GNC         830 :                 if (dbIcurules)
    2774                 :                 {
    2775             804 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
    2776             804 :                     if (isnull)
    2777             804 :                         *dbIcurules = NULL;
    2778                 :                     else
    2779 UNC           0 :                         *dbIcurules = TextDatumGetCString(datum);
    2780                 :                 }
    2781 GIC         830 :                 if (dbCollversion)
    2782 ECB             :                 {
    2783 CBC         804 :                     datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
    2784 GIC         804 :                     if (isnull)
    2785 CBC         416 :                         *dbCollversion = NULL;
    2786 ECB             :                     else
    2787 GIC         388 :                         *dbCollversion = TextDatumGetCString(datum);
    2788 ECB             :                 }
    2789 CBC         830 :                 ReleaseSysCache(tuple);
    2790 GIC         830 :                 result = true;
    2791 CBC         830 :                 break;
    2792 ECB             :             }
    2793                 :             /* can only get here if it was just renamed */
    2794 LBC           0 :             ReleaseSysCache(tuple);
    2795 ECB             :         }
    2796                 : 
    2797 LBC           0 :         if (lockmode != NoLock)
    2798               0 :             UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
    2799                 :     }
    2800 ECB             : 
    2801 CBC         844 :     table_close(relation, AccessShareLock);
    2802                 : 
    2803             844 :     return result;
    2804 ECB             : }
    2805                 : 
    2806                 : /* Check if current user has createdb privileges */
    2807                 : bool
    2808 CBC         834 : have_createdb_privilege(void)
    2809                 : {
    2810             834 :     bool        result = false;
    2811                 :     HeapTuple   utup;
    2812 ECB             : 
    2813                 :     /* Superusers can always do everything */
    2814 GIC         834 :     if (superuser())
    2815 CBC         816 :         return true;
    2816                 : 
    2817              18 :     utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
    2818              18 :     if (HeapTupleIsValid(utup))
    2819 ECB             :     {
    2820 GIC          18 :         result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
    2821 CBC          18 :         ReleaseSysCache(utup);
    2822                 :     }
    2823              18 :     return result;
    2824                 : }
    2825 ECB             : 
    2826                 : /*
    2827                 :  * Remove tablespace directories
    2828                 :  *
    2829 EUB             :  * We don't know what tablespaces db_id is using, so iterate through all
    2830                 :  * tablespaces removing <tablespace>/db_id
    2831 ECB             :  */
    2832                 : static void
    2833 CBC          20 : remove_dbtablespaces(Oid db_id)
    2834 ECB             : {
    2835                 :     Relation    rel;
    2836                 :     TableScanDesc scan;
    2837                 :     HeapTuple   tuple;
    2838 GIC          20 :     List       *ltblspc = NIL;
    2839 ECB             :     ListCell   *cell;
    2840                 :     int         ntblspc;
    2841                 :     int         i;
    2842                 :     Oid        *tablespace_ids;
    2843                 : 
    2844 GBC          20 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
    2845 GIC          20 :     scan = table_beginscan_catalog(rel, 0, NULL);
    2846              81 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    2847 EUB             :     {
    2848 GBC          61 :         Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
    2849 GIC          61 :         Oid         dsttablespace = spcform->oid;
    2850                 :         char       *dstpath;
    2851 ECB             :         struct stat st;
    2852                 : 
    2853                 :         /* Don't mess with the global tablespace */
    2854 GIC          61 :         if (dsttablespace == GLOBALTABLESPACE_OID)
    2855              41 :             continue;
    2856                 : 
    2857              41 :         dstpath = GetDatabasePath(db_id, dsttablespace);
    2858 ECB             : 
    2859 GIC          41 :         if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
    2860 ECB             :         {
    2861                 :             /* Assume we can ignore it */
    2862 GIC          21 :             pfree(dstpath);
    2863              21 :             continue;
    2864 ECB             :         }
    2865                 : 
    2866 GIC          20 :         if (!rmtree(dstpath, true))
    2867 LBC           0 :             ereport(WARNING,
    2868 ECB             :                     (errmsg("some useless files may be left behind in old database directory \"%s\"",
    2869                 :                             dstpath)));
    2870                 : 
    2871 CBC          20 :         ltblspc = lappend_oid(ltblspc, dsttablespace);
    2872 GIC          20 :         pfree(dstpath);
    2873 ECB             :     }
    2874                 : 
    2875 GIC          20 :     ntblspc = list_length(ltblspc);
    2876              20 :     if (ntblspc == 0)
    2877                 :     {
    2878 UIC           0 :         table_endscan(scan);
    2879               0 :         table_close(rel, AccessShareLock);
    2880               0 :         return;
    2881                 :     }
    2882                 : 
    2883 CBC          20 :     tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
    2884 GIC          20 :     i = 0;
    2885              40 :     foreach(cell, ltblspc)
    2886              20 :         tablespace_ids[i++] = lfirst_oid(cell);
    2887                 : 
    2888 ECB             :     /* Record the filesystem change in XLOG */
    2889                 :     {
    2890                 :         xl_dbase_drop_rec xlrec;
    2891                 : 
    2892 GIC          20 :         xlrec.db_id = db_id;
    2893              20 :         xlrec.ntablespaces = ntblspc;
    2894 ECB             : 
    2895 CBC          20 :         XLogBeginInsert();
    2896              20 :         XLogRegisterData((char *) &xlrec, MinSizeOfDbaseDropRec);
    2897 GIC          20 :         XLogRegisterData((char *) tablespace_ids, ntblspc * sizeof(Oid));
    2898 ECB             : 
    2899 CBC          20 :         (void) XLogInsert(RM_DBASE_ID,
    2900                 :                           XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
    2901                 :     }
    2902                 : 
    2903 GIC          20 :     list_free(ltblspc);
    2904 CBC          20 :     pfree(tablespace_ids);
    2905 ECB             : 
    2906 GIC          20 :     table_endscan(scan);
    2907 CBC          20 :     table_close(rel, AccessShareLock);
    2908                 : }
    2909 ECB             : 
    2910                 : /*
    2911                 :  * Check for existing files that conflict with a proposed new DB OID;
    2912                 :  * return true if there are any
    2913                 :  *
    2914                 :  * If there were a subdirectory in any tablespace matching the proposed new
    2915                 :  * OID, we'd get a create failure due to the duplicate name ... and then we'd
    2916                 :  * try to remove that already-existing subdirectory during the cleanup in
    2917 EUB             :  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
    2918                 :  * instead we make this extra check before settling on the OID of the new
    2919                 :  * database.  This exactly parallels what GetNewRelFileNumber() does for table
    2920                 :  * relfilenumber values.
    2921 ECB             :  */
    2922                 : static bool
    2923 GIC         797 : check_db_file_conflict(Oid db_id)
    2924                 : {
    2925 CBC         797 :     bool        result = false;
    2926 ECB             :     Relation    rel;
    2927                 :     TableScanDesc scan;
    2928 EUB             :     HeapTuple   tuple;
    2929                 : 
    2930 GBC         797 :     rel = table_open(TableSpaceRelationId, AccessShareLock);
    2931 GIC         797 :     scan = table_beginscan_catalog(rel, 0, NULL);
    2932            2434 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    2933 ECB             :     {
    2934 CBC        1637 :         Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
    2935            1637 :         Oid         dsttablespace = spcform->oid;
    2936 ECB             :         char       *dstpath;
    2937                 :         struct stat st;
    2938                 : 
    2939                 :         /* Don't mess with the global tablespace */
    2940 GIC        1637 :         if (dsttablespace == GLOBALTABLESPACE_OID)
    2941             797 :             continue;
    2942 ECB             : 
    2943 CBC         840 :         dstpath = GetDatabasePath(db_id, dsttablespace);
    2944                 : 
    2945             840 :         if (lstat(dstpath, &st) == 0)
    2946 ECB             :         {
    2947                 :             /* Found a conflicting file (or directory, whatever) */
    2948 UIC           0 :             pfree(dstpath);
    2949 LBC           0 :             result = true;
    2950 UIC           0 :             break;
    2951                 :         }
    2952                 : 
    2953 CBC         840 :         pfree(dstpath);
    2954 ECB             :     }
    2955                 : 
    2956 CBC         797 :     table_endscan(scan);
    2957             797 :     table_close(rel, AccessShareLock);
    2958                 : 
    2959 GIC         797 :     return result;
    2960                 : }
    2961                 : 
    2962                 : /*
    2963                 :  * Issue a suitable errdetail message for a busy database
    2964                 :  */
    2965                 : static int
    2966 UIC           0 : errdetail_busy_db(int notherbackends, int npreparedxacts)
    2967                 : {
    2968               0 :     if (notherbackends > 0 && npreparedxacts > 0)
    2969                 : 
    2970                 :         /*
    2971                 :          * We don't deal with singular versus plural here, since gettext
    2972                 :          * doesn't support multiple plurals in one string.
    2973 ECB             :          */
    2974 UIC           0 :         errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
    2975 ECB             :                   notherbackends, npreparedxacts);
    2976 UIC           0 :     else if (notherbackends > 0)
    2977               0 :         errdetail_plural("There is %d other session using the database.",
    2978                 :                          "There are %d other sessions using the database.",
    2979                 :                          notherbackends,
    2980 ECB             :                          notherbackends);
    2981                 :     else
    2982 LBC           0 :         errdetail_plural("There is %d prepared transaction using the database.",
    2983                 :                          "There are %d prepared transactions using the database.",
    2984 ECB             :                          npreparedxacts,
    2985                 :                          npreparedxacts);
    2986 UIC           0 :     return 0;                   /* just to keep ereport macro happy */
    2987                 : }
    2988                 : 
    2989                 : /*
    2990 ECB             :  * get_database_oid - given a database name, look up its OID in pg_database
    2991                 :  *
    2992                 :  * If no row matches: with missing_ok false an ERRCODE_UNDEFINED_DATABASE error
    2993                 :  * is raised; with missing_ok true InvalidOid is returned instead.
    2994                 :  */
    2995                 : Oid
    2996 GIC        3188 : get_database_oid(const char *dbname, bool missing_ok)
    2997                 : {
    2998 EUB             :     Relation    pg_database;
    2999                 :     ScanKeyData entry[1];
    3000                 :     SysScanDesc scan;
    3001                 :     HeapTuple   dbtuple;
    3002                 :     Oid         oid;
    3003 ECB             : 
    3004                 :     /*
    3005                 :      * There's no syscache for pg_database indexed by name, so we must look
    3006                 :      * the hard way: a scan through DatabaseNameIndexId keyed on datname.
    3007                 :      */
    3008 GIC        3188 :     pg_database = table_open(DatabaseRelationId, AccessShareLock);
    3009 CBC        3188 :     ScanKeyInit(&entry[0],
    3010                 :                 Anum_pg_database_datname,
    3011                 :                 BTEqualStrategyNumber, F_NAMEEQ,
    3012                 :                 CStringGetDatum(dbname));
    3013 GIC        3188 :     scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
    3014                 :                               NULL, 1, entry);
    3015                 : 
    3016 GBC        3188 :     dbtuple = systable_getnext(scan);
    3017                 : 
    3018 EUB             :     /* Only the first match is fetched; we assume there can be at most one */
    3019 GIC        3188 :     if (HeapTupleIsValid(dbtuple))
    3020            2388 :         oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
    3021                 :     else
    3022             800 :         oid = InvalidOid;
    3023                 : 
    3024 GBC        3188 :     systable_endscan(scan);
    3025 GIC        3188 :     table_close(pg_database, AccessShareLock);
    3026 EUB             :     /* The scan and the relation are already closed when the error below is raised */
    3027 GBC        3188 :     if (!OidIsValid(oid) && !missing_ok)
    3028 GIC           3 :         ereport(ERROR,
    3029                 :                 (errcode(ERRCODE_UNDEFINED_DATABASE),
    3030                 :                  errmsg("database \"%s\" does not exist",
    3031                 :                         dbname)));
    3032 EUB             : 
    3033 GIC        3185 :     return oid;
    3034                 : }
    3035                 : 
    3036 EUB             : 
    3037                 : /*
    3038                 :  * get_database_name - given a database OID, look up the name
    3039                 :  *
    3040                 :  * Returns a palloc'd (pstrdup) copy of datname, or NULL if no such database.
    3041                 :  */
    3042                 : char *
    3043 GIC       41068 : get_database_name(Oid dbid)
    3044                 : {
    3045                 :     HeapTuple   dbtuple;
    3046 ECB             :     char       *result;
    3047                 :     /* DATABASEOID syscache lookup; the name is copied before the entry is released */
    3048 GIC       41068 :     dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
    3049           41068 :     if (HeapTupleIsValid(dbtuple))
    3050                 :     {
    3051           40444 :         result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
    3052           40444 :         ReleaseSysCache(dbtuple);
    3053                 :     }
    3054                 :     else
    3055             624 :         result = NULL;
    3056                 : 
    3057           41068 :     return result;
    3058 ECB             : }
    3059                 : 
    3060                 : /*
    3061                 :  * recovery_create_dbdir()
    3062                 :  *
    3063                 :  * During recovery, there's a case where we validly need to recover a missing
    3064                 :  * tablespace directory so that recovery can continue.  This happens when
    3065                 :  * recovery wants to create a database but the holding tablespace has been
    3066                 :  * removed before the server stopped.  Since we expect that the directory will
    3067                 :  * be gone before reaching recovery consistency, and we have no knowledge about
    3068                 :  * the tablespace other than its OID here, we create a real directory under
    3069                 :  * pg_tblspc here instead of restoring the symlink.
    3070                 :  *
    3071                 :  * If only_tblspc is true, then the requested directory must be in pg_tblspc/.
    3072                 :  */
    3073                 : static void
    3074 CBC          18 : recovery_create_dbdir(char *path, bool only_tblspc)
    3075 ECB             : {
    3076                 :     struct stat st;
    3077                 : 
    3078 CBC          18 :     Assert(RecoveryInProgress());
    3079                 :     /* Nothing to do if something already exists at that path */
    3080 GIC          18 :     if (stat(path, &st) == 0)
    3081              18 :         return;
    3082                 :     /* With only_tblspc set, the path must contain "pg_tblspc/" somewhere */
    3083 LBC           0 :     if (only_tblspc && strstr(path, "pg_tblspc/") == NULL)
    3084 UIC           0 :         elog(PANIC, "requested to created invalid directory: %s", path);
    3085                 :     /* After consistency, a missing directory is fatal unless allow_in_place_tablespaces */
    3086               0 :     if (reachedConsistency && !allow_in_place_tablespaces)
    3087               0 :         ereport(PANIC,
    3088                 :                 errmsg("missing directory \"%s\"", path));
    3089                 :     /* Otherwise recreate it: logged at WARNING if consistency was reached, else DEBUG1 */
    3090               0 :     elog(reachedConsistency ? WARNING : DEBUG1,
    3091                 :          "creating missing directory: %s", path);
    3092                 : 
    3093 LBC           0 :     if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
    3094 UIC           0 :         ereport(PANIC,
    3095                 :                 errmsg("could not create missing directory \"%s\": %m", path));
    3096                 : }
    3097                 : 
    3098 ECB             : 
    3099                 : /*
    3100                 :  * DATABASE resource manager's routines: replay of database create/drop records
    3101                 :  */
    3102                 : void
    3103 GIC          30 : dbase_redo(XLogReaderState *record)
    3104                 : {
    3105 CBC          30 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    3106                 : 
    3107 ECB             :     /* Backup blocks are not used in dbase records */
    3108 GIC          30 :     Assert(!XLogRecHasAnyBlockRefs(record));
    3109                 :     /* Dispatch on the record type; anything unrecognized is a PANIC (see the final else) */
    3110              30 :     if (info == XLOG_DBASE_CREATE_FILE_COPY)
    3111                 :     {
    3112               3 :         xl_dbase_create_file_copy_rec *xlrec =
    3113               3 :         (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
    3114                 :         char       *src_path;
    3115                 :         char       *dst_path;
    3116                 :         char       *parent_path;
    3117                 :         struct stat st;
    3118                 :         /* Resolve the source and target database directories named by the record */
    3119               3 :         src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
    3120               3 :         dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
    3121                 : 
    3122                 :         /*
    3123                 :          * Our theory for replaying a CREATE is to forcibly drop the target
    3124 ECB             :          * subdirectory if present, then re-copy the source data. This may be
    3125                 :          * more work than needed, but it is simple to implement.
    3126                 :          */
    3127 GIC           3 :         if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
    3128 ECB             :         {
    3129 UIC           0 :             if (!rmtree(dst_path, true))
    3130 ECB             :                 /* If this failed, copydir() below is going to error. */
    3131 LBC           0 :                 ereport(WARNING,
    3132                 :                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
    3133 EUB             :                                 dst_path)));
    3134                 :         }
    3135                 : 
    3136                 :         /*
    3137                 :          * If the parent of the target path doesn't exist, create it now. This
    3138                 :          * enables us to create the target underneath later.
    3139                 :          */
    3140 GIC           3 :         parent_path = pstrdup(dst_path);
    3141 GBC           3 :         get_parent_directory(parent_path);
    3142               3 :         if (stat(parent_path, &st) < 0)
    3143                 :         {
    3144 UIC           0 :             if (errno != ENOENT)
    3145               0 :                 ereport(FATAL,
    3146                 :                         errmsg("could not stat directory \"%s\": %m",
    3147                 :                                dst_path));
    3148                 :             /* NOTE(review): the message above prints dst_path, but the failed stat() was on parent_path */
    3149                 :             /* create the parent directory if needed and valid */
    3150               0 :             recovery_create_dbdir(parent_path, true);
    3151                 :         }
    3152 CBC           3 :         pfree(parent_path);
    3153                 : 
    3154 ECB             :         /*
    3155                 :          * There's a case where the copy source directory is missing for the
    3156                 :          * same reason above.  Create the empty source directory so that
    3157                 :          * copydir below doesn't fail.  The directory will be dropped soon by
    3158                 :          * recovery.
    3159                 :          */
    3160 GIC           3 :         if (stat(src_path, &st) < 0 && errno == ENOENT)
    3161 LBC           0 :             recovery_create_dbdir(src_path, false);
    3162 ECB             : 
    3163                 :         /*
    3164                 :          * Force dirty buffers out to disk, to ensure source database is
    3165                 :          * up-to-date for the copy.
    3166                 :          */
    3167 GIC           3 :         FlushDatabaseBuffers(xlrec->src_db_id);
    3168 ECB             : 
    3169                 :         /* Close all smgr fds in all backends. */
    3170 GIC           3 :         WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    3171                 : 
    3172                 :         /*
    3173                 :          * Copy the source database directory to the new location.
    3174                 :          *
    3175                 :          * We don't need to copy subdirectories (presumably what the final "false" requests).
    3176 ECB             :          */
    3177 GIC           3 :         copydir(src_path, dst_path, false);
    3178 EUB             : 
    3179 GIC           3 :         pfree(src_path);
    3180 GBC           3 :         pfree(dst_path);
    3181                 :     }
    3182 GIC          27 :     else if (info == XLOG_DBASE_CREATE_WAL_LOG)
    3183                 :     {
    3184              18 :         xl_dbase_create_wal_log_rec *xlrec =
    3185              18 :         (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
    3186                 :         char       *dbpath;
    3187                 :         char       *parent_path;
    3188                 : 
    3189 CBC          18 :         dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
    3190 ECB             : 
    3191                 :         /* create the parent directory if needed and valid */
    3192 GIC          18 :         parent_path = pstrdup(dbpath);
    3193 GBC          18 :         get_parent_directory(parent_path);
    3194              18 :         recovery_create_dbdir(parent_path, true);
    3195                 :         /* NOTE(review): parent_path is not freed in this branch, unlike the file-copy branch */
    3196                 :         /* Create the database directory with the version file. */
    3197 GIC          18 :         CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
    3198                 :                                 true);
    3199 GBC          18 :         pfree(dbpath);
    3200                 :     }
    3201 CBC           9 :     else if (info == XLOG_DBASE_DROP)
    3202                 :     {
    3203 GIC           9 :         xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
    3204                 :         char       *dst_path;
    3205                 :         int         i;
    3206                 : 
    3207               9 :         if (InHotStandby)
    3208                 :         {
    3209 ECB             :             /*
    3210 EUB             :              * Lock database while we resolve conflicts to ensure that
    3211                 :              * InitPostgres() cannot fully re-execute concurrently. This
    3212                 :              * avoids backends re-connecting automatically to same database,
    3213                 :              * which can happen in some cases.
    3214                 :              *
    3215                 :              * This will lock out walsenders trying to connect to db-specific
    3216 ECB             :              * slots for logical decoding too, so it's safe for us to drop
    3217                 :              * slots.
    3218                 :              */
    3219 CBC           9 :             LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
    3220 GIC           9 :             ResolveRecoveryConflictWithDatabase(xlrec->db_id);
    3221                 :         }
    3222                 : 
    3223                 :         /* Drop any database-specific replication slots */
    3224               9 :         ReplicationSlotsDropDBSlots(xlrec->db_id);
    3225                 : 
    3226 ECB             :         /* Drop pages for this database that are in the shared buffer cache */
    3227 GIC           9 :         DropDatabaseBuffers(xlrec->db_id);
    3228 ECB             : 
    3229                 :         /* Also, clean out any fsync requests that might be pending in md.c */
    3230 GIC           9 :         ForgetDatabaseSyncRequests(xlrec->db_id);
    3231 ECB             : 
    3232                 :         /* Clean out the xlog relcache too */
    3233 CBC           9 :         XLogDropDatabase(xlrec->db_id);
    3234 ECB             : 
    3235                 :         /* Close all smgr fds in all backends. */
    3236 GIC           9 :         WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
    3237                 :         /* Remove the database directory in every tablespace listed in the record */
    3238 CBC          18 :         for (i = 0; i < xlrec->ntablespaces; i++)
    3239                 :         {
    3240 GIC           9 :             dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
    3241 ECB             : 
    3242                 :             /* And remove the physical files */
    3243 CBC           9 :             if (!rmtree(dst_path, true))
    3244 UIC           0 :                 ereport(WARNING,
    3245                 :                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
    3246 ECB             :                                 dst_path)));
    3247 GIC           9 :             pfree(dst_path);
    3248 ECB             :         }
    3249                 : 
    3250 CBC           9 :         if (InHotStandby)
    3251                 :         {
    3252 ECB             :             /*
    3253                 :              * Release locks prior to commit. XXX There is a race condition
    3254                 :              * here that may allow backends to reconnect, but the window for
    3255                 :              * this is small because the gap between here and commit is mostly
    3256                 :              * fairly small and it is unlikely that people will be dropping
    3257                 :              * databases that we are trying to connect to anyway.
    3258                 :              */
    3259 GIC           9 :             UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
    3260                 :         }
    3261                 :     }
    3262                 :     else
    3263 UIC           0 :         elog(PANIC, "dbase_redo: unknown op code %u", info);
    3264 GIC          30 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a