Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * dbcommands.c
4 : * Database management commands (create/drop database).
5 : *
6 : * Note: database creation/destruction commands use exclusive locks on
7 : * the database objects (as expressed by LockSharedObject()) to avoid
8 : * stepping on each others' toes. Formerly we used table-level locks
9 : * on pg_database, but that's too coarse-grained.
10 : *
11 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : *
15 : * IDENTIFICATION
16 : * src/backend/commands/dbcommands.c
17 : *
18 : *-------------------------------------------------------------------------
19 : */
20 : #include "postgres.h"
21 :
22 : #include <fcntl.h>
23 : #include <unistd.h>
24 : #include <sys/stat.h>
25 :
26 : #include "access/genam.h"
27 : #include "access/heapam.h"
28 : #include "access/htup_details.h"
29 : #include "access/multixact.h"
30 : #include "access/tableam.h"
31 : #include "access/xact.h"
32 : #include "access/xloginsert.h"
33 : #include "access/xlogrecovery.h"
34 : #include "access/xlogutils.h"
35 : #include "catalog/catalog.h"
36 : #include "catalog/dependency.h"
37 : #include "catalog/indexing.h"
38 : #include "catalog/objectaccess.h"
39 : #include "catalog/pg_authid.h"
40 : #include "catalog/pg_collation.h"
41 : #include "catalog/pg_database.h"
42 : #include "catalog/pg_db_role_setting.h"
43 : #include "catalog/pg_subscription.h"
44 : #include "catalog/pg_tablespace.h"
45 : #include "commands/comment.h"
46 : #include "commands/dbcommands.h"
47 : #include "commands/dbcommands_xlog.h"
48 : #include "commands/defrem.h"
49 : #include "commands/seclabel.h"
50 : #include "commands/tablespace.h"
51 : #include "common/file_perm.h"
52 : #include "mb/pg_wchar.h"
53 : #include "miscadmin.h"
54 : #include "pgstat.h"
55 : #include "postmaster/bgwriter.h"
56 : #include "replication/slot.h"
57 : #include "storage/copydir.h"
58 : #include "storage/fd.h"
59 : #include "storage/ipc.h"
60 : #include "storage/lmgr.h"
61 : #include "storage/md.h"
62 : #include "storage/procarray.h"
63 : #include "storage/smgr.h"
64 : #include "utils/acl.h"
65 : #include "utils/builtins.h"
66 : #include "utils/fmgroids.h"
67 : #include "utils/guc.h"
68 : #include "utils/pg_locale.h"
69 : #include "utils/relmapper.h"
70 : #include "utils/snapmgr.h"
71 : #include "utils/syscache.h"
72 :
73 : /*
74 : * Create database strategy.
75 : *
76 : * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
77 : * copied block.
78 : *
79 : * CREATEDB_FILE_COPY will simply perform a file system level copy of the
80 : * database and log a single record for each tablespace copied. To make this
81 : * safe, it also triggers checkpoints before and after the operation.
82 : */
83 : typedef enum CreateDBStrategy
84 : {
85 : CREATEDB_WAL_LOG, /* copy block by block, WAL-logging every copied block */
86 : CREATEDB_FILE_COPY /* file-system level copy, one WAL record per tablespace */
87 : } CreateDBStrategy;
88 :
/*
 * Bundle of identifiers describing an in-progress database creation.
 * NOTE(review): presumably handed to createdb_failure_callback() as its
 * Datum argument; the code that does so is outside this excerpt -- confirm.
 */
89 : typedef struct
90 : {
91 : Oid src_dboid; /* source (template) DB */
92 : Oid dest_dboid; /* DB we are trying to create */
93 : CreateDBStrategy strategy; /* create db strategy */
94 : } createdb_failure_params;
95 :
/*
 * Bundle of identifiers describing an in-progress database move.
 * NOTE(review): presumably handed to movedb_failure_callback() as its
 * Datum argument; the code that does so is outside this excerpt -- confirm.
 */
96 : typedef struct
97 : {
98 : Oid dest_dboid; /* DB we are trying to move */
99 : Oid dest_tsoid; /* tablespace we are trying to move to */
100 : } movedb_failure_params;
101 :
102 : /*
103 : * Information about a relation to be copied when creating a database.
104 : */
105 : typedef struct CreateDBRelInfo
106 : {
107 : RelFileLocator rlocator; /* physical relation identifier */
108 : Oid reloid; /* relation oid */
109 : bool permanent; /* true if relation is permanent, false if unlogged (temp relations are never listed) */
110 : } CreateDBRelInfo;
111 :
112 :
113 : /* non-export function prototypes */
/* Helpers whose definitions lie outside this excerpt. */
114 : static void createdb_failure_callback(int code, Datum arg);
115 : static void movedb(const char *dbname, const char *tblspcname);
116 : static void movedb_failure_callback(int code, Datum arg);
117 : static bool get_db_info(const char *name, LOCKMODE lockmode,
118 : Oid *dbIdP, Oid *ownerIdP,
119 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
120 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
121 : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbIculocale,
122 : char **dbIcurules,
123 : char *dbLocProvider,
124 : char **dbCollversion);
125 : static void remove_dbtablespaces(Oid db_id);
126 : static bool check_db_file_conflict(Oid db_id);
127 : static int errdetail_busy_db(int notherbackends, int npreparedxacts);
/* WAL_LOG strategy: driver, pg_class scanners, and directory/PG_VERSION setup. */
128 : static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
129 : Oid dst_tsid);
130 : static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
131 : static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
132 : Oid dbid, char *srcpath,
133 : List *rlocatorlist, Snapshot snapshot);
134 : static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
135 : Oid tbid, Oid dbid,
136 : char *srcpath);
137 : static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
138 : bool isRedo);
/* FILE_COPY strategy driver. */
139 : static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
140 : Oid src_tsid, Oid dst_tsid);
/* Defined outside this excerpt. */
141 : static void recovery_create_dbdir(char *path, bool only_tblspc);
142 :
143 : /*
144 : * Create a new database using the WAL_LOG strategy.
145 : *
146 : * Each copied block is separately written to the write-ahead log.
147 : */
148 : static void
376 rhaas 149 GIC 184 : CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
376 rhaas 150 ECB : Oid src_tsid, Oid dst_tsid)
151 : {
152 : char *srcpath;
153 : char *dstpath;
277 rhaas 154 GNC 184 : List *rlocatorlist = NULL;
376 rhaas 155 ECB : ListCell *cell;
156 : LockRelId srcrelid;
157 : LockRelId dstrelid;
158 : RelFileLocator srcrlocator;
159 : RelFileLocator dstrlocator;
160 : CreateDBRelInfo *relinfo;
161 :
162 : /* Get source and destination database paths. */
376 rhaas 163 GIC 184 : srcpath = GetDatabasePath(src_dboid, src_tsid);
376 rhaas 164 CBC 184 : dstpath = GetDatabasePath(dst_dboid, dst_tsid);
376 rhaas 165 ECB :
166 : /* Create database directory and write PG_VERSION file. */
376 rhaas 167 GIC 184 : CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
376 rhaas 168 ECB :
169 : /* Copy relmap file from source database to the destination database. */
376 rhaas 170 GIC 184 : RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
376 rhaas 171 ECB :
172 : /* Get list of relfilelocators to copy from the source database. */
277 rhaas 173 GNC 184 : rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
174 184 : Assert(rlocatorlist != NIL);
376 rhaas 175 ECB :
176 : /*
177 : * Database IDs will be the same for all relations so set them before
178 : * entering the loop.
179 : */
376 rhaas 180 GIC 184 : srcrelid.dbId = src_dboid;
376 rhaas 181 CBC 184 : dstrelid.dbId = dst_dboid;
376 rhaas 182 ECB :
183 : /* Loop over our list of relfilelocators and copy each one. */
277 rhaas 184 GNC 41045 : foreach(cell, rlocatorlist)
376 rhaas 185 ECB : {
376 rhaas 186 GIC 40861 : relinfo = lfirst(cell);
277 rhaas 187 GNC 40861 : srcrlocator = relinfo->rlocator;
376 rhaas 188 ECB :
189 : /*
190 : * If the relation is from the source db's default tablespace then we
191 : * need to create it in the destination db's default tablespace.
192 : * Otherwise, we need to create in the same tablespace as it is in the
193 : * source database.
194 : */
277 rhaas 195 GNC 40861 : if (srcrlocator.spcOid == src_tsid)
196 40861 : dstrlocator.spcOid = dst_tsid;
376 rhaas 197 ECB : else
277 rhaas 198 UNC 0 : dstrlocator.spcOid = srcrlocator.spcOid;
376 rhaas 199 EUB :
277 rhaas 200 GNC 40861 : dstrlocator.dbOid = dst_dboid;
201 40861 : dstrlocator.relNumber = srcrlocator.relNumber;
376 rhaas 202 ECB :
203 : /*
204 : * Acquire locks on source and target relations before copying.
205 : *
206 : * We typically do not read relation data into shared_buffers without
207 : * holding a relation lock. It's unclear what could go wrong if we
208 : * skipped it in this case, because nobody can be modifying either the
209 : * source or destination database at this point, and we have locks on
210 : * both databases, too, but let's take the conservative route.
211 : */
376 rhaas 212 GIC 40861 : dstrelid.relId = srcrelid.relId = relinfo->reloid;
376 rhaas 213 CBC 40861 : LockRelationId(&srcrelid, AccessShareLock);
214 40861 : LockRelationId(&dstrelid, AccessShareLock);
376 rhaas 215 ECB :
216 : /* Copy relation storage from source to the destination. */
277 rhaas 217 GNC 40861 : CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
376 rhaas 218 ECB :
219 : /* Release the relation locks. */
376 rhaas 220 GIC 40861 : UnlockRelationId(&srcrelid, AccessShareLock);
376 rhaas 221 CBC 40861 : UnlockRelationId(&dstrelid, AccessShareLock);
376 rhaas 222 ECB : }
223 :
349 alvherre 224 GIC 184 : pfree(srcpath);
349 alvherre 225 CBC 184 : pfree(dstpath);
277 rhaas 226 GNC 184 : list_free_deep(rlocatorlist);
376 rhaas 227 CBC 184 : }
376 rhaas 228 ECB :
229 : /*
230 : * Scan the pg_class table in the source database to identify the relations
231 : * that need to be copied to the destination database.
232 : *
233 : * This is an exception to the usual rule that cross-database access is
234 : * not possible. We can make it work here because we know that there are no
235 : * connections to the source database and (since there can't be prepared
236 : * transactions touching that database) no in-doubt tuples either. This
237 : * means that we don't need to worry about pruning removing anything from
238 : * under us, and we don't need to be too picky about our snapshot either.
239 : * As long as it sees all previously-committed XIDs as committed and all
240 : * aborted XIDs as aborted, we should be fine: nothing else is possible
241 : * here.
242 : *
243 : * We can't rely on the relcache for anything here, because that only knows
244 : * about the database to which we are connected, and can't handle access to
245 : * other databases. That also means we can't rely on the heap scan
246 : * infrastructure, which would be a bad idea anyway since it might try
247 : * to do things like HOT pruning which we definitely can't do safely in
248 : * a database to which we're not even connected.
249 : */
250 : static List *
376 rhaas 251 GIC 184 : ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
376 rhaas 252 ECB : {
253 : RelFileLocator rlocator;
254 : BlockNumber nblocks;
255 : BlockNumber blkno;
256 : Buffer buf;
257 : RelFileNumber relfilenumber;
258 : Page page;
277 rhaas 259 GNC 184 : List *rlocatorlist = NIL;
376 rhaas 260 ECB : LockRelId relid;
261 : Snapshot snapshot;
262 : SMgrRelation smgr;
263 : BufferAccessStrategy bstrategy;
264 :
265 : /* Get pg_class relfilenumber. */
277 rhaas 266 GNC 184 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
267 : RelationRelationId);
268 :
269 : /* Don't read data into shared_buffers without holding a relation lock. */
376 rhaas 270 GIC 184 : relid.dbId = dbid;
376 rhaas 271 CBC 184 : relid.relId = RelationRelationId;
272 184 : LockRelationId(&relid, AccessShareLock);
376 rhaas 273 ECB :
274 : /* Prepare a RelFileLocator for the pg_class relation. */
277 rhaas 275 GNC 184 : rlocator.spcOid = tbid;
276 184 : rlocator.dbOid = dbid;
277 184 : rlocator.relNumber = relfilenumber;
376 rhaas 278 ECB :
240 rhaas 279 GNC 184 : smgr = smgropen(rlocator, InvalidBackendId);
240 rhaas 280 CBC 184 : nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
281 184 : smgrclose(smgr);
376 rhaas 282 ECB :
283 : /* Use a buffer access strategy since this is a bulk read operation. */
376 rhaas 284 GIC 184 : bstrategy = GetAccessStrategy(BAS_BULKREAD);
376 rhaas 285 ECB :
286 : /*
287 : * As explained in the function header comments, we need a snapshot that
288 : * will see all committed transactions as committed, and our transaction
289 : * snapshot - or the active snapshot - might not be new enough for that,
290 : * but the return value of GetLatestSnapshot() should work fine.
291 : */
376 rhaas 292 GIC 184 : snapshot = GetLatestSnapshot();
376 rhaas 293 ECB :
294 : /* Process the relation block by block. */
376 rhaas 295 GIC 2772 : for (blkno = 0; blkno < nblocks; blkno++)
376 rhaas 296 ECB : {
376 rhaas 297 GIC 2588 : CHECK_FOR_INTERRUPTS();
376 rhaas 298 ECB :
277 rhaas 299 GNC 2588 : buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
46 michael 300 ECB : RBM_NORMAL, bstrategy, true);
301 :
376 rhaas 302 GIC 2588 : LockBuffer(buf, BUFFER_LOCK_SHARE);
376 rhaas 303 CBC 2588 : page = BufferGetPage(buf);
304 2588 : if (PageIsNew(page) || PageIsEmpty(page))
376 rhaas 305 ECB : {
376 rhaas 306 UIC 0 : UnlockReleaseBuffer(buf);
376 rhaas 307 UBC 0 : continue;
376 rhaas 308 EUB : }
309 :
310 : /* Append relevant pg_class tuples for current page to rlocatorlist. */
277 rhaas 311 GNC 2588 : rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
312 : srcpath, rlocatorlist,
313 : snapshot);
314 :
376 rhaas 315 GIC 2588 : UnlockReleaseBuffer(buf);
376 rhaas 316 ECB : }
317 :
318 : /* Release relation lock. */
376 rhaas 319 GIC 184 : UnlockRelationId(&relid, AccessShareLock);
376 rhaas 320 ECB :
277 rhaas 321 GNC 184 : return rlocatorlist;
376 rhaas 322 ECB : }
323 :
324 : /*
325 : * Scan one page of the source database's pg_class relation and add relevant
326 : * entries to rlocatorlist. The return value is the updated list.
327 : */
328 : static List *
376 rhaas 329 GIC 2588 : ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
330 : char *srcpath, List *rlocatorlist,
331 : Snapshot snapshot)
332 : {
332 tgl 333 2588 : BlockNumber blkno = BufferGetBlockNumber(buf);
332 tgl 334 ECB : OffsetNumber offnum;
335 : OffsetNumber maxoff;
336 : HeapTupleData tuple;
337 :
376 rhaas 338 GIC 2588 : maxoff = PageGetMaxOffsetNumber(page);
376 rhaas 339 ECB :
340 : /* Loop over offsets. */
376 rhaas 341 GIC 2588 : for (offnum = FirstOffsetNumber;
376 rhaas 342 CBC 131298 : offnum <= maxoff;
343 128710 : offnum = OffsetNumberNext(offnum))
376 rhaas 344 ECB : {
345 : ItemId itemid;
346 :
376 rhaas 347 GIC 128710 : itemid = PageGetItemId(page, offnum);
376 rhaas 348 ECB :
349 : /* Nothing to do if slot is empty or already dead. */
376 rhaas 350 GIC 128710 : if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
376 rhaas 351 CBC 92258 : ItemIdIsRedirected(itemid))
352 52052 : continue;
376 rhaas 353 ECB :
376 rhaas 354 GIC 76658 : Assert(ItemIdIsNormal(itemid));
376 rhaas 355 CBC 76658 : ItemPointerSet(&(tuple.t_self), blkno, offnum);
376 rhaas 356 ECB :
357 : /* Initialize a HeapTupleData structure. */
376 rhaas 358 GIC 76658 : tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
376 rhaas 359 CBC 76658 : tuple.t_len = ItemIdGetLength(itemid);
360 76658 : tuple.t_tableOid = RelationRelationId;
376 rhaas 361 ECB :
362 : /* Skip tuples that are not visible to this snapshot. */
376 rhaas 363 GIC 76658 : if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
376 rhaas 364 ECB : {
365 : CreateDBRelInfo *relinfo;
366 :
367 : /*
368 : * ScanSourceDatabasePgClassTuple is in charge of constructing a
369 : * CreateDBRelInfo object for this tuple, but can also decide that
370 : * this tuple isn't something we need to copy. If we do need to
371 : * copy the relation, add it to the list.
372 : */
376 rhaas 373 GIC 76005 : relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
376 rhaas 374 ECB : srcpath);
376 rhaas 375 GIC 76005 : if (relinfo != NULL)
277 rhaas 376 GNC 40861 : rlocatorlist = lappend(rlocatorlist, relinfo);
376 rhaas 377 ECB : }
378 : }
379 :
277 rhaas 380 GNC 2588 : return rlocatorlist;
376 rhaas 381 ECB : }
382 :
383 : /*
384 : * Decide whether a certain pg_class tuple represents something that
385 : * needs to be copied from the source database to the destination database,
386 : * and if so, construct a CreateDBRelInfo for it.
387 : *
388 : * Visibility checks are handled by the caller, so our job here is just
389 : * to assess the data stored in the tuple.
390 : */
391 : CreateDBRelInfo *
376 rhaas 392 GIC 76005 : ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
376 rhaas 393 ECB : char *srcpath)
394 : {
395 : CreateDBRelInfo *relinfo;
396 : Form_pg_class classForm;
255 rhaas 397 GNC 76005 : RelFileNumber relfilenumber = InvalidRelFileNumber;
376 rhaas 398 ECB :
376 rhaas 399 GIC 76005 : classForm = (Form_pg_class) GETSTRUCT(tuple);
376 rhaas 400 ECB :
401 : /*
402 : * Return NULL if this object does not need to be copied.
403 : *
404 : * Shared objects don't need to be copied, because they are shared.
405 : * Objects without storage can't be copied, because there's nothing to
406 : * copy. Temporary relations don't need to be copied either, because they
407 : * are inaccessible outside of the session that created them, which must
408 : * be gone already, and couldn't connect to a different database if it
409 : * still existed. autovacuum will eventually remove the pg_class entries
410 : * as well.
411 : */
376 rhaas 412 GIC 76005 : if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
376 rhaas 413 CBC 66805 : !RELKIND_HAS_STORAGE(classForm->relkind) ||
414 40861 : classForm->relpersistence == RELPERSISTENCE_TEMP)
415 35144 : return NULL;
376 rhaas 416 ECB :
417 : /*
418 : * If relfilenumber is valid then directly use it. Otherwise, consult the
419 : * relmap.
420 : */
277 rhaas 421 GNC 40861 : if (RelFileNumberIsValid(classForm->relfilenode))
422 37733 : relfilenumber = classForm->relfilenode;
376 rhaas 423 ECB : else
277 rhaas 424 GNC 3128 : relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
425 : classForm->oid);
426 :
427 : /* We must have a valid relfilenumber. */
428 40861 : if (!RelFileNumberIsValid(relfilenumber))
277 rhaas 429 UNC 0 : elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
376 rhaas 430 EUB : classForm->oid);
431 :
432 : /* Prepare a rel info element and add it to the list. */
376 rhaas 433 GIC 40861 : relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
376 rhaas 434 CBC 40861 : if (OidIsValid(classForm->reltablespace))
277 rhaas 435 UNC 0 : relinfo->rlocator.spcOid = classForm->reltablespace;
376 rhaas 436 EUB : else
277 rhaas 437 GNC 40861 : relinfo->rlocator.spcOid = tbid;
376 rhaas 438 ECB :
277 rhaas 439 GNC 40861 : relinfo->rlocator.dbOid = dbid;
440 40861 : relinfo->rlocator.relNumber = relfilenumber;
376 rhaas 441 CBC 40861 : relinfo->reloid = classForm->oid;
376 rhaas 442 ECB :
443 : /* Temporary relations were rejected above. */
376 rhaas 444 GIC 40861 : Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
376 rhaas 445 CBC 40861 : relinfo->permanent =
446 40861 : (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
376 rhaas 447 ECB :
376 rhaas 448 GIC 40861 : return relinfo;
376 rhaas 449 ECB : }
450 :
451 : /*
452 : * Create database directory and write out the PG_VERSION file in the database
453 : * path. If isRedo is true, it's okay for the database directory to exist
454 : * already.
455 : */
456 : static void
376 rhaas 457 GIC 202 : CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
376 rhaas 458 ECB : {
459 : int fd;
460 : int nbytes;
461 : char versionfile[MAXPGPATH];
462 : char buf[16];
463 :
464 : /*
465 : * Prepare version data before starting a critical section.
466 : *
467 : * Note that we don't have to copy this from the source database; there's
468 : * only one legal value.
469 : */
376 rhaas 470 GIC 202 : sprintf(buf, "%s\n", PG_MAJORVERSION);
376 rhaas 471 CBC 202 : nbytes = strlen(PG_MAJORVERSION) + 1;
376 rhaas 472 ECB :
473 : /* If we are not in WAL replay then write the WAL. */
376 rhaas 474 GIC 202 : if (!isRedo)
376 rhaas 475 ECB : {
476 : xl_dbase_create_wal_log_rec xlrec;
477 : XLogRecPtr lsn;
478 :
376 rhaas 479 GIC 184 : START_CRIT_SECTION();
376 rhaas 480 ECB :
376 rhaas 481 GIC 184 : xlrec.db_id = dbid;
376 rhaas 482 CBC 184 : xlrec.tablespace_id = tsid;
376 rhaas 483 ECB :
376 rhaas 484 GIC 184 : XLogBeginInsert();
376 rhaas 485 CBC 184 : XLogRegisterData((char *) (&xlrec),
376 rhaas 486 ECB : sizeof(xl_dbase_create_wal_log_rec));
487 :
376 rhaas 488 GIC 184 : lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
376 rhaas 489 ECB :
490 : /* As always, WAL must hit the disk before the data update does. */
376 rhaas 491 GIC 184 : XLogFlush(lsn);
376 rhaas 492 ECB : }
493 :
494 : /* Create database directory. */
376 rhaas 495 GIC 202 : if (MakePGDirectory(dbpath) < 0)
376 rhaas 496 ECB : {
497 : /* Failure other than already exists or not in WAL replay? */
376 rhaas 498 GIC 8 : if (errno != EEXIST || !isRedo)
376 rhaas 499 LBC 0 : ereport(ERROR,
376 rhaas 500 EUB : (errcode_for_file_access(),
501 : errmsg("could not create directory \"%s\": %m", dbpath)));
502 : }
503 :
504 : /*
505 : * Create PG_VERSION file in the database path. If the file already
506 : * exists and we are in WAL replay then try again to open it in write
507 : * mode.
508 : */
376 rhaas 509 GIC 202 : snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
376 rhaas 510 ECB :
376 rhaas 511 GIC 202 : fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
376 rhaas 512 CBC 202 : if (fd < 0 && errno == EEXIST && isRedo)
513 8 : fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
376 rhaas 514 ECB :
376 rhaas 515 GIC 202 : if (fd < 0)
376 rhaas 516 LBC 0 : ereport(ERROR,
376 rhaas 517 EUB : (errcode_for_file_access(),
518 : errmsg("could not create file \"%s\": %m", versionfile)));
519 :
520 : /* Write PG_MAJORVERSION in the PG_VERSION file. */
376 rhaas 521 GIC 202 : pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
376 rhaas 522 CBC 202 : errno = 0;
523 202 : if ((int) write(fd, buf, nbytes) != nbytes)
376 rhaas 524 ECB : {
525 : /* If write didn't set errno, assume problem is no disk space. */
376 rhaas 526 UIC 0 : if (errno == 0)
376 rhaas 527 UBC 0 : errno = ENOSPC;
528 0 : ereport(ERROR,
376 rhaas 529 EUB : (errcode_for_file_access(),
530 : errmsg("could not write to file \"%s\": %m", versionfile)));
531 : }
376 rhaas 532 GIC 202 : pgstat_report_wait_end();
376 rhaas 533 ECB :
534 : /* Close the version file. */
376 rhaas 535 GIC 202 : CloseTransientFile(fd);
376 rhaas 536 ECB :
537 : /* Critical section done. */
376 rhaas 538 GIC 202 : if (!isRedo)
376 rhaas 539 CBC 184 : END_CRIT_SECTION();
540 202 : }
376 rhaas 541 ECB :
542 : /*
543 : * Create a new database using the FILE_COPY strategy.
544 : *
545 : * Copy each tablespace at the filesystem level, and log a single WAL record
546 : * for each tablespace copied. This requires a checkpoint before and after the
547 : * copy, which may be expensive, but it does greatly reduce WAL generation
548 : * if the copied database is large.
549 : */
550 : static void
376 rhaas 551 GIC 613 : CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
376 rhaas 552 ECB : Oid dst_tsid)
553 : {
554 : TableScanDesc scan;
555 : Relation rel;
556 : HeapTuple tuple;
557 :
558 : /*
559 : * Force a checkpoint before starting the copy. This will force all dirty
560 : * buffers, including those of unlogged tables, out to disk, to ensure
561 : * source database is up-to-date on disk for the copy.
562 : * FlushDatabaseBuffers() would suffice for that, but we also want to
563 : * process any pending unlink requests. Otherwise, if a checkpoint
564 : * happened while we're copying files, a file might be deleted just when
565 : * we're about to copy it, causing the lstat() call in copydir() to fail
566 : * with ENOENT.
567 : */
376 rhaas 568 GIC 613 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
376 rhaas 569 ECB : CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
570 :
571 : /*
572 : * Iterate through all tablespaces of the template database, and copy each
573 : * one to the new database.
574 : */
376 rhaas 575 GIC 613 : rel = table_open(TableSpaceRelationId, AccessShareLock);
376 rhaas 576 CBC 613 : scan = table_beginscan_catalog(rel, 0, NULL);
577 1857 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
376 rhaas 578 ECB : {
376 rhaas 579 GIC 1244 : Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
376 rhaas 580 CBC 1244 : Oid srctablespace = spaceform->oid;
376 rhaas 581 ECB : Oid dsttablespace;
582 : char *srcpath;
583 : char *dstpath;
584 : struct stat st;
585 :
586 : /* No need to copy global tablespace */
376 rhaas 587 GIC 1244 : if (srctablespace == GLOBALTABLESPACE_OID)
376 rhaas 588 CBC 631 : continue;
376 rhaas 589 ECB :
376 rhaas 590 GIC 631 : srcpath = GetDatabasePath(src_dboid, srctablespace);
376 rhaas 591 ECB :
376 rhaas 592 GIC 1244 : if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
376 rhaas 593 CBC 613 : directory_is_empty(srcpath))
376 rhaas 594 ECB : {
595 : /* Assume we can ignore it */
376 rhaas 596 GIC 18 : pfree(srcpath);
376 rhaas 597 CBC 18 : continue;
376 rhaas 598 ECB : }
599 :
376 rhaas 600 GIC 613 : if (srctablespace == src_tsid)
376 rhaas 601 CBC 613 : dsttablespace = dst_tsid;
376 rhaas 602 ECB : else
376 rhaas 603 UIC 0 : dsttablespace = srctablespace;
376 rhaas 604 EUB :
376 rhaas 605 GIC 613 : dstpath = GetDatabasePath(dst_dboid, dsttablespace);
376 rhaas 606 ECB :
607 : /*
608 : * Copy this subdirectory to the new location
609 : *
610 : * We don't need to copy subdirectories
611 : */
376 rhaas 612 GIC 613 : copydir(srcpath, dstpath, false);
376 rhaas 613 ECB :
614 : /* Record the filesystem change in XLOG */
615 : {
616 : xl_dbase_create_file_copy_rec xlrec;
617 :
376 rhaas 618 GIC 613 : xlrec.db_id = dst_dboid;
376 rhaas 619 CBC 613 : xlrec.tablespace_id = dsttablespace;
620 613 : xlrec.src_db_id = src_dboid;
621 613 : xlrec.src_tablespace_id = srctablespace;
376 rhaas 622 ECB :
376 rhaas 623 GIC 613 : XLogBeginInsert();
376 rhaas 624 CBC 613 : XLogRegisterData((char *) &xlrec,
376 rhaas 625 ECB : sizeof(xl_dbase_create_file_copy_rec));
626 :
376 rhaas 627 GIC 613 : (void) XLogInsert(RM_DBASE_ID,
376 rhaas 628 ECB : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
629 : }
349 alvherre 630 GIC 613 : pfree(srcpath);
349 alvherre 631 CBC 613 : pfree(dstpath);
376 rhaas 632 ECB : }
376 rhaas 633 GIC 613 : table_endscan(scan);
376 rhaas 634 CBC 613 : table_close(rel, AccessShareLock);
376 rhaas 635 ECB :
636 : /*
637 : * We force a checkpoint before committing. This effectively means that
638 : * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
639 : * replayed (at least not in ordinary crash recovery; we still have to
640 : * make the XLOG entry for the benefit of PITR operations). This avoids
641 : * two nasty scenarios:
642 : *
643 : * #1: When PITR is off, we don't XLOG the contents of newly created
644 : * indexes; therefore the drop-and-recreate-whole-directory behavior of
645 : * DBASE_CREATE replay would lose such indexes.
646 : *
647 : * #2: Since we have to recopy the source database during DBASE_CREATE
648 : * replay, we run the risk of copying changes in it that were committed
649 : * after the original CREATE DATABASE command but before the system crash
650 : * that led to the replay. This is at least unexpected and at worst could
651 : * lead to inconsistencies, eg duplicate table names.
652 : *
653 : * (Both of these were real bugs in releases 8.0 through 8.0.3.)
654 : *
655 : * In PITR replay, the first of these isn't an issue, and the second is
656 : * only a risk if the CREATE DATABASE and subsequent template database
657 : * change both occur while a base backup is being taken. There doesn't
658 : * seem to be much we can do about that except document it as a
659 : * limitation.
660 : *
661 : * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
662 : * strategy that avoids these problems.
663 : */
376 rhaas 664 GIC 613 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
376 rhaas 665 CBC 613 : }
8487 peter_e 666 ECB :
667 : /*
668 : * CREATE DATABASE
669 : */
670 : Oid
2406 peter_e 671 GIC 808 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
9770 scrappy 672 ECB : {
673 : Oid src_dboid;
674 : Oid src_owner;
1290 andres 675 GIC 808 : int src_encoding = -1;
1290 andres 676 CBC 808 : char *src_collate = NULL;
677 808 : char *src_ctype = NULL;
388 peter 678 808 : char *src_iculocale = NULL;
32 peter 679 GNC 808 : char *src_icurules = NULL;
387 andres 680 CBC 808 : char src_locprovider = '\0';
419 peter 681 808 : char *src_collversion = NULL;
8181 tgl 682 ECB : bool src_istemplate;
6602 683 : bool src_allowconn;
1290 andres 684 GIC 808 : TransactionId src_frozenxid = InvalidTransactionId;
685 808 : MultiXactId src_minmxid = InvalidMultiXactId;
6869 tgl 686 ECB : Oid src_deftablespace;
6459 687 : volatile Oid dst_deftablespace;
688 : Relation pg_database_rel;
689 : HeapTuple tuple;
267 peter 690 GNC 808 : Datum new_record[Natts_pg_database] = {0};
691 808 : bool new_record_nulls[Natts_pg_database] = {0};
440 rhaas 692 CBC 808 : Oid dboid = InvalidOid;
6184 tgl 693 ECB : Oid datdba;
6892 neilc 694 : ListCell *option;
6797 bruce 695 GIC 808 : DefElem *dtablespacename = NULL;
7600 696 808 : DefElem *downer = NULL;
7522 bruce 697 CBC 808 : DefElem *dtemplate = NULL;
698 808 : DefElem *dencoding = NULL;
1356 peter 699 808 : DefElem *dlocale = NULL;
5311 heikki.linnakangas 700 808 : DefElem *dcollate = NULL;
701 808 : DefElem *dctype = NULL;
388 peter 702 808 : DefElem *diculocale = NULL;
32 peter 703 GNC 808 : DefElem *dicurules = NULL;
332 tgl 704 CBC 808 : DefElem *dlocprovider = NULL;
3204 705 808 : DefElem *distemplate = NULL;
706 808 : DefElem *dallowconnections = NULL;
6461 707 808 : DefElem *dconnlimit = NULL;
419 peter 708 808 : DefElem *dcollversion = NULL;
376 rhaas 709 808 : DefElem *dstrategy = NULL;
7600 bruce 710 808 : char *dbname = stmt->dbname;
711 808 : char *dbowner = NULL;
6501 tgl 712 808 : const char *dbtemplate = NULL;
5311 heikki.linnakangas 713 808 : char *dbcollate = NULL;
714 808 : char *dbctype = NULL;
388 peter 715 808 : char *dbiculocale = NULL;
32 peter 716 GNC 808 : char *dbicurules = NULL;
388 peter 717 CBC 808 : char dblocprovider = '\0';
4032 tgl 718 ECB : char *canonname;
6184 tgl 719 CBC 808 : int encoding = -1;
3204 720 808 : bool dbistemplate = false;
721 808 : bool dballowconnections = true;
6184 tgl 722 GIC 808 : int dbconnlimit = -1;
419 peter 723 CBC 808 : char *dbcollversion = NULL;
5361 tgl 724 ECB : int notherbackends;
725 : int npreparedxacts;
376 rhaas 726 CBC 808 : CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
5471 tgl 727 ECB : createdb_failure_params fparms;
728 :
729 : /* Extract options from the statement node tree */
7600 bruce 730 CBC 2880 : foreach(option, stmt->options)
731 : {
7600 bruce 732 GIC 2072 : DefElem *defel = (DefElem *) lfirst(option);
733 :
6869 tgl 734 CBC 2072 : if (strcmp(defel->defname, "tablespace") == 0)
735 : {
736 8 : if (dtablespacename)
633 dean.a.rasheed 737 UIC 0 : errorConflictingDefElem(defel, pstate);
6869 tgl 738 CBC 8 : dtablespacename = defel;
739 : }
740 2064 : else if (strcmp(defel->defname, "owner") == 0)
7600 bruce 741 EUB : {
6869 tgl 742 CBC 1 : if (downer)
633 dean.a.rasheed 743 UIC 0 : errorConflictingDefElem(defel, pstate);
6869 tgl 744 CBC 1 : downer = defel;
745 : }
7600 bruce 746 2063 : else if (strcmp(defel->defname, "template") == 0)
7600 bruce 747 EUB : {
7600 bruce 748 CBC 119 : if (dtemplate)
633 dean.a.rasheed 749 UIC 0 : errorConflictingDefElem(defel, pstate);
7600 bruce 750 CBC 119 : dtemplate = defel;
751 : }
752 1944 : else if (strcmp(defel->defname, "encoding") == 0)
7600 bruce 753 EUB : {
7600 bruce 754 CBC 23 : if (dencoding)
633 dean.a.rasheed 755 UIC 0 : errorConflictingDefElem(defel, pstate);
7600 bruce 756 CBC 23 : dencoding = defel;
757 : }
1356 peter 758 1921 : else if (strcmp(defel->defname, "locale") == 0)
1356 peter 759 EUB : {
1356 peter 760 CBC 17 : if (dlocale)
633 dean.a.rasheed 761 UIC 0 : errorConflictingDefElem(defel, pstate);
1356 peter 762 CBC 17 : dlocale = defel;
763 : }
5116 heikki.linnakangas 764 1904 : else if (strcmp(defel->defname, "lc_collate") == 0)
5311 heikki.linnakangas 765 EUB : {
5311 heikki.linnakangas 766 CBC 7 : if (dcollate)
633 dean.a.rasheed 767 UIC 0 : errorConflictingDefElem(defel, pstate);
5311 heikki.linnakangas 768 CBC 7 : dcollate = defel;
769 : }
5116 770 1897 : else if (strcmp(defel->defname, "lc_ctype") == 0)
5311 heikki.linnakangas 771 EUB : {
5311 heikki.linnakangas 772 CBC 7 : if (dctype)
633 dean.a.rasheed 773 UIC 0 : errorConflictingDefElem(defel, pstate);
5311 heikki.linnakangas 774 CBC 7 : dctype = defel;
775 : }
388 peter 776 1890 : else if (strcmp(defel->defname, "icu_locale") == 0)
388 peter 777 EUB : {
388 peter 778 CBC 19 : if (diculocale)
388 peter 779 UIC 0 : errorConflictingDefElem(defel, pstate);
388 peter 780 CBC 19 : diculocale = defel;
781 : }
32 peter 782 GNC 1871 : else if (strcmp(defel->defname, "icu_rules") == 0)
783 : {
32 peter 784 UNC 0 : if (dicurules)
785 0 : errorConflictingDefElem(defel, pstate);
786 0 : dicurules = defel;
787 : }
388 peter 788 CBC 1871 : else if (strcmp(defel->defname, "locale_provider") == 0)
388 peter 789 EUB : {
388 peter 790 CBC 22 : if (dlocprovider)
388 peter 791 UIC 0 : errorConflictingDefElem(defel, pstate);
388 peter 792 CBC 22 : dlocprovider = defel;
793 : }
3204 tgl 794 GBC 1849 : else if (strcmp(defel->defname, "is_template") == 0)
3204 tgl 795 EUB : {
3204 tgl 796 GBC 305 : if (distemplate)
633 dean.a.rasheed 797 UIC 0 : errorConflictingDefElem(defel, pstate);
3204 tgl 798 CBC 305 : distemplate = defel;
799 : }
800 1544 : else if (strcmp(defel->defname, "allow_connections") == 0)
3204 tgl 801 EUB : {
3204 tgl 802 CBC 303 : if (dallowconnections)
633 dean.a.rasheed 803 UIC 0 : errorConflictingDefElem(defel, pstate);
3204 tgl 804 CBC 303 : dallowconnections = defel;
805 : }
806 1241 : else if (strcmp(defel->defname, "connection_limit") == 0)
6461 tgl 807 EUB : {
6461 tgl 808 LBC 0 : if (dconnlimit)
633 dean.a.rasheed 809 UIC 0 : errorConflictingDefElem(defel, pstate);
6461 tgl 810 LBC 0 : dconnlimit = defel;
811 : }
419 peter 812 CBC 1241 : else if (strcmp(defel->defname, "collation_version") == 0)
419 peter 813 EUB : {
419 peter 814 CBC 6 : if (dcollversion)
419 peter 815 UIC 0 : errorConflictingDefElem(defel, pstate);
419 peter 816 CBC 6 : dcollversion = defel;
817 : }
6869 tgl 818 GBC 1235 : else if (strcmp(defel->defname, "location") == 0)
6869 tgl 819 EUB : {
6869 tgl 820 UBC 0 : ereport(WARNING,
821 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
6869 tgl 822 ECB : errmsg("LOCATION is not supported anymore"),
823 : errhint("Consider using tablespaces instead."),
2406 peter_e 824 : parser_errposition(pstate, defel->location)));
6869 tgl 825 EUB : }
440 rhaas 826 CBC 1235 : else if (strcmp(defel->defname, "oid") == 0)
827 : {
156 tgl 828 615 : dboid = defGetObjectId(defel);
829 :
440 rhaas 830 EUB : /*
831 : * We don't normally permit new databases to be created with
832 : * system-assigned OIDs. pg_upgrade tries to preserve database
833 : * OIDs, so we can't allow any database to be created with an OID
834 : * that might be in use in a freshly-initialized cluster created
835 : * by some future version. We assume all such OIDs will be from
332 tgl 836 ECB : * the system-managed OID range.
837 : *
440 rhaas 838 : * As an exception, however, we permit any OID to be assigned when
839 : * allow_system_table_mods=on (so that initdb can assign system
840 : * OIDs to template0 and postgres) or when performing a binary
841 : * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
842 : * in the source cluster).
843 : */
440 rhaas 844 GIC 615 : if (dboid < FirstNormalObjectId &&
845 608 : !allowSystemTableMods && !IsBinaryUpgrade)
440 rhaas 846 UIC 0 : ereport(ERROR,
847 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
848 : errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
849 : }
376 rhaas 850 GIC 620 : else if (strcmp(defel->defname, "strategy") == 0)
851 : {
852 620 : if (dstrategy)
376 rhaas 853 UIC 0 : errorConflictingDefElem(defel, pstate);
376 rhaas 854 CBC 620 : dstrategy = defel;
376 rhaas 855 ECB : }
7600 bruce 856 EUB : else
3204 tgl 857 UIC 0 : ereport(ERROR,
858 : (errcode(ERRCODE_SYNTAX_ERROR),
859 : errmsg("option \"%s\" not recognized", defel->defname),
2406 peter_e 860 ECB : parser_errposition(pstate, defel->location)));
861 : }
7600 bruce 862 :
7463 tgl 863 GBC 808 : if (downer && downer->arg)
3204 tgl 864 CBC 1 : dbowner = defGetString(downer);
7463 tgl 865 GIC 808 : if (dtemplate && dtemplate->arg)
3204 866 119 : dbtemplate = defGetString(dtemplate);
7463 tgl 867 GBC 808 : if (dencoding && dencoding->arg)
868 : {
869 : const char *encoding_name;
870 :
7463 tgl 871 GIC 23 : if (IsA(dencoding->arg, Integer))
872 : {
3204 tgl 873 LBC 0 : encoding = defGetInt32(dencoding);
7463 874 0 : encoding_name = pg_encoding_to_char(encoding);
875 0 : if (strcmp(encoding_name, "") == 0 ||
876 0 : pg_valid_server_encoding(encoding_name) < 0)
7205 877 0 : ereport(ERROR,
878 : (errcode(ERRCODE_UNDEFINED_OBJECT),
879 : errmsg("%d is not a valid encoding code",
880 : encoding),
2406 peter_e 881 ECB : parser_errposition(pstate, dencoding->location)));
882 : }
3204 tgl 883 EUB : else
7463 884 : {
3204 tgl 885 GBC 23 : encoding_name = defGetString(dencoding);
5657 886 23 : encoding = pg_valid_server_encoding(encoding_name);
887 23 : if (encoding < 0)
7205 tgl 888 UIC 0 : ereport(ERROR,
889 : (errcode(ERRCODE_UNDEFINED_OBJECT),
890 : errmsg("%s is not a valid encoding name",
891 : encoding_name),
892 : parser_errposition(pstate, dencoding->location)));
893 : }
894 : }
1356 peter 895 CBC 808 : if (dlocale && dlocale->arg)
1356 peter 896 ECB : {
1356 peter 897 CBC 17 : dbcollate = defGetString(dlocale);
1356 peter 898 GBC 17 : dbctype = defGetString(dlocale);
899 : }
5311 heikki.linnakangas 900 GIC 808 : if (dcollate && dcollate->arg)
3204 tgl 901 7 : dbcollate = defGetString(dcollate);
5311 heikki.linnakangas 902 808 : if (dctype && dctype->arg)
3204 tgl 903 7 : dbctype = defGetString(dctype);
388 peter 904 808 : if (diculocale && diculocale->arg)
388 peter 905 CBC 19 : dbiculocale = defGetString(diculocale);
32 peter 906 GNC 808 : if (dicurules && dicurules->arg)
32 peter 907 UNC 0 : dbicurules = defGetString(dicurules);
388 peter 908 GIC 808 : if (dlocprovider && dlocprovider->arg)
388 peter 909 ECB : {
388 peter 910 CBC 22 : char *locproviderstr = defGetString(dlocprovider);
911 :
912 22 : if (pg_strcasecmp(locproviderstr, "icu") == 0)
913 20 : dblocprovider = COLLPROVIDER_ICU;
914 2 : else if (pg_strcasecmp(locproviderstr, "libc") == 0)
915 1 : dblocprovider = COLLPROVIDER_LIBC;
388 peter 916 ECB : else
388 peter 917 CBC 1 : ereport(ERROR,
388 peter 918 ECB : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
388 peter 919 EUB : errmsg("unrecognized locale provider: %s",
388 peter 920 ECB : locproviderstr)));
921 : }
3204 tgl 922 CBC 807 : if (distemplate && distemplate->arg)
3204 tgl 923 GIC 305 : dbistemplate = defGetBoolean(distemplate);
3204 tgl 924 CBC 807 : if (dallowconnections && dallowconnections->arg)
925 303 : dballowconnections = defGetBoolean(dallowconnections);
6461 926 807 : if (dconnlimit && dconnlimit->arg)
5182 heikki.linnakangas 927 ECB : {
3204 tgl 928 UIC 0 : dbconnlimit = defGetInt32(dconnlimit);
5182 heikki.linnakangas 929 LBC 0 : if (dbconnlimit < -1)
5182 heikki.linnakangas 930 UIC 0 : ereport(ERROR,
931 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
932 : errmsg("invalid connection limit: %d", dbconnlimit)));
933 : }
419 peter 934 CBC 807 : if (dcollversion)
935 6 : dbcollversion = defGetString(dcollversion);
9345 bruce 936 ECB :
6494 tgl 937 : /* obtain OID of proposed owner */
7714 tgl 938 CBC 807 : if (dbowner)
4630 rhaas 939 GIC 1 : datdba = get_role_oid(dbowner, false);
7714 tgl 940 EUB : else
7714 tgl 941 GBC 806 : datdba = GetUserId();
7714 tgl 942 EUB :
943 : /*
944 : * To create a database, must have createdb privilege and must be able to
945 : * become the target role (this does not imply that the target role itself
6385 bruce 946 ECB : * must have createdb privilege). The latter provision guards against
3260 947 : * "giveaway" attacks. Note that a superuser will always have both of
948 : * these privileges a fortiori.
949 : */
3029 alvherre 950 CBC 807 : if (!have_createdb_privilege())
6478 tgl 951 3 : ereport(ERROR,
952 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6478 tgl 953 ECB : errmsg("permission denied to create database")));
954 :
142 rhaas 955 GNC 804 : check_can_set_role(GetUserId(), datdba);
956 :
957 : /*
958 : * Lookup database (template) to be cloned, and obtain share lock on it.
959 : * ShareLock allows two CREATE DATABASEs to work from the same template
960 : * concurrently, while ensuring no one is busy dropping it in parallel
961 : * (which would be Very Bad since we'd likely get an incomplete copy
6184 tgl 962 ECB : * without knowing it). This also prevents any new connections from being
963 : * made to the source until we finish copying it, so we can be sure it
964 : * won't change underneath us.
965 : */
8181 tgl 966 GIC 804 : if (!dbtemplate)
2118 tgl 967 CBC 686 : dbtemplate = "template1"; /* Default template database name */
968 :
6184 tgl 969 GIC 804 : if (!get_db_info(dbtemplate, ShareLock,
970 : &src_dboid, &src_owner, &src_encoding,
971 : &src_istemplate, &src_allowconn,
972 : &src_frozenxid, &src_minmxid, &src_deftablespace,
973 : &src_collate, &src_ctype, &src_iculocale, &src_icurules, &src_locprovider,
974 : &src_collversion))
7205 tgl 975 UIC 0 : ereport(ERROR,
976 : (errcode(ERRCODE_UNDEFINED_DATABASE),
977 : errmsg("template database \"%s\" does not exist",
6184 tgl 978 ECB : dbtemplate)));
8053 bruce 979 :
980 : /*
981 : * Permission check: to copy a DB that's not marked datistemplate, you
982 : * must be superuser or the owner thereof.
983 : */
8181 tgl 984 GIC 804 : if (!src_istemplate)
985 : {
147 peter 986 GNC 6 : if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
7205 tgl 987 UBC 0 : ereport(ERROR,
988 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
989 : errmsg("permission denied to copy database \"%s\"",
990 : dbtemplate)));
991 : }
992 :
993 : /* Validate the database creation strategy. */
376 rhaas 994 GIC 804 : if (dstrategy && dstrategy->arg)
995 : {
376 rhaas 996 ECB : char *strategy;
997 :
376 rhaas 998 CBC 620 : strategy = defGetString(dstrategy);
376 rhaas 999 GBC 620 : if (strcmp(strategy, "wal_log") == 0)
376 rhaas 1000 GIC 6 : dbstrategy = CREATEDB_WAL_LOG;
1001 614 : else if (strcmp(strategy, "file_copy") == 0)
1002 613 : dbstrategy = CREATEDB_FILE_COPY;
1003 : else
1004 1 : ereport(ERROR,
1005 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
197 peter 1006 ECB : errmsg("invalid create database strategy \"%s\"", strategy),
1007 : errhint("Valid strategies are \"wal_log\", and \"file_copy\".")));
1008 : }
1009 :
5311 heikki.linnakangas 1010 : /* If encoding or locales are defaulted, use source's setting */
8181 tgl 1011 CBC 803 : if (encoding < 0)
1012 780 : encoding = src_encoding;
5311 heikki.linnakangas 1013 803 : if (dbcollate == NULL)
1014 779 : dbcollate = src_collate;
5311 heikki.linnakangas 1015 GIC 803 : if (dbctype == NULL)
5311 heikki.linnakangas 1016 CBC 779 : dbctype = src_ctype;
388 peter 1017 GIC 803 : if (dblocprovider == '\0')
1018 782 : dblocprovider = src_locprovider;
230 1019 803 : if (dbiculocale == NULL && dblocprovider == COLLPROVIDER_ICU)
1020 768 : dbiculocale = src_iculocale;
32 peter 1021 GNC 803 : if (dbicurules == NULL && dblocprovider == COLLPROVIDER_ICU)
1022 787 : dbicurules = src_icurules;
1023 :
1024 : /* Some encodings are client only */
7836 bruce 1025 CBC 803 : if (!PG_VALID_BE_ENCODING(encoding))
7205 tgl 1026 LBC 0 : ereport(ERROR,
7205 tgl 1027 ECB : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
7136 peter_e 1028 : errmsg("invalid server encoding %d", encoding)));
7836 bruce 1029 :
4032 tgl 1030 : /* Check that the chosen locales are valid, and get canonical spellings */
4032 tgl 1031 CBC 803 : if (!check_locale(LC_COLLATE, dbcollate, &canonname))
5311 heikki.linnakangas 1032 1 : ereport(ERROR,
5311 heikki.linnakangas 1033 ECB : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
4013 peter_e 1034 : errmsg("invalid locale name: \"%s\"", dbcollate)));
4032 tgl 1035 CBC 802 : dbcollate = canonname;
1036 802 : if (!check_locale(LC_CTYPE, dbctype, &canonname))
5311 heikki.linnakangas 1037 GIC 1 : ereport(ERROR,
1038 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
4013 peter_e 1039 ECB : errmsg("invalid locale name: \"%s\"", dbctype)));
4032 tgl 1040 GBC 801 : dbctype = canonname;
1041 :
4439 peter_e 1042 GIC 801 : check_encoding_locale_matches(encoding, dbcollate, dbctype);
1043 :
388 peter 1044 801 : if (dblocprovider == COLLPROVIDER_ICU)
388 peter 1045 ECB : {
205 peter 1046 CBC 787 : if (!(is_encoding_supported_by_icu(encoding)))
205 peter 1047 GIC 1 : ereport(ERROR,
1048 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
205 peter 1049 ECB : errmsg("encoding \"%s\" is not supported with ICU provider",
1050 : pg_encoding_to_char(encoding))));
1051 :
1052 : /*
1053 : * This would happen if template0 uses the libc provider but the new
388 1054 : * database uses icu.
1055 : */
388 peter 1056 CBC 786 : if (!dbiculocale)
388 peter 1057 GIC 1 : ereport(ERROR,
388 peter 1058 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1059 : errmsg("ICU locale must be specified")));
1060 :
1061 : /*
1062 : * During binary upgrade, or when the locale came from the template
1063 : * database, preserve locale string. Otherwise, canonicalize to a
1064 : * language tag.
1065 : */
5 jdavis 1066 GNC 785 : if (!IsBinaryUpgrade && dbiculocale != src_iculocale)
1067 : {
1068 13 : char *langtag = icu_language_tag(dbiculocale,
1069 : icu_validation_level);
1070 :
1071 13 : if (langtag && strcmp(dbiculocale, langtag) != 0)
1072 : {
1073 2 : ereport(NOTICE,
1074 : (errmsg("using standard form \"%s\" for locale \"%s\"",
1075 : langtag, dbiculocale)));
1076 :
1077 2 : dbiculocale = langtag;
1078 : }
1079 : }
1080 :
12 1081 785 : icu_validate_locale(dbiculocale);
1082 : }
1083 : else
1084 : {
200 peter 1085 GIC 14 : if (dbiculocale)
200 peter 1086 UIC 0 : ereport(ERROR,
1087 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1088 : errmsg("ICU locale cannot be specified unless locale provider is ICU")));
1089 :
31 peter 1090 GNC 14 : if (dbicurules)
31 peter 1091 UNC 0 : ereport(ERROR,
1092 : (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1093 : errmsg("ICU rules cannot be specified unless locale provider is ICU")));
1094 : }
388 peter 1095 ECB :
5311 heikki.linnakangas 1096 : /*
1097 : * Check that the new encoding and locale settings match the source
1098 : * database. We insist on this because we simply copy the source data ---
1099 : * any non-ASCII data would be wrongly encoded, and any indexes sorted
1100 : * according to the source locale would be wrong.
1101 : *
1102 : * However, we assume that template0 doesn't contain any non-ASCII data
1103 : * nor any indexes that depend on collation or ctype, so template0 can be
1104 : * used as template for creating a database with any encoding or locale.
1105 : */
5311 heikki.linnakangas 1106 GIC 798 : if (strcmp(dbtemplate, "template0") != 0)
5311 heikki.linnakangas 1107 ECB : {
5086 tgl 1108 GIC 692 : if (encoding != src_encoding)
5086 tgl 1109 UIC 0 : ereport(ERROR,
5086 tgl 1110 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1111 : errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
1112 : pg_encoding_to_char(encoding),
1113 : pg_encoding_to_char(src_encoding)),
1114 : errhint("Use the same encoding as in the template database, or use template0 as template.")));
1115 :
5099 tgl 1116 CBC 692 : if (strcmp(dbcollate, src_collate) != 0)
5311 heikki.linnakangas 1117 UIC 0 : ereport(ERROR,
1118 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1119 : errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
5086 tgl 1120 ECB : dbcollate, src_collate),
1121 : errhint("Use the same collation as in the template database, or use template0 as template.")));
1122 :
5099 tgl 1123 GIC 692 : if (strcmp(dbctype, src_ctype) != 0)
5311 heikki.linnakangas 1124 LBC 0 : ereport(ERROR,
5086 tgl 1125 EUB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1126 : errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
1127 : dbctype, src_ctype),
1128 : errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
388 peter 1129 ECB :
388 peter 1130 GBC 692 : if (dblocprovider != src_locprovider)
388 peter 1131 UIC 0 : ereport(ERROR,
1132 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1133 : errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
1134 : collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
1135 : errhint("Use the same locale provider as in the template database, or use template0 as template.")));
1136 :
388 peter 1137 GIC 692 : if (dblocprovider == COLLPROVIDER_ICU)
1138 : {
1139 : char *val1;
1140 : char *val2;
1141 :
1142 680 : Assert(dbiculocale);
1143 680 : Assert(src_iculocale);
1144 680 : if (strcmp(dbiculocale, src_iculocale) != 0)
388 peter 1145 UIC 0 : ereport(ERROR,
1146 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1147 : errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
388 peter 1148 ECB : dbiculocale, src_iculocale),
1149 : errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
1150 :
32 peter 1151 GNC 680 : val1 = dbicurules;
1152 680 : if (!val1)
1153 680 : val1 = "";
1154 680 : val2 = src_icurules;
1155 680 : if (!val2)
1156 680 : val2 = "";
1157 680 : if (strcmp(val1, val2) != 0)
32 peter 1158 UNC 0 : ereport(ERROR,
1159 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1160 : errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
1161 : val1, val2),
1162 : errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
388 peter 1163 ECB : }
5311 heikki.linnakangas 1164 EUB : }
1165 :
1166 : /*
1167 : * If we got a collation version for the template database, check that it
1168 : * matches the actual OS collation version. Otherwise error; the user
1169 : * needs to fix the template database first. Don't complain if a
1170 : * collation version was specified explicitly as a statement option; that
419 peter 1171 ECB : * is used by pg_upgrade to reproduce the old state exactly.
419 peter 1172 EUB : *
1173 : * (If the template database has no collation version, then either the
1174 : * platform/provider does not support collation versioning, or it's
1175 : * template0, for which we stipulate that it does not contain
1176 : * collation-using objects.)
1177 : */
419 peter 1178 CBC 798 : if (src_collversion && !dcollversion)
419 peter 1179 EUB : {
1180 : char *actual_versionstr;
1181 :
388 peter 1182 GIC 385 : actual_versionstr = get_collation_actual_version(dblocprovider, dblocprovider == COLLPROVIDER_ICU ? dbiculocale : dbcollate);
419 1183 385 : if (!actual_versionstr)
419 peter 1184 UIC 0 : ereport(ERROR,
419 peter 1185 ECB : (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
419 peter 1186 EUB : dbtemplate)));
1187 :
419 peter 1188 GIC 385 : if (strcmp(actual_versionstr, src_collversion) != 0)
419 peter 1189 UIC 0 : ereport(ERROR,
1190 : (errmsg("template database \"%s\" has a collation version mismatch",
1191 : dbtemplate),
419 peter 1192 ECB : errdetail("The template database was created using collation version %s, "
1193 : "but the operating system provides version %s.",
1194 : src_collversion, actual_versionstr),
1195 : errhint("Rebuild all objects in the template database that use the default collation and run "
1196 : "ALTER DATABASE %s REFRESH COLLATION VERSION, "
1197 : "or build PostgreSQL with the right library version.",
1198 : quote_identifier(dbtemplate))));
1199 : }
419 peter 1200 EUB :
419 peter 1201 GIC 798 : if (dbcollversion == NULL)
1202 792 : dbcollversion = src_collversion;
1203 :
1204 : /*
1205 : * Normally, we copy the collation version from the template database.
419 peter 1206 ECB : * This last resort only applies if the template database does not have a
1207 : * collation version, which is normally only the case for template0.
1208 : */
419 peter 1209 CBC 798 : if (dbcollversion == NULL)
388 1210 407 : dbcollversion = get_collation_actual_version(dblocprovider, dblocprovider == COLLPROVIDER_ICU ? dbiculocale : dbcollate);
419 peter 1211 ECB :
6869 tgl 1212 : /* Resolve default tablespace for new database */
6869 tgl 1213 GBC 798 : if (dtablespacename && dtablespacename->arg)
6869 tgl 1214 GIC 8 : {
1215 : char *tablespacename;
1216 : AclResult aclresult;
1217 :
3204 1218 8 : tablespacename = defGetString(dtablespacename);
4630 rhaas 1219 8 : dst_deftablespace = get_tablespace_oid(tablespacename, false);
1220 : /* check permissions */
147 peter 1221 GNC 8 : aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
1222 : ACL_CREATE);
6797 bruce 1223 GIC 8 : if (aclresult != ACLCHECK_OK)
1954 peter_e 1224 UIC 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
1225 : tablespacename);
1226 :
1227 : /* pg_global must never be the default tablespace */
5658 tgl 1228 GIC 8 : if (dst_deftablespace == GLOBALTABLESPACE_OID)
5658 tgl 1229 UIC 0 : ereport(ERROR,
1230 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1231 : errmsg("pg_global cannot be used as default tablespace")));
1232 :
6748 tgl 1233 ECB : /*
1234 : * If we are trying to change the default tablespace of the template,
1235 : * we require that the template not have any files in the new default
1236 : * tablespace. This is necessary because otherwise the copied
1237 : * database would contain pg_class rows that refer to its default
1238 : * tablespace both explicitly (by OID) and implicitly (as zero), which
6748 tgl 1239 EUB : * would cause problems. For example another CREATE DATABASE using
1240 : * the copied database as template, and trying to change its default
1241 : * tablespace again, would yield outright incorrect results (it would
1242 : * improperly move tables to the new default tablespace that should
6748 tgl 1243 ECB : * stay in the same tablespace).
6748 tgl 1244 EUB : */
6748 tgl 1245 GIC 8 : if (dst_deftablespace != src_deftablespace)
1246 : {
1247 : char *srcpath;
1248 : struct stat st;
1249 :
1250 8 : srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
1251 :
1252 8 : if (stat(srcpath, &st) == 0 &&
6748 tgl 1253 UIC 0 : S_ISDIR(st.st_mode) &&
1254 0 : !directory_is_empty(srcpath))
1255 0 : ereport(ERROR,
6748 tgl 1256 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1257 : errmsg("cannot assign new default tablespace \"%s\"",
1258 : tablespacename),
1259 : errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
1260 : dbtemplate)));
6748 tgl 1261 GIC 8 : pfree(srcpath);
1262 : }
1263 : }
6869 tgl 1264 ECB : else
1265 : {
1266 : /* Use template database's default tablespace */
6869 tgl 1267 GIC 790 : dst_deftablespace = src_deftablespace;
6869 tgl 1268 ECB : /* Note there is no additional permission check in this path */
1269 : }
1270 :
1271 : /*
1272 : * If built with appropriate switch, whine when regression-testing
1380 1273 : * conventions for database names are violated. But don't complain during
1274 : * initdb.
1275 : */
1276 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1277 : if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
1278 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1380 tgl 1279 EUB : #endif
1280 :
1281 : /*
1282 : * Check for db name conflict. This is just to give a more friendly error
6031 bruce 1283 ECB : * message than "unique index violation". There's a race condition but
6031 bruce 1284 EUB : * we're willing to accept the less friendly message in that case.
1285 : */
4630 rhaas 1286 GIC 798 : if (OidIsValid(get_database_oid(dbname, true)))
6184 tgl 1287 1 : ereport(ERROR,
1288 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1289 : errmsg("database \"%s\" already exists", dbname)));
1290 :
1291 : /*
1292 : * The source DB can't have any active backends, except this one
1293 : * (exception is to allow CREATE DB while connected to template1).
1294 : * Otherwise we might copy inconsistent data.
1295 : *
1296 : * This should be last among the basic error checks, because it involves
1297 : * potential waiting; we may as well throw an error first if we're gonna
1298 : * throw one.
1299 : */
5361 tgl 1300 CBC 797 : if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
5791 tgl 1301 UIC 0 : ereport(ERROR,
1302 : (errcode(ERRCODE_OBJECT_IN_USE),
1303 : errmsg("source database \"%s\" is being accessed by other users",
1304 : dbtemplate),
5361 tgl 1305 ECB : errdetail_busy_db(notherbackends, npreparedxacts)));
1306 :
6017 1307 : /*
5624 bruce 1308 EUB : * Select an OID for the new database, checking that it doesn't have a
1309 : * filename conflict with anything already existing in the tablespace
6017 tgl 1310 : * directories.
1311 : */
1539 andres 1312 GIC 797 : pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
1313 :
1314 : /*
1315 : * If database OID is configured, check if the OID is already in use or
440 rhaas 1316 ECB : * data directory already exists.
1317 : */
440 rhaas 1318 GIC 797 : if (OidIsValid(dboid))
1319 : {
1320 615 : char *existing_dbname = get_database_name(dboid);
1321 :
440 rhaas 1322 CBC 615 : if (existing_dbname != NULL)
440 rhaas 1323 UIC 0 : ereport(ERROR,
1324 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1325 : errmsg("database OID %u is already in use by database \"%s\"",
1326 : dboid, existing_dbname));
1327 :
440 rhaas 1328 GIC 615 : if (check_db_file_conflict(dboid))
440 rhaas 1329 UIC 0 : ereport(ERROR,
1330 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1331 : errmsg("data directory with the specified OID %u already exists", dboid));
1332 : }
1333 : else
1334 : {
1335 : /* Select an OID for the new database if one is not explicitly configured. */
1336 : do
1337 : {
440 rhaas 1338 GIC 182 : dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
1339 : Anum_pg_database_oid);
1340 182 : } while (check_db_file_conflict(dboid));
440 rhaas 1341 ECB : }
6017 tgl 1342 :
1343 : /*
1344 : * Insert a new tuple into pg_database. This establishes our ownership of
1345 : * the new database name (anyone else trying to insert the same name will
1346 : * block on the unique index, and fail after we commit).
1347 : */
1348 :
388 peter 1349 GIC 797 : Assert((dblocprovider == COLLPROVIDER_ICU && dbiculocale) ||
1350 : (dblocprovider != COLLPROVIDER_ICU && !dbiculocale));
1351 :
1352 : /* Form tuple */
1601 andres 1353 GBC 797 : new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
6184 tgl 1354 GIC 797 : new_record[Anum_pg_database_datname - 1] =
1355 797 : DirectFunctionCall1(namein, CStringGetDatum(dbname));
1356 797 : new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
1357 797 : new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
388 peter 1358 797 : new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
3204 tgl 1359 797 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1360 797 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
6184 1361 797 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
5999 1362 797 : new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
3728 alvherre 1363 797 : new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
6184 tgl 1364 CBC 797 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
431 peter 1365 GIC 797 : new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
1366 797 : new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
388 1367 797 : if (dbiculocale)
1368 784 : new_record[Anum_pg_database_daticulocale - 1] = CStringGetTextDatum(dbiculocale);
1369 : else
388 peter 1370 CBC 13 : new_record_nulls[Anum_pg_database_daticulocale - 1] = true;
32 peter 1371 GNC 797 : if (dbicurules)
32 peter 1372 UNC 0 : new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
1373 : else
32 peter 1374 GNC 797 : new_record_nulls[Anum_pg_database_daticurules - 1] = true;
419 peter 1375 GIC 797 : if (dbcollversion)
419 peter 1376 CBC 792 : new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
1377 : else
1378 5 : new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
6184 tgl 1379 EUB :
1380 : /*
1381 : * We deliberately set datacl to default (NULL), rather than copying it
1382 : * from the template database. Copying it would be a bad idea when the
1383 : * owner is not the same as the template's owner.
6184 tgl 1384 ECB : */
5271 tgl 1385 GBC 797 : new_record_nulls[Anum_pg_database_datacl - 1] = true;
1386 :
5271 tgl 1387 GIC 797 : tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
1388 : new_record, new_record_nulls);
1389 :
2259 alvherre 1390 797 : CatalogTupleInsert(pg_database_rel, tuple);
1391 :
1392 : /*
1393 : * Now generate additional catalog entries associated with the new DB
6184 tgl 1394 ECB : */
1395 :
1396 : /* Register owner dependency */
6184 tgl 1397 GIC 797 : recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
1398 :
1399 : /* Create pg_shdepend entries for objects within database */
1400 797 : copyTemplateDependencies(src_dboid, dboid);
1401 :
1402 : /* Post creation hook for new database */
3686 rhaas 1403 797 : InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
1404 :
8120 tgl 1405 ECB : /*
1406 : * If we're going to be reading data for the to-be-created database into
1407 : * shared_buffers, take a lock on it. Nobody should know that this
1408 : * database exists yet, but it's good to maintain the invariant that an
1409 : * AccessExclusiveLock on the database is sufficient to drop all
376 rhaas 1410 : * of its buffers without worrying about more being read later.
1411 : *
332 tgl 1412 : * Note that we need to do this before entering the
1413 : * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
1414 : * expects this lock to be held already.
8177 1415 : */
376 rhaas 1416 CBC 797 : if (dbstrategy == CREATEDB_WAL_LOG)
1417 184 : LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
8994 bruce 1418 ECB :
8346 tgl 1419 : /*
6385 bruce 1420 : * Once we start copying subdirectories, we need to be able to clean 'em
5471 tgl 1421 : * up if we fail. Use an ENSURE block to make sure this happens. (This
6385 bruce 1422 : * is not a 100% solution, because of the possibility of failure during
1423 : * transaction commit after we leave this routine, but it should handle
1424 : * most scenarios.)
1425 : */
5471 tgl 1426 CBC 797 : fparms.src_dboid = src_dboid;
1427 797 : fparms.dest_dboid = dboid;
376 rhaas 1428 GBC 797 : fparms.strategy = dbstrategy;
1429 :
5471 tgl 1430 CBC 797 : PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
5471 tgl 1431 ECB : PointerGetDatum(&fparms));
6869 1432 : {
1433 : /*
376 rhaas 1434 : * If the user has asked to create a database with WAL_LOG strategy
1435 : * then call CreateDatabaseUsingWalLog, which will copy the database
1436 : * at the block level and it will WAL log each copied block.
1437 : * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
1438 : * database file by file.
1439 : */
376 rhaas 1440 GIC 797 : if (dbstrategy == CREATEDB_WAL_LOG)
376 rhaas 1441 CBC 184 : CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
1442 : dst_deftablespace);
376 rhaas 1443 ECB : else
376 rhaas 1444 GIC 613 : CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
1445 : dst_deftablespace);
8181 tgl 1446 ECB :
1447 : /*
1448 : * Close pg_database, but keep lock till commit.
1449 : */
1539 andres 1450 GIC 797 : table_close(pg_database_rel, NoLock);
1451 :
1452 : /*
4968 alvherre 1453 ECB : * Force synchronous commit, thus minimizing the window between
1454 : * creation of the database files and committal of the transaction. If
1455 : * we crash before committing, we'll have a DB that's taking up disk
5624 bruce 1456 : * space but is not in pg_database, which is not good.
1457 : */
4968 alvherre 1458 GIC 797 : ForceSyncCommit();
6459 tgl 1459 ECB : }
5471 tgl 1460 GIC 797 : PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
1461 : PointerGetDatum(&fparms));
1462 :
3753 rhaas 1463 797 : return dboid;
1464 : }
1465 :
1466 : /*
1467 : * Check whether chosen encoding matches chosen locale settings. This
1468 : * restriction is necessary because libc's locale-specific code usually
1469 : * fails when presented with data in an encoding it's not expecting. We
1470 : * allow mismatch in four cases:
1471 : *
4439 peter_e 1472 ECB : * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
1473 : * which works with any encoding.
1474 : *
1475 : * 2. locale encoding = -1, which means that we couldn't determine the
1476 : * locale's encoding and have to trust the user to get it right.
1477 : *
1478 : * 3. selected encoding is UTF8 and platform is win32. This is because
1479 : * UTF8 is a pseudo codepage that is supported in all locales since it's
1480 : * converted to UTF16 before being used.
1481 : *
1482 : * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
1483 : * is risky but we have historically allowed it --- notably, the
1484 : * regression tests require it.
1485 : *
1486 : * Note: if you change this policy, fix initdb to match.
1487 : */
1488 : void
4439 peter_e 1489 GIC 830 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
1490 : {
4382 bruce 1491 830 : int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
1492 830 : int collate_encoding = pg_get_encoding_from_locale(collate, true);
1493 :
4439 peter_e 1494 833 : if (!(ctype_encoding == encoding ||
1495 9 : ctype_encoding == PG_SQL_ASCII ||
4439 peter_e 1496 ECB : ctype_encoding == -1 ||
1497 : #ifdef WIN32
1498 : encoding == PG_UTF8 ||
1499 : #endif
4439 peter_e 1500 CBC 3 : (encoding == PG_SQL_ASCII && superuser())))
4439 peter_e 1501 UIC 0 : ereport(ERROR,
1502 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1503 : errmsg("encoding \"%s\" does not match locale \"%s\"",
1504 : pg_encoding_to_char(encoding),
1505 : ctype),
2118 tgl 1506 ECB : errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
1507 : pg_encoding_to_char(ctype_encoding))));
1508 :
4439 peter_e 1509 GIC 833 : if (!(collate_encoding == encoding ||
1510 9 : collate_encoding == PG_SQL_ASCII ||
1511 : collate_encoding == -1 ||
1512 : #ifdef WIN32
1513 : encoding == PG_UTF8 ||
4439 peter_e 1514 ECB : #endif
4439 peter_e 1515 GIC 3 : (encoding == PG_SQL_ASCII && superuser())))
4439 peter_e 1516 LBC 0 : ereport(ERROR,
1517 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1518 : errmsg("encoding \"%s\" does not match locale \"%s\"",
4439 peter_e 1519 ECB : pg_encoding_to_char(encoding),
1520 : collate),
1521 : errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
1522 : pg_encoding_to_char(collate_encoding))));
4439 peter_e 1523 GIC 830 : }
1524 :
1525 : /* Error cleanup callback for createdb */
1526 : static void
5471 tgl 1527 UIC 0 : createdb_failure_callback(int code, Datum arg)
1528 : {
1529 0 : createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
1530 :
1531 : /*
1532 : * If we were copying database at block levels then drop pages for the
1533 : * destination database that are in the shared buffer cache. And tell
1534 : * checkpointer to forget any pending fsync and unlink requests for files
1535 : * in the database. The reasoning behind doing this is same as explained
1536 : * in dropdb function. But unlike dropdb we don't need to call
1537 : * pgstat_drop_database because this database is still not created so
1538 : * there should not be any stat for this.
1539 : */
376 rhaas 1540 0 : if (fparms->strategy == CREATEDB_WAL_LOG)
1541 : {
1542 0 : DropDatabaseBuffers(fparms->dest_dboid);
1543 0 : ForgetDatabaseSyncRequests(fparms->dest_dboid);
1544 :
376 rhaas 1545 ECB : /* Release lock on the target database. */
376 rhaas 1546 UIC 0 : UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
376 rhaas 1547 ECB : AccessShareLock);
1548 : }
1549 :
5471 tgl 1550 : /*
5050 bruce 1551 : * Release lock on source database before doing recursive remove. This is
1552 : * not essential but it seems desirable to release the lock as soon as
1553 : * possible.
1554 : */
5471 tgl 1555 UIC 0 : UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
5471 tgl 1556 ECB :
5471 tgl 1557 EUB : /* Throw away any successfully copied subdirectories */
5471 tgl 1558 UIC 0 : remove_dbtablespaces(fparms->dest_dboid);
8181 1559 0 : }
1560 :
1561 :
1562 : /*
1563 : * DROP DATABASE
1564 : */
9770 scrappy 1565 ECB : void
1244 akapila 1566 CBC 35 : dropdb(const char *dbname, bool missing_ok, bool force)
1567 : {
1568 : Oid db_id;
1569 : bool db_istemplate;
1570 : Relation pgdbrel;
8598 tgl 1571 ECB : HeapTuple tup;
5361 tgl 1572 EUB : int notherbackends;
1573 : int npreparedxacts;
1574 : int nslots,
1575 : nslots_active;
1576 : int nsubscriptions;
1577 :
1578 : /*
6031 bruce 1579 ECB : * Look up the target database's OID, and get exclusive lock on it. We
1580 : * need this to ensure that no new backend starts up in the target
1581 : * database while we are deleting it (see postinit.c), and that no one is
1582 : * using it as a CREATE DATABASE template or trying to delete it for
6184 tgl 1583 EUB : * themselves.
1584 : */
1539 andres 1585 GBC 35 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1586 :
6184 tgl 1587 GIC 35 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1588 : &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1589 : {
6347 bruce 1590 14 : if (!missing_ok)
1591 : {
andrew 1592 6 : ereport(ERROR,
1593 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1594 : errmsg("database \"%s\" does not exist", dbname)));
1595 : }
6347 andrew 1596 EUB : else
1597 : {
1598 : /* Close pg_database, release the lock, since we changed nothing */
1539 andres 1599 GBC 8 : table_close(pgdbrel, RowExclusiveLock);
6347 bruce 1600 GIC 8 : ereport(NOTICE,
1601 : (errmsg("database \"%s\" does not exist, skipping",
6347 andrew 1602 EUB : dbname)));
6347 andrew 1603 GIC 8 : return;
1604 : }
1605 : }
1606 :
1607 : /*
1608 : * Permission checks
1609 : */
147 peter 1610 GNC 21 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1954 peter_e 1611 UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1612 : dbname);
1613 :
4048 rhaas 1614 EUB : /* DROP hook for the database being removed */
3686 rhaas 1615 GBC 21 : InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
1616 :
1617 : /*
1618 : * Disallow dropping a DB that is marked istemplate. This is just to
1619 : * prevent people from accidentally dropping template0 or template1; they
1620 : * can do so if they're really determined ...
1621 : */
8181 tgl 1622 CBC 21 : if (db_istemplate)
7205 tgl 1623 UIC 0 : ereport(ERROR,
1624 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1625 : errmsg("cannot drop a template database")));
1626 :
1627 : /* Obviously can't drop my own database */
5791 tgl 1628 GIC 21 : if (db_id == MyDatabaseId)
5791 tgl 1629 UIC 0 : ereport(ERROR,
1630 : (errcode(ERRCODE_OBJECT_IN_USE),
1631 : errmsg("cannot drop the currently open database")));
1632 :
1633 : /*
1634 : * Check whether there are active logical slots that refer to the
1635 : * to-be-dropped database. The database lock we are holding prevents the
1636 : * creation of new slots using the database or existing slots becoming
1637 : * active.
1638 : */
2203 simon 1639 GIC 21 : (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
1640 21 : if (nslots_active)
2203 simon 1641 ECB : {
3324 rhaas 1642 GIC 1 : ereport(ERROR,
3324 rhaas 1643 ECB : (errcode(ERRCODE_OBJECT_IN_USE),
1644 : errmsg("database \"%s\" is used by an active logical replication slot",
1645 : dbname),
1876 peter_e 1646 : errdetail_plural("There is %d active slot.",
1647 : "There are %d active slots.",
2203 simon 1648 : nslots_active, nslots_active)));
1649 : }
1650 :
1651 : /*
1652 : * Check if there are subscriptions defined in the target database.
1653 : *
1654 : * We can't drop them automatically because they might be holding
2271 peter_e 1655 : * resources in other databases/instances.
1656 : */
2271 peter_e 1657 GIC 20 : if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
2271 peter_e 1658 UIC 0 : ereport(ERROR,
2271 peter_e 1659 ECB : (errcode(ERRCODE_OBJECT_IN_USE),
1660 : errmsg("database \"%s\" is being used by logical replication subscription",
1661 : dbname),
1662 : errdetail_plural("There is %d subscription.",
1663 : "There are %d subscriptions.",
1664 : nsubscriptions, nsubscriptions)));
1665 :
1244 akapila 1666 :
1244 akapila 1667 EUB : /*
1668 : * Attempt to terminate all existing connections to the target database if
1669 : * the user has requested to do so.
1670 : */
1244 akapila 1671 CBC 20 : if (force)
1244 akapila 1672 GIC 1 : TerminateOtherDBBackends(db_id);
1673 :
1674 : /*
1675 : * Check for other backends in the target database. (Because we hold the
1676 : * database lock, no new ones can start after this.)
1677 : *
1247 akapila 1678 ECB : * As in CREATE DATABASE, check this after other error conditions.
1247 akapila 1679 EUB : */
1247 akapila 1680 GIC 20 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1247 akapila 1681 UIC 0 : ereport(ERROR,
1682 : (errcode(ERRCODE_OBJECT_IN_USE),
1683 : errmsg("database \"%s\" is being accessed by other users",
1247 akapila 1684 ECB : dbname),
1247 akapila 1685 EUB : errdetail_busy_db(notherbackends, npreparedxacts)));
1686 :
1687 : /*
1688 : * Remove the database's tuple from pg_database.
1689 : */
4802 rhaas 1690 GIC 20 : tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
8598 tgl 1691 20 : if (!HeapTupleIsValid(tup))
6185 tgl 1692 UIC 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1693 :
2258 tgl 1694 GIC 20 : CatalogTupleDelete(pgdbrel, &tup->t_self);
8598 tgl 1695 ECB :
6185 tgl 1696 CBC 20 : ReleaseSysCache(tup);
1697 :
7576 tgl 1698 ECB : /*
1699 : * Delete any comments or security labels associated with the database.
1700 : */
6265 bruce 1701 GIC 20 : DeleteSharedComments(db_id, DatabaseRelationId);
4281 rhaas 1702 20 : DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1703 :
1704 : /*
1705 : * Remove settings associated with this database
1706 : */
4932 alvherre 1707 20 : DropSetting(db_id, InvalidOid);
1708 :
1709 : /*
1710 : * Remove shared dependency references for the database.
1711 : */
6390 tgl 1712 20 : dropDatabaseDependencies(db_id);
6390 tgl 1713 ECB :
2203 simon 1714 EUB : /*
1715 : * Drop db-specific replication slots.
1716 : */
2203 simon 1717 GIC 20 : ReplicationSlotsDropDBSlots(db_id);
1718 :
1719 : /*
1720 : * Drop pages for this database that are in the shared buffer cache. This
1721 : * is important to ensure that no remaining backend tries to write out a
1722 : * dirty buffer to the dead database later...
1723 : */
6220 tgl 1724 20 : DropDatabaseBuffers(db_id);
1725 :
1726 : /*
368 andres 1727 ECB : * Tell the cumulative stats system to forget it immediately, too.
5903 tgl 1728 : */
5903 tgl 1729 GIC 20 : pgstat_drop_database(db_id);
1730 :
1731 : /*
1732 : * Tell checkpointer to forget any pending fsync and unlink requests for
1733 : * files in the database; else the fsyncs will fail at next checkpoint, or
1734 : * worse, it will delete files that belong to a newly created database
1735 : * with the same OID.
5926 tgl 1736 ECB : */
1466 tmunro 1737 GBC 20 : ForgetDatabaseSyncRequests(db_id);
1738 :
1739 : /*
1740 : * Force a checkpoint to make sure the checkpointer has received the
1741 : * message sent by ForgetDatabaseSyncRequests.
1742 : */
5764 tgl 1743 GIC 20 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1744 :
1745 : /* Close all smgr fds in all backends. */
421 tmunro 1746 CBC 20 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
421 tmunro 1747 ECB :
9345 bruce 1748 EUB : /*
1749 : * Remove all tablespace subdirs belonging to the database.
9345 bruce 1750 ECB : */
6869 tgl 1751 GIC 20 : remove_dbtablespaces(db_id);
8120 tgl 1752 ECB :
1753 : /*
1754 : * Close pg_database, but keep lock till commit.
1755 : */
1539 andres 1756 GIC 20 : table_close(pgdbrel, NoLock);
6622 tgl 1757 ECB :
1758 : /*
1759 : * Force synchronous commit, thus minimizing the window between removal of
1760 : * the database files and committal of the transaction. If we crash before
1761 : * committing, we'll have a DB that's gone on disk but still there
1762 : * according to pg_database, which is not good.
1763 : */
4968 alvherre 1764 GIC 20 : ForceSyncCommit();
1765 : }
1766 :
1767 :
7226 peter_e 1768 ECB : /*
1769 : * Rename database
1770 : */
1771 : ObjectAddress
7226 peter_e 1772 UIC 0 : RenameDatabase(const char *oldname, const char *newname)
7226 peter_e 1773 ECB : {
1774 : Oid db_id;
1775 : HeapTuple newtup;
1776 : Relation rel;
1777 : int notherbackends;
1778 : int npreparedxacts;
1779 : ObjectAddress address;
1780 :
1781 : /*
1782 : * Look up the target database's OID, and get exclusive lock on it. We
1783 : * need this for the same reasons as DROP DATABASE.
1784 : */
1539 andres 1785 LBC 0 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
1786 :
6184 tgl 1787 UIC 0 : if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
1788 : NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
7226 peter_e 1789 0 : ereport(ERROR,
1790 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1791 : errmsg("database \"%s\" does not exist", oldname)));
1792 :
5791 tgl 1793 ECB : /* must be owner */
147 peter 1794 UNC 0 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1954 peter_e 1795 UIC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1796 : oldname);
1797 :
1798 : /* must have createdb rights */
3029 alvherre 1799 LBC 0 : if (!have_createdb_privilege())
5791 tgl 1800 UIC 0 : ereport(ERROR,
1801 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5791 tgl 1802 ECB : errmsg("permission denied to rename database")));
1803 :
1804 : /*
1805 : * If built with appropriate switch, whine when regression-testing
1806 : * conventions for database names are violated.
1380 1807 : */
1808 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1809 : if (strstr(newname, "regression") == NULL)
1810 : elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1811 : #endif
1812 :
1813 : /*
1814 : * Make sure the new name doesn't exist. See notes for same error in
1815 : * CREATE DATABASE.
1816 : */
4630 rhaas 1817 UIC 0 : if (OidIsValid(get_database_oid(newname, true)))
5791 tgl 1818 0 : ereport(ERROR,
1819 : (errcode(ERRCODE_DUPLICATE_DATABASE),
5791 tgl 1820 ECB : errmsg("database \"%s\" already exists", newname)));
1821 :
1822 : /*
1823 : * XXX Client applications probably store the current database somewhere,
1824 : * so renaming it could cause confusion. On the other hand, there may not
1825 : * be an actual problem besides a little confusion, so think about this
1826 : * and decide.
1827 : */
6184 tgl 1828 UBC 0 : if (db_id == MyDatabaseId)
7226 peter_e 1829 UIC 0 : ereport(ERROR,
1830 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1831 : errmsg("current database cannot be renamed")));
1832 :
1833 : /*
1834 : * Make sure the database does not have active sessions. This is the same
1835 : * concern as above, but applied to other sessions.
1836 : *
1837 : * As in CREATE DATABASE, check this after other error conditions.
1838 : */
5361 tgl 1839 0 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
7205 1840 0 : ereport(ERROR,
7205 tgl 1841 EUB : (errcode(ERRCODE_OBJECT_IN_USE),
1842 : errmsg("database \"%s\" is being accessed by other users",
5361 1843 : oldname),
1844 : errdetail_busy_db(notherbackends, npreparedxacts)));
7226 peter_e 1845 :
1846 : /* rename */
4802 rhaas 1847 UIC 0 : newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
6184 tgl 1848 0 : if (!HeapTupleIsValid(newtup))
1849 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
7226 peter_e 1850 UBC 0 : namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
2259 alvherre 1851 0 : CatalogTupleUpdate(rel, &newtup->t_self, newtup);
1852 :
3675 rhaas 1853 UIC 0 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1854 :
2959 alvherre 1855 UBC 0 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2959 alvherre 1856 EUB :
1857 : /*
1858 : * Close pg_database, but keep lock till commit.
1859 : */
1539 andres 1860 UIC 0 : table_close(rel, NoLock);
1861 :
2959 alvherre 1862 0 : return address;
1863 : }
1864 :
1865 :
1866 : /*
1867 : * ALTER DATABASE SET TABLESPACE
1868 : */
1869 : static void
5266 tgl 1870 GIC 5 : movedb(const char *dbname, const char *tblspcname)
1871 : {
1872 : Oid db_id;
5050 bruce 1873 EUB : Relation pgdbrel;
1874 : int notherbackends;
1875 : int npreparedxacts;
1876 : HeapTuple oldtuple,
1877 : newtuple;
1878 : Oid src_tblspcoid,
1879 : dst_tblspcoid;
1880 : ScanKeyData scankey;
1881 : SysScanDesc sysscan;
1882 : AclResult aclresult;
1883 : char *src_dbpath;
1884 : char *dst_dbpath;
1885 : DIR *dstdir;
1886 : struct dirent *xlde;
1887 : movedb_failure_params fparms;
1888 :
1889 : /*
1890 : * Look up the target database's OID, and get exclusive lock on it. We
1891 : * need this to ensure that no new backend starts up in the database while
5266 tgl 1892 : * we are moving it, and that no one is using it as a CREATE DATABASE
1893 : * template or trying to delete it.
1894 : */
1539 andres 1895 GIC 5 : pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1896 :
5266 tgl 1897 5 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1898 : NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
5266 tgl 1899 UIC 0 : ereport(ERROR,
5266 tgl 1900 EUB : (errcode(ERRCODE_UNDEFINED_DATABASE),
1901 : errmsg("database \"%s\" does not exist", dbname)));
1902 :
1903 : /*
1904 : * We actually need a session lock, so that the lock will persist across
1905 : * the commit/restart below. (We could almost get away with letting the
1906 : * lock be released at commit, except that someone could try to move
1907 : * relations of the DB back into the old directory while we rmtree() it.)
1908 : */
5266 tgl 1909 GIC 5 : LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1910 : AccessExclusiveLock);
1911 :
1912 : /*
5266 tgl 1913 EUB : * Permission checks
1914 : */
147 peter 1915 GNC 5 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1954 peter_e 1916 UIC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1917 : dbname);
1918 :
1919 : /*
1920 : * Obviously can't move the tables of my own database
1921 : */
5266 tgl 1922 GIC 5 : if (db_id == MyDatabaseId)
5266 tgl 1923 LBC 0 : ereport(ERROR,
1924 : (errcode(ERRCODE_OBJECT_IN_USE),
1925 : errmsg("cannot change the tablespace of the currently open database")));
1926 :
1927 : /*
1928 : * Get tablespace's oid
1929 : */
4630 rhaas 1930 GIC 5 : dst_tblspcoid = get_tablespace_oid(tblspcname, false);
1931 :
1932 : /*
1933 : * Permission checks
1934 : */
147 peter 1935 GNC 5 : aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
1936 : ACL_CREATE);
5266 tgl 1937 GIC 5 : if (aclresult != ACLCHECK_OK)
1954 peter_e 1938 UIC 0 : aclcheck_error(aclresult, OBJECT_TABLESPACE,
1939 : tblspcname);
1940 :
1941 : /*
1942 : * pg_global must never be the default tablespace
1943 : */
5266 tgl 1944 GIC 5 : if (dst_tblspcoid == GLOBALTABLESPACE_OID)
5266 tgl 1945 UIC 0 : ereport(ERROR,
1946 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1947 : errmsg("pg_global cannot be used as default tablespace")));
5266 tgl 1948 ECB :
1949 : /*
1950 : * No-op if same tablespace
1951 : */
5266 tgl 1952 GBC 5 : if (src_tblspcoid == dst_tblspcoid)
1953 : {
1539 andres 1954 UIC 0 : table_close(pgdbrel, NoLock);
5266 tgl 1955 0 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1956 : AccessExclusiveLock);
1957 0 : return;
1958 : }
1959 :
1960 : /*
1961 : * Check for other backends in the target database. (Because we hold the
5266 tgl 1962 ECB : * database lock, no new ones can start after this.)
1963 : *
1964 : * As in CREATE DATABASE, check this after other error conditions.
1965 : */
5266 tgl 1966 GIC 5 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
5266 tgl 1967 UIC 0 : ereport(ERROR,
5266 tgl 1968 ECB : (errcode(ERRCODE_OBJECT_IN_USE),
5266 tgl 1969 EUB : errmsg("database \"%s\" is being accessed by other users",
1970 : dbname),
1971 : errdetail_busy_db(notherbackends, npreparedxacts)));
1972 :
1973 : /*
1974 : * Get old and new database paths
5266 tgl 1975 ECB : */
5266 tgl 1976 GBC 5 : src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
5266 tgl 1977 GIC 5 : dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
1978 :
1979 : /*
1980 : * Force a checkpoint before proceeding. This will force all dirty
1981 : * buffers, including those of unlogged tables, out to disk, to ensure
1982 : * source database is up-to-date on disk for the copy.
5050 bruce 1983 ECB : * FlushDatabaseBuffers() would suffice for that, but we also want to
1984 : * process any pending unlink requests. Otherwise, the check for existing
1985 : * files in the target directory might fail unnecessarily, not to mention
1986 : * that the copy might fail due to source files getting deleted under it.
1987 : * On Windows, this also ensures that background procs don't hold any open
1988 : * files, which would cause rmdir() to fail.
1989 : */
3093 andres 1990 CBC 5 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
3093 andres 1991 EUB : | CHECKPOINT_FLUSH_ALL);
1992 :
1993 : /* Close all smgr fds in all backends. */
421 tmunro 1994 GIC 5 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
1995 :
1996 : /*
3078 tgl 1997 ECB : * Now drop all buffers holding data of the target database; they should
3078 tgl 1998 EUB : * no longer be dirty so DropDatabaseBuffers is safe.
1999 : *
2000 : * It might seem that we could just let these buffers age out of shared
2001 : * buffers naturally, since they should not get referenced anymore. The
2002 : * problem with that is that if the user later moves the database back to
2003 : * its original tablespace, any still-surviving buffers would appear to
2004 : * contain valid data again --- but they'd be missing any changes made in
3078 tgl 2005 ECB : * the database while it was in the new tablespace. In any case, freeing
2006 : * buffers that should never be used again seems worth the cycles.
3078 tgl 2007 EUB : *
2008 : * Note: it'd be sufficient to get rid of buffers matching db_id and
2009 : * src_tblspcoid, but bufmgr.c presently provides no API for that.
2010 : */
3078 tgl 2011 GIC 5 : DropDatabaseBuffers(db_id);
2012 :
2013 : /*
2014 : * Check for existence of files in the target directory, i.e., objects of
2015 : * this database that are already in the target tablespace. We can't
2016 : * allow the move in such a case, because we would need to change those
2017 : * relations' pg_class.reltablespace entries to zero, and we don't have
2018 : * access to the DB's pg_class to do so.
5266 tgl 2019 ECB : */
5266 tgl 2020 GBC 5 : dstdir = AllocateDir(dst_dbpath);
5266 tgl 2021 GIC 5 : if (dstdir != NULL)
2022 : {
5266 tgl 2023 UIC 0 : while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
2024 : {
2025 0 : if (strcmp(xlde->d_name, ".") == 0 ||
2026 0 : strcmp(xlde->d_name, "..") == 0)
2027 0 : continue;
2028 :
5266 tgl 2029 LBC 0 : ereport(ERROR,
5086 tgl 2030 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2031 : errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
2032 : dbname, tblspcname),
2033 : errhint("You must move them back to the database's default tablespace before using this command.")));
2034 : }
2035 :
5266 tgl 2036 UIC 0 : FreeDir(dstdir);
2037 :
2038 : /*
2039 : * The directory exists but is empty. We must remove it before using
2040 : * the copydir function.
2041 : */
2042 0 : if (rmdir(dst_dbpath) != 0)
5266 tgl 2043 LBC 0 : elog(ERROR, "could not remove directory \"%s\": %m",
2044 : dst_dbpath);
2045 : }
2046 :
5266 tgl 2047 ECB : /*
2048 : * Use an ENSURE block to make sure we remove the debris if the copy fails
2049 : * (eg, due to out-of-disk-space). This is not a 100% solution, because
2050 : * of the possibility of failure during transaction commit, but it should
2051 : * handle most scenarios.
2052 : */
5266 tgl 2053 GIC 5 : fparms.dest_dboid = db_id;
2054 5 : fparms.dest_tsoid = dst_tblspcoid;
2055 5 : PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2056 : PointerGetDatum(&fparms));
2057 : {
267 peter 2058 GNC 5 : Datum new_record[Natts_pg_database] = {0};
2059 5 : bool new_record_nulls[Natts_pg_database] = {0};
2060 5 : bool new_record_repl[Natts_pg_database] = {0};
2061 :
2062 : /*
2063 : * Copy files from the old tablespace to the new one
2064 : */
5266 tgl 2065 GIC 5 : copydir(src_dbpath, dst_dbpath, false);
2066 :
2067 : /*
5266 tgl 2068 ECB : * Record the filesystem change in XLOG
2069 : */
2070 : {
2071 : xl_dbase_create_file_copy_rec xlrec;
2072 :
5266 tgl 2073 GIC 5 : xlrec.db_id = db_id;
2074 5 : xlrec.tablespace_id = dst_tblspcoid;
2075 5 : xlrec.src_db_id = db_id;
2076 5 : xlrec.src_tablespace_id = src_tblspcoid;
5266 tgl 2077 ECB :
3062 heikki.linnakangas 2078 CBC 5 : XLogBeginInsert();
376 rhaas 2079 GIC 5 : XLogRegisterData((char *) &xlrec,
376 rhaas 2080 EUB : sizeof(xl_dbase_create_file_copy_rec));
2081 :
3062 heikki.linnakangas 2082 GBC 5 : (void) XLogInsert(RM_DBASE_ID,
376 rhaas 2083 EUB : XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
5266 tgl 2084 : }
2085 :
2086 : /*
2087 : * Update the database's pg_database tuple
2088 : */
5266 tgl 2089 GIC 5 : ScanKeyInit(&scankey,
2090 : Anum_pg_database_datname,
2091 : BTEqualStrategyNumber, F_NAMEEQ,
2092 : CStringGetDatum(dbname));
5266 tgl 2093 GBC 5 : sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
2094 : NULL, 1, &scankey);
5266 tgl 2095 GIC 5 : oldtuple = systable_getnext(sysscan);
2118 2096 5 : if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
5266 tgl 2097 UIC 0 : ereport(ERROR,
2098 : (errcode(ERRCODE_UNDEFINED_DATABASE),
5266 tgl 2099 EUB : errmsg("database \"%s\" does not exist", dbname)));
2100 :
5266 tgl 2101 GIC 5 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
2102 5 : new_record_repl[Anum_pg_database_dattablespace - 1] = true;
2103 :
2104 5 : newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
2105 : new_record,
5266 tgl 2106 ECB : new_record_nulls, new_record_repl);
2259 alvherre 2107 CBC 5 : CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
5266 tgl 2108 ECB :
1601 andres 2109 GIC 5 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2110 :
5266 tgl 2111 CBC 5 : systable_endscan(sysscan);
5266 tgl 2112 ECB :
2113 : /*
2114 : * Force another checkpoint here. As in CREATE DATABASE, this is to
2115 : * ensure that we don't have to replay a committed
2116 : * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
2117 : * any unlogged operations done in the new DB tablespace before the
376 rhaas 2118 : * next checkpoint.
2119 : */
5266 tgl 2120 GIC 5 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
2121 :
2122 : /*
2123 : * Force synchronous commit, thus minimizing the window between
2124 : * copying the database files and committal of the transaction. If we
2125 : * crash before committing, we'll leave an orphaned set of files on
5266 tgl 2126 ECB : * disk, which is not fatal but not good either.
2127 : */
4968 alvherre 2128 CBC 5 : ForceSyncCommit();
5266 tgl 2129 ECB :
2130 : /*
4968 alvherre 2131 : * Close pg_database, but keep lock till commit.
5266 tgl 2132 : */
1539 andres 2133 GIC 5 : table_close(pgdbrel, NoLock);
2134 : }
5266 tgl 2135 CBC 5 : PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
2136 : PointerGetDatum(&fparms));
2137 :
2138 : /*
2139 : * Commit the transaction so that the pg_database update is committed. If
2140 : * we crash while removing files, the database won't be corrupt, we'll
2141 : * just leave some orphaned files in the old directory.
5266 tgl 2142 ECB : *
2143 : * (This is OK because we know we aren't inside a transaction block.)
2144 : *
2145 : * XXX would it be safe/better to do this inside the ensure block? Not
2146 : * convinced it's a good idea; consider elog just after the transaction
2147 : * really commits.
2148 : */
5266 tgl 2149 CBC 5 : PopActiveSnapshot();
5266 tgl 2150 GBC 5 : CommitTransactionCommand();
2151 :
2152 : /* Start new transaction for the remaining work; don't need a snapshot */
5266 tgl 2153 GIC 5 : StartTransactionCommand();
5266 tgl 2154 ECB :
2155 : /*
2156 : * Remove files from the old tablespace
2157 : */
5266 tgl 2158 GIC 5 : if (!rmtree(src_dbpath, true))
5266 tgl 2159 UIC 0 : ereport(WARNING,
5266 tgl 2160 ECB : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2161 : src_dbpath)));
2162 :
2163 : /*
2164 : * Record the filesystem change in XLOG
2165 : */
2166 : {
2167 : xl_dbase_drop_rec xlrec;
2168 :
5266 tgl 2169 GIC 5 : xlrec.db_id = db_id;
1235 fujii 2170 5 : xlrec.ntablespaces = 1;
2171 :
3062 heikki.linnakangas 2172 5 : XLogBeginInsert();
3062 heikki.linnakangas 2173 CBC 5 : XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
1235 fujii 2174 GIC 5 : XLogRegisterData((char *) &src_tblspcoid, sizeof(Oid));
2175 :
3062 heikki.linnakangas 2176 5 : (void) XLogInsert(RM_DBASE_ID,
2177 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2178 : }
2179 :
2180 : /* Now it's safe to release the database lock */
5266 tgl 2181 CBC 5 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2182 : AccessExclusiveLock);
2183 :
349 alvherre 2184 GIC 5 : pfree(src_dbpath);
2185 5 : pfree(dst_dbpath);
5266 tgl 2186 ECB : }
2187 :
2188 : /* Error cleanup callback for movedb */
2189 : static void
5266 tgl 2190 UIC 0 : movedb_failure_callback(int code, Datum arg)
2191 : {
2192 0 : movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
2193 : char *dstpath;
2194 :
2195 : /* Get rid of anything we managed to copy to the target directory */
2196 0 : dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
2197 :
2198 0 : (void) rmtree(dstpath, true);
2199 :
349 alvherre 2200 0 : pfree(dstpath);
5266 tgl 2201 0 : }
5266 tgl 2202 ECB :
1244 akapila 2203 : /*
2204 : * Process options and call dropdb function.
2205 : */
2206 : void
1244 akapila 2207 GIC 35 : DropDatabase(ParseState *pstate, DropdbStmt *stmt)
2208 : {
2209 35 : bool force = false;
2210 : ListCell *lc;
1244 akapila 2211 ECB :
1244 akapila 2212 GBC 48 : foreach(lc, stmt->options)
2213 : {
1244 akapila 2214 GIC 13 : DefElem *opt = (DefElem *) lfirst(lc);
2215 :
2216 13 : if (strcmp(opt->defname, "force") == 0)
2217 13 : force = true;
2218 : else
1244 akapila 2219 UIC 0 : ereport(ERROR,
2220 : (errcode(ERRCODE_SYNTAX_ERROR),
2221 : errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
1244 akapila 2222 ECB : parser_errposition(pstate, opt->location)));
2223 : }
2224 :
1244 akapila 2225 CBC 35 : dropdb(stmt->dbname, stmt->missing_ok, force);
2226 28 : }
5266 tgl 2227 ECB :
2228 : /*
6461 2229 : * ALTER DATABASE name ...
2230 : */
2231 : Oid
2406 peter_e 2232 GIC 10 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2233 : {
6461 tgl 2234 ECB : Relation rel;
2235 : Oid dboid;
2236 : HeapTuple tuple,
2237 : newtuple;
1601 andres 2238 : Form_pg_database datform;
2239 : ScanKeyData scankey;
2240 : SysScanDesc scan;
2241 : ListCell *option;
3204 tgl 2242 GIC 10 : bool dbistemplate = false;
3204 tgl 2243 GBC 10 : bool dballowconnections = true;
3204 tgl 2244 GIC 10 : int dbconnlimit = -1;
3204 tgl 2245 GBC 10 : DefElem *distemplate = NULL;
3204 tgl 2246 GIC 10 : DefElem *dallowconnections = NULL;
6461 2247 10 : DefElem *dconnlimit = NULL;
5266 2248 10 : DefElem *dtablespace = NULL;
267 peter 2249 GNC 10 : Datum new_record[Natts_pg_database] = {0};
2250 10 : bool new_record_nulls[Natts_pg_database] = {0};
2251 10 : bool new_record_repl[Natts_pg_database] = {0};
2252 :
6461 tgl 2253 EUB : /* Extract options from the statement node tree */
6461 tgl 2254 GBC 20 : foreach(option, stmt->options)
2255 : {
6461 tgl 2256 GIC 10 : DefElem *defel = (DefElem *) lfirst(option);
2257 :
3204 2258 10 : if (strcmp(defel->defname, "is_template") == 0)
2259 : {
3204 tgl 2260 CBC 3 : if (distemplate)
633 dean.a.rasheed 2261 UIC 0 : errorConflictingDefElem(defel, pstate);
3204 tgl 2262 CBC 3 : distemplate = defel;
2263 : }
3204 tgl 2264 GIC 7 : else if (strcmp(defel->defname, "allow_connections") == 0)
3204 tgl 2265 ECB : {
3204 tgl 2266 GIC 2 : if (dallowconnections)
633 dean.a.rasheed 2267 LBC 0 : errorConflictingDefElem(defel, pstate);
3204 tgl 2268 GIC 2 : dallowconnections = defel;
3204 tgl 2269 ECB : }
3204 tgl 2270 CBC 5 : else if (strcmp(defel->defname, "connection_limit") == 0)
2271 : {
6461 tgl 2272 UBC 0 : if (dconnlimit)
633 dean.a.rasheed 2273 UIC 0 : errorConflictingDefElem(defel, pstate);
6461 tgl 2274 0 : dconnlimit = defel;
2275 : }
5266 tgl 2276 GIC 5 : else if (strcmp(defel->defname, "tablespace") == 0)
2277 : {
5266 tgl 2278 CBC 5 : if (dtablespace)
633 dean.a.rasheed 2279 LBC 0 : errorConflictingDefElem(defel, pstate);
5266 tgl 2280 GIC 5 : dtablespace = defel;
2281 : }
2282 : else
3204 tgl 2283 UIC 0 : ereport(ERROR,
2284 : (errcode(ERRCODE_SYNTAX_ERROR),
2406 peter_e 2285 ECB : errmsg("option \"%s\" not recognized", defel->defname),
2286 : parser_errposition(pstate, defel->location)));
2287 : }
2288 :
5266 tgl 2289 GIC 10 : if (dtablespace)
2290 : {
2291 : /*
2292 : * While the SET TABLESPACE syntax doesn't allow any other options,
2293 : * somebody could write "WITH TABLESPACE ...". Forbid any other
2294 : * options from being specified in that case.
3204 tgl 2295 ECB : */
3204 tgl 2296 CBC 5 : if (list_length(stmt->options) != 1)
3204 tgl 2297 LBC 0 : ereport(ERROR,
3204 tgl 2298 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2118 2299 : errmsg("option \"%s\" cannot be specified with other options",
2300 : dtablespace->defname),
2406 peter_e 2301 : parser_errposition(pstate, dtablespace->location)));
5266 tgl 2302 : /* this case isn't allowed within a transaction block */
1878 peter_e 2303 CBC 5 : PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
3204 tgl 2304 5 : movedb(stmt->dbname, defGetString(dtablespace));
3753 rhaas 2305 GIC 5 : return InvalidOid;
2306 : }
5266 tgl 2307 ECB :
3204 tgl 2308 GIC 5 : if (distemplate && distemplate->arg)
3204 tgl 2309 CBC 3 : dbistemplate = defGetBoolean(distemplate);
3204 tgl 2310 GIC 5 : if (dallowconnections && dallowconnections->arg)
3204 tgl 2311 CBC 2 : dballowconnections = defGetBoolean(dallowconnections);
3204 tgl 2312 GIC 5 : if (dconnlimit && dconnlimit->arg)
5182 heikki.linnakangas 2313 ECB : {
3204 tgl 2314 UBC 0 : dbconnlimit = defGetInt32(dconnlimit);
3204 tgl 2315 LBC 0 : if (dbconnlimit < -1)
5182 heikki.linnakangas 2316 UIC 0 : ereport(ERROR,
5182 heikki.linnakangas 2317 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2318 : errmsg("invalid connection limit: %d", dbconnlimit)));
2319 : }
6461 tgl 2320 EUB :
6461 tgl 2321 ECB : /*
2322 : * Get the old tuple. We don't need a lock on the database per se,
6184 2323 : * because we're not going to do anything that would mess up incoming
2324 : * connections.
6461 tgl 2325 EUB : */
1539 andres 2326 GBC 5 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
6461 tgl 2327 5 : ScanKeyInit(&scankey,
2328 : Anum_pg_database_datname,
6461 tgl 2329 ECB : BTEqualStrategyNumber, F_NAMEEQ,
2399 tgl 2330 GIC 5 : CStringGetDatum(stmt->dbname));
6461 tgl 2331 CBC 5 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
3568 rhaas 2332 EUB : NULL, 1, &scankey);
6461 tgl 2333 CBC 5 : tuple = systable_getnext(scan);
6461 tgl 2334 GIC 5 : if (!HeapTupleIsValid(tuple))
6461 tgl 2335 UIC 0 : ereport(ERROR,
6461 tgl 2336 EUB : (errcode(ERRCODE_UNDEFINED_DATABASE),
2337 : errmsg("database \"%s\" does not exist", stmt->dbname)));
2338 :
1601 andres 2339 GIC 5 : datform = (Form_pg_database) GETSTRUCT(tuple);
2340 5 : dboid = datform->oid;
2341 :
147 peter 2342 GNC 5 : if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
1954 peter_e 2343 UIC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
6461 tgl 2344 0 : stmt->dbname);
2345 :
2346 : /*
2347 : * In order to avoid getting locked out and having to go through
2348 : * standalone mode, we refuse to disallow connections to the database
3204 tgl 2349 ECB : * we're currently connected to. Lockout can still happen with concurrent
3204 tgl 2350 EUB : * sessions but the likeliness of that is not high enough to worry about.
2351 : */
3204 tgl 2352 GIC 5 : if (!dballowconnections && dboid == MyDatabaseId)
3204 tgl 2353 UIC 0 : ereport(ERROR,
2354 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2355 : errmsg("cannot disallow connections for current database")));
3204 tgl 2356 ECB :
6461 2357 : /*
2358 : * Build an updated tuple, perusing the information just obtained
2359 : */
3204 tgl 2360 CBC 5 : if (distemplate)
3204 tgl 2361 ECB : {
3204 tgl 2362 GIC 3 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
3204 tgl 2363 GBC 3 : new_record_repl[Anum_pg_database_datistemplate - 1] = true;
3204 tgl 2364 EUB : }
3204 tgl 2365 GBC 5 : if (dallowconnections)
2366 : {
3204 tgl 2367 GIC 2 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
2368 2 : new_record_repl[Anum_pg_database_datallowconn - 1] = true;
2369 : }
6461 2370 5 : if (dconnlimit)
2371 : {
3204 tgl 2372 UIC 0 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
5271 2373 0 : new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
2374 : }
6461 tgl 2375 ECB :
5271 tgl 2376 CBC 5 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
2377 : new_record_nulls, new_record_repl);
2259 alvherre 2378 GIC 5 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
6461 tgl 2379 ECB :
1601 andres 2380 CBC 5 : InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
2381 :
6461 tgl 2382 5 : systable_endscan(scan);
6461 tgl 2383 ECB :
6461 tgl 2384 EUB : /* Close pg_database, but keep lock till commit */
1539 andres 2385 GIC 5 : table_close(rel, NoLock);
2386 :
3753 rhaas 2387 5 : return dboid;
6461 tgl 2388 ECB : }
2389 :
2390 :
419 peter 2391 : /*
419 peter 2392 EUB : * ALTER DATABASE name REFRESH COLLATION VERSION
2393 : */
2394 : ObjectAddress
419 peter 2395 GIC 6 : AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
2396 : {
2397 : Relation rel;
2398 : ScanKeyData scankey;
2399 : SysScanDesc scan;
2400 : Oid db_id;
419 peter 2401 ECB : HeapTuple tuple;
419 peter 2402 EUB : Form_pg_database datForm;
2403 : ObjectAddress address;
2404 : Datum datum;
2405 : bool isnull;
2406 : char *oldversion;
2407 : char *newversion;
2408 :
419 peter 2409 CBC 6 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
419 peter 2410 GIC 6 : ScanKeyInit(&scankey,
419 peter 2411 ECB : Anum_pg_database_datname,
2412 : BTEqualStrategyNumber, F_NAMEEQ,
419 peter 2413 GIC 6 : CStringGetDatum(stmt->dbname));
419 peter 2414 CBC 6 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2415 : NULL, 1, &scankey);
2416 6 : tuple = systable_getnext(scan);
2417 6 : if (!HeapTupleIsValid(tuple))
419 peter 2418 UIC 0 : ereport(ERROR,
419 peter 2419 ECB : (errcode(ERRCODE_UNDEFINED_DATABASE),
2420 : errmsg("database \"%s\" does not exist", stmt->dbname)));
419 peter 2421 EUB :
419 peter 2422 GBC 6 : datForm = (Form_pg_database) GETSTRUCT(tuple);
419 peter 2423 GIC 6 : db_id = datForm->oid;
2424 :
147 peter 2425 GNC 6 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
419 peter 2426 UIC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
419 peter 2427 LBC 0 : stmt->dbname);
2428 :
419 peter 2429 CBC 6 : datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
419 peter 2430 GIC 6 : oldversion = isnull ? NULL : TextDatumGetCString(datum);
419 peter 2431 ECB :
388 peter 2432 GIC 6 : datum = heap_getattr(tuple, datForm->datlocprovider == COLLPROVIDER_ICU ? Anum_pg_database_daticulocale : Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
2433 6 : if (isnull)
388 peter 2434 LBC 0 : elog(ERROR, "unexpected null in pg_database");
388 peter 2435 GIC 6 : newversion = get_collation_actual_version(datForm->datlocprovider, TextDatumGetCString(datum));
419 peter 2436 ECB :
2437 : /* cannot change from NULL to non-NULL or vice versa */
419 peter 2438 GIC 6 : if ((!oldversion && newversion) || (oldversion && !newversion))
419 peter 2439 UIC 0 : elog(ERROR, "invalid collation version change");
419 peter 2440 GIC 6 : else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
419 peter 2441 UIC 0 : {
2442 0 : bool nulls[Natts_pg_database] = {0};
2443 0 : bool replaces[Natts_pg_database] = {0};
419 peter 2444 LBC 0 : Datum values[Natts_pg_database] = {0};
2445 :
419 peter 2446 UIC 0 : ereport(NOTICE,
2447 : (errmsg("changing version from %s to %s",
2448 : oldversion, newversion)));
2449 :
2450 0 : values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
2451 0 : replaces[Anum_pg_database_datcollversion - 1] = true;
2452 :
2453 0 : tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
2454 : values, nulls, replaces);
2455 0 : CatalogTupleUpdate(rel, &tuple->t_self, tuple);
2456 0 : heap_freetuple(tuple);
2457 : }
419 peter 2458 ECB : else
419 peter 2459 CBC 6 : ereport(NOTICE,
2460 : (errmsg("version has not changed")));
2461 :
2462 6 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
419 peter 2463 ECB :
419 peter 2464 GIC 6 : ObjectAddressSet(address, DatabaseRelationId, db_id);
419 peter 2465 ECB :
419 peter 2466 CBC 6 : systable_endscan(scan);
419 peter 2467 EUB :
419 peter 2468 GIC 6 : table_close(rel, NoLock);
2469 :
2470 6 : return address;
419 peter 2471 ECB : }
2472 :
2473 :
7709 peter_e 2474 : /*
7709 peter_e 2475 EUB : * ALTER DATABASE name SET ...
2476 : */
2477 : Oid
7709 peter_e 2478 CBC 516 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
7709 peter_e 2479 ECB : {
4630 rhaas 2480 GIC 516 : Oid datid = get_database_oid(stmt->dbname, false);
4790 bruce 2481 ECB :
6716 tgl 2482 : /*
4932 alvherre 2483 EUB : * Obtain a lock on the database and make sure it didn't go away in the
4932 alvherre 2484 ECB : * meantime.
2485 : */
4932 alvherre 2486 GIC 516 : shdepLockAndCheckObject(DatabaseRelationId, datid);
7709 peter_e 2487 ECB :
147 peter 2488 GNC 516 : if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
1954 peter_e 2489 LBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
4790 bruce 2490 UBC 0 : stmt->dbname);
7709 peter_e 2491 EUB :
4932 alvherre 2492 GBC 516 : AlterSetting(datid, InvalidOid, stmt->setstmt);
4790 bruce 2493 EUB :
4932 alvherre 2494 GIC 516 : UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
3753 rhaas 2495 EUB :
3753 rhaas 2496 GIC 516 : return datid;
2497 : }
2498 :
7709 peter_e 2499 EUB :
6892 bruce 2500 : /*
2501 : * ALTER DATABASE name OWNER TO newowner
2502 : */
2503 : ObjectAddress
6494 tgl 2504 GBC 18 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
6892 bruce 2505 EUB : {
2506 : Oid db_id;
2507 : HeapTuple tuple;
6892 bruce 2508 ECB : Relation rel;
2509 : ScanKeyData scankey;
2510 : SysScanDesc scan;
6797 2511 : Form_pg_database datForm;
2512 : ObjectAddress address;
6892 2513 :
2514 : /*
6184 tgl 2515 : * Get the old tuple. We don't need a lock on the database per se,
2516 : * because we're not going to do anything that would mess up incoming
2517 : * connections.
2518 : */
1539 andres 2519 CBC 18 : rel = table_open(DatabaseRelationId, RowExclusiveLock);
6892 bruce 2520 GIC 18 : ScanKeyInit(&scankey,
2521 : Anum_pg_database_datname,
2522 : BTEqualStrategyNumber, F_NAMEEQ,
2523 : CStringGetDatum(dbname));
6569 tgl 2524 18 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2525 : NULL, 1, &scankey);
6892 bruce 2526 18 : tuple = systable_getnext(scan);
6892 bruce 2527 CBC 18 : if (!HeapTupleIsValid(tuple))
6892 bruce 2528 UIC 0 : ereport(ERROR,
6892 bruce 2529 ECB : (errcode(ERRCODE_UNDEFINED_DATABASE),
2530 : errmsg("database \"%s\" does not exist", dbname)));
2531 :
6825 tgl 2532 GIC 18 : datForm = (Form_pg_database) GETSTRUCT(tuple);
1601 andres 2533 18 : db_id = datForm->oid;
2534 :
6797 bruce 2535 ECB : /*
2536 : * If the new owner is the same as the existing owner, consider the
2537 : * command to have succeeded. This is to be consistent with other
6797 bruce 2538 EUB : * objects.
6862 tgl 2539 : */
6494 tgl 2540 GIC 18 : if (datForm->datdba != newOwnerId)
6862 tgl 2541 ECB : {
2542 : Datum repl_val[Natts_pg_database];
267 peter 2543 GNC 12 : bool repl_null[Natts_pg_database] = {0};
2544 12 : bool repl_repl[Natts_pg_database] = {0};
6797 bruce 2545 ECB : Acl *newAcl;
2546 : Datum aclDatum;
2547 : bool isNull;
2548 : HeapTuple newtuple;
2549 :
2550 : /* Otherwise, must be owner of the existing object */
147 peter 2551 GNC 12 : if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1954 peter_e 2552 UIC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
6478 tgl 2553 ECB : dbname);
2554 :
2555 : /* Must be able to become new owner */
142 rhaas 2556 GNC 12 : check_can_set_role(GetUserId(), newOwnerId);
2557 :
2558 : /*
2559 : * must have createdb rights
2560 : *
2561 : * NOTE: This is different from other alter-owner checks in that the
2562 : * current user is checked for createdb privileges instead of the
2563 : * destination owner. This is consistent with the CREATE case for
2564 : * databases. Because superusers will always have this right, we need
2565 : * no special case for them.
2566 : */
3029 alvherre 2567 GIC 12 : if (!have_createdb_privilege())
6862 tgl 2568 LBC 0 : ereport(ERROR,
6862 tgl 2569 ECB : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2570 : errmsg("permission denied to change owner of database")));
2571 :
5271 tgl 2572 CBC 12 : repl_repl[Anum_pg_database_datdba - 1] = true;
6494 2573 12 : repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
6825 tgl 2574 EUB :
2575 : /*
2576 : * Determine the modified ACL for the new owner. This is only
2577 : * necessary when the ACL is non-null.
6825 tgl 2578 ECB : */
6825 tgl 2579 CBC 12 : aclDatum = heap_getattr(tuple,
2580 : Anum_pg_database_datacl,
2581 : RelationGetDescr(rel),
2582 : &isNull);
6825 tgl 2583 GIC 12 : if (!isNull)
2584 : {
6825 tgl 2585 UIC 0 : newAcl = aclnewowner(DatumGetAclP(aclDatum),
6494 tgl 2586 ECB : datForm->datdba, newOwnerId);
5271 tgl 2587 UIC 0 : repl_repl[Anum_pg_database_datacl - 1] = true;
6825 2588 0 : repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
6825 tgl 2589 ECB : }
2590 :
5271 tgl 2591 GIC 12 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
2259 alvherre 2592 12 : CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
2593 :
6825 tgl 2594 12 : heap_freetuple(newtuple);
2595 :
2596 : /* Update owner dependency reference */
1601 andres 2597 CBC 12 : changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
6862 tgl 2598 EUB : }
2599 :
1601 andres 2600 GIC 18 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2601 :
2959 alvherre 2602 CBC 18 : ObjectAddressSet(address, DatabaseRelationId, db_id);
2603 :
6616 tgl 2604 GIC 18 : systable_endscan(scan);
2605 :
2606 : /* Close pg_database, but keep lock till commit */
1539 andres 2607 18 : table_close(rel, NoLock);
2608 :
2959 alvherre 2609 18 : return address;
2610 : }
2611 :
2612 :
419 peter 2613 ECB : Datum
419 peter 2614 GBC 303 : pg_database_collation_actual_version(PG_FUNCTION_ARGS)
2615 : {
419 peter 2616 GIC 303 : Oid dbid = PG_GETARG_OID(0);
2617 : HeapTuple tp;
388 peter 2618 ECB : char datlocprovider;
419 2619 : Datum datum;
2620 : char *version;
2621 :
419 peter 2622 GIC 303 : tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2623 303 : if (!HeapTupleIsValid(tp))
419 peter 2624 LBC 0 : ereport(ERROR,
2625 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2626 : errmsg("database with OID %u does not exist", dbid)));
2627 :
388 peter 2628 CBC 303 : datlocprovider = ((Form_pg_database) GETSTRUCT(tp))->datlocprovider;
2629 :
15 dgustafsson 2630 GNC 303 : datum = SysCacheGetAttrNotNull(DATABASEOID, tp, datlocprovider == COLLPROVIDER_ICU ? Anum_pg_database_daticulocale : Anum_pg_database_datcollate);
388 peter 2631 GBC 303 : version = get_collation_actual_version(datlocprovider, TextDatumGetCString(datum));
2632 :
419 peter 2633 GIC 303 : ReleaseSysCache(tp);
419 peter 2634 ECB :
419 peter 2635 CBC 303 : if (version)
419 peter 2636 GIC 303 : PG_RETURN_TEXT_P(cstring_to_text(version));
419 peter 2637 ECB : else
419 peter 2638 UIC 0 : PG_RETURN_NULL();
2639 : }
419 peter 2640 ECB :
2641 :
2642 : /*
8487 peter_e 2643 : * Helper functions
2644 : */
9770 scrappy 2645 :
2646 : /*
6184 tgl 2647 : * Look up info about the database named "name". If the database exists,
2648 : * obtain the specified lock type on it, fill in any of the remaining
2649 : * parameters that aren't NULL, and return true. If no such database,
2062 peter_e 2650 : * return false.
2651 : */
8487 2652 : static bool
6184 tgl 2653 GIC 844 : get_db_info(const char *name, LOCKMODE lockmode,
2654 : Oid *dbIdP, Oid *ownerIdP,
2655 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
2656 : TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
388 peter 2657 ECB : Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbIculocale,
2658 : char **dbIcurules,
2659 : char *dbLocProvider,
419 2660 : char **dbCollversion)
2661 : {
6184 tgl 2662 GIC 844 : bool result = false;
2663 : Relation relation;
2664 :
163 peter 2665 GNC 844 : Assert(name);
9345 bruce 2666 ECB :
8181 tgl 2667 : /* Caller may wish to grab a better lock on pg_database beforehand... */
1539 andres 2668 GBC 844 : relation = table_open(DatabaseRelationId, AccessShareLock);
2669 :
2670 : /*
2671 : * Loop covers the rare case where the database is renamed before we can
6031 bruce 2672 ECB : * lock it. We try again just in case we can find a new one of the same
2673 : * name.
6184 tgl 2674 : */
2675 : for (;;)
6184 tgl 2676 UIC 0 : {
6184 tgl 2677 ECB : ScanKeyData scanKey;
2678 : SysScanDesc scan;
2679 : HeapTuple tuple;
2680 : Oid dbOid;
2681 :
6184 tgl 2682 EUB : /*
2683 : * there's no syscache for database-indexed-by-name, so must do it the
2684 : * hard way
2685 : */
6184 tgl 2686 GIC 844 : ScanKeyInit(&scanKey,
2687 : Anum_pg_database_datname,
2688 : BTEqualStrategyNumber, F_NAMEEQ,
2689 : CStringGetDatum(name));
2690 :
2691 844 : scan = systable_beginscan(relation, DatabaseNameIndexId, true,
2692 : NULL, 1, &scanKey);
2693 :
2694 844 : tuple = systable_getnext(scan);
2695 :
2696 844 : if (!HeapTupleIsValid(tuple))
6184 tgl 2697 ECB : {
2698 : /* definitely no database of that name */
6184 tgl 2699 GIC 14 : systable_endscan(scan);
2700 14 : break;
2701 : }
2702 :
1601 andres 2703 830 : dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
2704 :
6184 tgl 2705 830 : systable_endscan(scan);
6184 tgl 2706 ECB :
2707 : /*
2708 : * Now that we have a database OID, we can try to lock the DB.
2709 : */
6184 tgl 2710 GIC 830 : if (lockmode != NoLock)
2711 830 : LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
6184 tgl 2712 ECB :
2713 : /*
2714 : * And now, re-fetch the tuple by OID. If it's still there and still
2715 : * the same name, we win; else, drop the lock and loop back to try
2716 : * again.
2717 : */
4802 rhaas 2718 GIC 830 : tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
6184 tgl 2719 830 : if (HeapTupleIsValid(tuple))
6184 tgl 2720 EUB : {
6184 tgl 2721 GIC 830 : Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
2722 :
2723 830 : if (strcmp(name, NameStr(dbform->datname)) == 0)
2724 : {
2725 : Datum datum;
2726 : bool isnull;
2727 :
2728 : /* oid of the database */
2729 830 : if (dbIdP)
6184 tgl 2730 CBC 830 : *dbIdP = dbOid;
2731 : /* oid of the owner */
6184 tgl 2732 GIC 830 : if (ownerIdP)
2733 804 : *ownerIdP = dbform->datdba;
2734 : /* character encoding */
6184 tgl 2735 CBC 830 : if (encodingP)
6184 tgl 2736 GIC 804 : *encodingP = dbform->encoding;
2737 : /* allowed as template? */
6184 tgl 2738 CBC 830 : if (dbIsTemplateP)
6184 tgl 2739 GIC 825 : *dbIsTemplateP = dbform->datistemplate;
6184 tgl 2740 ECB : /* allowing connections? */
6184 tgl 2741 GIC 830 : if (dbAllowConnP)
2742 804 : *dbAllowConnP = dbform->datallowconn;
5999 tgl 2743 ECB : /* limit of frozen XIDs */
5999 tgl 2744 CBC 830 : if (dbFrozenXidP)
5999 tgl 2745 GIC 804 : *dbFrozenXidP = dbform->datfrozenxid;
2746 : /* minimum MultiXactId */
3728 alvherre 2747 CBC 830 : if (dbMinMultiP)
3728 alvherre 2748 GIC 804 : *dbMinMultiP = dbform->datminmxid;
6184 tgl 2749 ECB : /* default tablespace for this database */
6184 tgl 2750 GIC 830 : if (dbTablespace)
2751 809 : *dbTablespace = dbform->dattablespace;
2752 : /* default locale settings for this database */
388 peter 2753 830 : if (dbLocProvider)
388 peter 2754 CBC 804 : *dbLocProvider = dbform->datlocprovider;
5050 bruce 2755 830 : if (dbCollate)
2756 : {
15 dgustafsson 2757 GNC 804 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
437 peter 2758 GIC 804 : *dbCollate = TextDatumGetCString(datum);
2759 : }
5050 bruce 2760 830 : if (dbCtype)
437 peter 2761 ECB : {
15 dgustafsson 2762 GNC 804 : datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
437 peter 2763 CBC 804 : *dbCtype = TextDatumGetCString(datum);
2764 : }
388 2765 830 : if (dbIculocale)
2766 : {
388 peter 2767 GIC 804 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticulocale, &isnull);
2768 804 : if (isnull)
2769 21 : *dbIculocale = NULL;
2770 : else
388 peter 2771 CBC 783 : *dbIculocale = TextDatumGetCString(datum);
388 peter 2772 ECB : }
32 peter 2773 GNC 830 : if (dbIcurules)
2774 : {
2775 804 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
2776 804 : if (isnull)
2777 804 : *dbIcurules = NULL;
2778 : else
32 peter 2779 UNC 0 : *dbIcurules = TextDatumGetCString(datum);
2780 : }
419 peter 2781 GIC 830 : if (dbCollversion)
419 peter 2782 ECB : {
419 peter 2783 CBC 804 : datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
419 peter 2784 GIC 804 : if (isnull)
419 peter 2785 CBC 416 : *dbCollversion = NULL;
419 peter 2786 ECB : else
419 peter 2787 GIC 388 : *dbCollversion = TextDatumGetCString(datum);
419 peter 2788 ECB : }
6184 tgl 2789 CBC 830 : ReleaseSysCache(tuple);
6184 tgl 2790 GIC 830 : result = true;
6184 tgl 2791 CBC 830 : break;
6184 tgl 2792 ECB : }
2793 : /* can only get here if it was just renamed */
6184 tgl 2794 LBC 0 : ReleaseSysCache(tuple);
6184 tgl 2795 ECB : }
2796 :
6184 tgl 2797 LBC 0 : if (lockmode != NoLock)
2798 0 : UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2799 : }
9345 bruce 2800 ECB :
1539 andres 2801 CBC 844 : table_close(relation, AccessShareLock);
2802 :
6184 tgl 2803 844 : return result;
8487 peter_e 2804 ECB : }
9345 bruce 2805 :
2806 : /* Check if current user has createdb privileges */
2807 : bool
3029 alvherre 2808 CBC 834 : have_createdb_privilege(void)
2809 : {
2810 834 : bool result = false;
2811 : HeapTuple utup;
3029 alvherre 2812 ECB :
2813 : /* Superusers can always do everything */
3029 alvherre 2814 GIC 834 : if (superuser())
3029 alvherre 2815 CBC 816 : return true;
2816 :
2817 18 : utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
2818 18 : if (HeapTupleIsValid(utup))
3029 alvherre 2819 ECB : {
3029 alvherre 2820 GIC 18 : result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
3029 alvherre 2821 CBC 18 : ReleaseSysCache(utup);
2822 : }
2823 18 : return result;
2824 : }
3029 alvherre 2825 ECB :
6869 tgl 2826 : /*
2827 : * Remove tablespace directories
2828 : *
6869 tgl 2829 EUB : * We don't know what tablespaces db_id is using, so iterate through all
2830 : * tablespaces removing <tablespace>/db_id
6869 tgl 2831 ECB : */
2832 : static void
6869 tgl 2833 CBC 20 : remove_dbtablespaces(Oid db_id)
8187 peter_e 2834 ECB : {
6797 bruce 2835 : Relation rel;
2836 : TableScanDesc scan;
2837 : HeapTuple tuple;
1060 tgl 2838 GIC 20 : List *ltblspc = NIL;
1060 tgl 2839 ECB : ListCell *cell;
2840 : int ntblspc;
2841 : int i;
2842 : Oid *tablespace_ids;
2843 :
1539 andres 2844 GBC 20 : rel = table_open(TableSpaceRelationId, AccessShareLock);
1490 andres 2845 GIC 20 : scan = table_beginscan_catalog(rel, 0, NULL);
6869 tgl 2846 81 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
8187 peter_e 2847 EUB : {
1601 andres 2848 GBC 61 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
1601 andres 2849 GIC 61 : Oid dsttablespace = spcform->oid;
2850 : char *dstpath;
6869 tgl 2851 ECB : struct stat st;
2852 :
2853 : /* Don't mess with the global tablespace */
6869 tgl 2854 GIC 61 : if (dsttablespace == GLOBALTABLESPACE_OID)
2855 41 : continue;
2856 :
2857 41 : dstpath = GetDatabasePath(db_id, dsttablespace);
8187 peter_e 2858 ECB :
6017 tgl 2859 GIC 41 : if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
8187 peter_e 2860 ECB : {
2861 : /* Assume we can ignore it */
6869 tgl 2862 GIC 21 : pfree(dstpath);
2863 21 : continue;
8187 peter_e 2864 ECB : }
8181 tgl 2865 :
6825 bruce 2866 GIC 20 : if (!rmtree(dstpath, true))
6869 tgl 2867 LBC 0 : ereport(WARNING,
5469 tgl 2868 ECB : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2869 : dstpath)));
6797 2870 :
1235 fujii 2871 CBC 20 : ltblspc = lappend_oid(ltblspc, dsttablespace);
1235 fujii 2872 GIC 20 : pfree(dstpath);
1235 fujii 2873 ECB : }
2874 :
1235 fujii 2875 GIC 20 : ntblspc = list_length(ltblspc);
2876 20 : if (ntblspc == 0)
2877 : {
1235 fujii 2878 UIC 0 : table_endscan(scan);
2879 0 : table_close(rel, AccessShareLock);
2880 0 : return;
2881 : }
2882 :
1235 fujii 2883 CBC 20 : tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
1235 fujii 2884 GIC 20 : i = 0;
2885 40 : foreach(cell, ltblspc)
2886 20 : tablespace_ids[i++] = lfirst_oid(cell);
2887 :
1235 fujii 2888 ECB : /* Record the filesystem change in XLOG */
2889 : {
2890 : xl_dbase_drop_rec xlrec;
2891 :
1235 fujii 2892 GIC 20 : xlrec.db_id = db_id;
2893 20 : xlrec.ntablespaces = ntblspc;
1235 fujii 2894 ECB :
1235 fujii 2895 CBC 20 : XLogBeginInsert();
2896 20 : XLogRegisterData((char *) &xlrec, MinSizeOfDbaseDropRec);
1235 fujii 2897 GIC 20 : XLogRegisterData((char *) tablespace_ids, ntblspc * sizeof(Oid));
1235 fujii 2898 ECB :
1235 fujii 2899 CBC 20 : (void) XLogInsert(RM_DBASE_ID,
2900 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
2901 : }
2902 :
1235 fujii 2903 GIC 20 : list_free(ltblspc);
1235 fujii 2904 CBC 20 : pfree(tablespace_ids);
1235 fujii 2905 ECB :
1490 andres 2906 GIC 20 : table_endscan(scan);
1539 andres 2907 CBC 20 : table_close(rel, AccessShareLock);
2908 : }
7548 tgl 2909 ECB :
2910 : /*
2911 : * Check for existing files that conflict with a proposed new DB OID;
2062 peter_e 2912 : * return true if there are any
6017 tgl 2913 : *
2914 : * If there were a subdirectory in any tablespace matching the proposed new
2915 : * OID, we'd get a create failure due to the duplicate name ... and then we'd
2916 : * try to remove that already-existing subdirectory during the cleanup in
6017 tgl 2917 EUB : * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
2918 : * instead we make this extra check before settling on the OID of the new
2919 : * database. This exactly parallels what GetNewRelFileNumber() does for table
2920 : * relfilenumber values.
6017 tgl 2921 ECB : */
2922 : static bool
6017 tgl 2923 GIC 797 : check_db_file_conflict(Oid db_id)
2924 : {
6017 tgl 2925 CBC 797 : bool result = false;
6017 tgl 2926 ECB : Relation rel;
2927 : TableScanDesc scan;
6017 tgl 2928 EUB : HeapTuple tuple;
2929 :
1539 andres 2930 GBC 797 : rel = table_open(TableSpaceRelationId, AccessShareLock);
1490 andres 2931 GIC 797 : scan = table_beginscan_catalog(rel, 0, NULL);
6017 tgl 2932 2434 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
6017 tgl 2933 ECB : {
1601 andres 2934 CBC 1637 : Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
2935 1637 : Oid dsttablespace = spcform->oid;
6017 tgl 2936 ECB : char *dstpath;
2937 : struct stat st;
2938 :
2939 : /* Don't mess with the global tablespace */
6017 tgl 2940 GIC 1637 : if (dsttablespace == GLOBALTABLESPACE_OID)
2941 797 : continue;
6017 tgl 2942 ECB :
6017 tgl 2943 CBC 840 : dstpath = GetDatabasePath(db_id, dsttablespace);
2944 :
2945 840 : if (lstat(dstpath, &st) == 0)
6017 tgl 2946 ECB : {
2947 : /* Found a conflicting file (or directory, whatever) */
6017 tgl 2948 UIC 0 : pfree(dstpath);
6017 tgl 2949 LBC 0 : result = true;
6017 tgl 2950 UIC 0 : break;
2951 : }
2952 :
6017 tgl 2953 CBC 840 : pfree(dstpath);
6017 tgl 2954 ECB : }
2955 :
1490 andres 2956 CBC 797 : table_endscan(scan);
1539 2957 797 : table_close(rel, AccessShareLock);
2958 :
6017 tgl 2959 GIC 797 : return result;
2960 : }
2961 :
/*
 * errdetail_busy_db
 *		Emit an errdetail message suited to a database that is in use.
 *
 * The wording depends on which of the two counts are positive.  Always
 * returns 0.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	if (notherbackends <= 0)
	{
		/* No other sessions: report prepared transactions only */
		errdetail_plural("There is %d prepared transaction using the database.",
						 "There are %d prepared transactions using the database.",
						 npreparedxacts,
						 npreparedxacts);
	}
	else if (npreparedxacts > 0)
	{
		/*
		 * Both kinds are present.  Singular versus plural is not handled
		 * here, because gettext has no support for more than one plural in
		 * a single string.
		 */
		errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
				  notherbackends, npreparedxacts);
	}
	else
	{
		/* Other sessions only */
		errdetail_plural("There is %d other session using the database.",
						 "There are %d other sessions using the database.",
						 notherbackends,
						 notherbackends);
	}

	return 0;					/* just to keep ereport macro happy */
}
2988 :
2989 : /*
7548 tgl 2990 ECB : * get_database_oid - given a database name, look up the OID
2991 : *
2992 : * If missing_ok is false, throw an error if database name not found. If
4630 rhaas 2993 : * true, just return InvalidOid.
2994 : */
7548 tgl 2995 : Oid
4630 rhaas 2996 GIC 3188 : get_database_oid(const char *dbname, bool missing_ok)
2997 : {
7548 tgl 2998 EUB : Relation pg_database;
2999 : ScanKeyData entry[1];
7188 bruce 3000 : SysScanDesc scan;
3001 : HeapTuple dbtuple;
3002 : Oid oid;
7548 tgl 3003 ECB :
3004 : /*
3005 : * There's no syscache for pg_database indexed by name, so we must look
6031 bruce 3006 : * the hard way.
6185 tgl 3007 : */
1539 andres 3008 GIC 3188 : pg_database = table_open(DatabaseRelationId, AccessShareLock);
7088 tgl 3009 CBC 3188 : ScanKeyInit(&entry[0],
3010 : Anum_pg_database_datname,
3011 : BTEqualStrategyNumber, F_NAMEEQ,
3012 : CStringGetDatum(dbname));
6569 tgl 3013 GIC 3188 : scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
3014 : NULL, 1, entry);
3015 :
7226 peter_e 3016 GBC 3188 : dbtuple = systable_getnext(scan);
3017 :
7548 tgl 3018 EUB : /* We assume that there can be at most one matching tuple */
7548 tgl 3019 GIC 3188 : if (HeapTupleIsValid(dbtuple))
1418 3020 2388 : oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
3021 : else
7548 3022 800 : oid = InvalidOid;
3023 :
7226 peter_e 3024 GBC 3188 : systable_endscan(scan);
1539 andres 3025 GIC 3188 : table_close(pg_database, AccessShareLock);
7548 tgl 3026 EUB :
4630 rhaas 3027 GBC 3188 : if (!OidIsValid(oid) && !missing_ok)
4382 bruce 3028 GIC 3 : ereport(ERROR,
3029 : (errcode(ERRCODE_UNDEFINED_DATABASE),
3030 : errmsg("database \"%s\" does not exist",
3031 : dbname)));
4630 rhaas 3032 EUB :
7548 tgl 3033 GIC 3185 : return oid;
3034 : }
3035 :
7226 peter_e 3036 EUB :
3037 : /*
3038 : * get_database_name - given a database OID, look up the name
3039 : *
3040 : * Returns a palloc'd string, or NULL if no such database.
3041 : */
3042 : char *
7226 peter_e 3043 GIC 41068 : get_database_name(Oid dbid)
3044 : {
3045 : HeapTuple dbtuple;
7226 peter_e 3046 ECB : char *result;
3047 :
4802 rhaas 3048 GIC 41068 : dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
7226 peter_e 3049 41068 : if (HeapTupleIsValid(dbtuple))
3050 : {
3051 40444 : result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
6185 tgl 3052 40444 : ReleaseSysCache(dbtuple);
3053 : }
3054 : else
7226 peter_e 3055 624 : result = NULL;
3056 :
3057 41068 : return result;
7548 tgl 3058 ECB : }
6797 3059 :
3060 : /*
3061 : * recovery_create_dbdir()
3062 : *
255 alvherre 3063 : * During recovery, there's a case where we validly need to recover a missing
3064 : * tablespace directory so that recovery can continue. This happens when
3065 : * recovery wants to create a database but the holding tablespace has been
3066 : * removed before the server stopped. Since we expect that the directory will
3067 : * be gone before reaching recovery consistency, and we have no knowledge about
3068 : * the tablespace other than its OID here, we create a real directory under
3069 : * pg_tblspc here instead of restoring the symlink.
3070 : *
3071 : * If only_tblspc is true, then the requested directory must be in pg_tblspc/
3072 : */
3073 : static void
255 alvherre 3074 CBC 18 : recovery_create_dbdir(char *path, bool only_tblspc)
255 alvherre 3075 ECB : {
3076 : struct stat st;
3077 :
255 alvherre 3078 CBC 18 : Assert(RecoveryInProgress());
3079 :
255 alvherre 3080 GIC 18 : if (stat(path, &st) == 0)
3081 18 : return;
3082 :
255 alvherre 3083 LBC 0 : if (only_tblspc && strstr(path, "pg_tblspc/") == NULL)
255 alvherre 3084 UIC 0 : elog(PANIC, "requested to created invalid directory: %s", path);
3085 :
3086 0 : if (reachedConsistency && !allow_in_place_tablespaces)
3087 0 : ereport(PANIC,
3088 : errmsg("missing directory \"%s\"", path));
3089 :
3090 0 : elog(reachedConsistency ? WARNING : DEBUG1,
3091 : "creating missing directory: %s", path);
3092 :
255 alvherre 3093 LBC 0 : if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
255 alvherre 3094 UIC 0 : ereport(PANIC,
3095 : errmsg("could not create missing directory \"%s\": %m", path));
3096 : }
3097 :
255 alvherre 3098 ECB :
6797 tgl 3099 : /*
3100 : * DATABASE resource manager's routines
3101 : */
3102 : void
3062 heikki.linnakangas 3103 GIC 30 : dbase_redo(XLogReaderState *record)
3104 : {
3062 heikki.linnakangas 3105 CBC 30 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3106 :
5192 heikki.linnakangas 3107 ECB : /* Backup blocks are not used in dbase records */
3062 heikki.linnakangas 3108 GIC 30 : Assert(!XLogRecHasAnyBlockRefs(record));
3109 :
376 rhaas 3110 30 : if (info == XLOG_DBASE_CREATE_FILE_COPY)
3111 : {
3112 3 : xl_dbase_create_file_copy_rec *xlrec =
3113 3 : (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
3114 : char *src_path;
3115 : char *dst_path;
3116 : char *parent_path;
3117 : struct stat st;
3118 :
6591 tgl 3119 3 : src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
3120 3 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3121 :
3122 : /*
3123 : * Our theory for replaying a CREATE is to forcibly drop the target
6385 bruce 3124 ECB : * subdirectory if present, then re-copy the source data. This may be
3125 : * more work than needed, but it is simple to implement.
3126 : */
6591 tgl 3127 GIC 3 : if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
6591 tgl 3128 ECB : {
6591 tgl 3129 UIC 0 : if (!rmtree(dst_path, true))
4646 bruce 3130 ECB : /* If this failed, copydir() below is going to error. */
6591 tgl 3131 LBC 0 : ereport(WARNING,
3132 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
6459 tgl 3133 EUB : dst_path)));
6591 3134 : }
3135 :
255 alvherre 3136 : /*
3137 : * If the parent of the target path doesn't exist, create it now. This
3138 : * enables us to create the target underneath later.
3139 : */
255 alvherre 3140 GIC 3 : parent_path = pstrdup(dst_path);
255 alvherre 3141 GBC 3 : get_parent_directory(parent_path);
3142 3 : if (stat(parent_path, &st) < 0)
3143 : {
255 alvherre 3144 UIC 0 : if (errno != ENOENT)
3145 0 : ereport(FATAL,
3146 : errmsg("could not stat directory \"%s\": %m",
3147 : dst_path));
3148 :
3149 : /* create the parent directory if needed and valid */
3150 0 : recovery_create_dbdir(parent_path, true);
3151 : }
255 alvherre 3152 CBC 3 : pfree(parent_path);
3153 :
255 alvherre 3154 ECB : /*
3155 : * There's a case where the copy source directory is missing for the
3156 : * same reason above. Create the empty source directory so that
3157 : * copydir below doesn't fail. The directory will be dropped soon by
3158 : * recovery.
3159 : */
255 alvherre 3160 GIC 3 : if (stat(src_path, &st) < 0 && errno == ENOENT)
255 alvherre 3161 LBC 0 : recovery_create_dbdir(src_path, false);
255 alvherre 3162 ECB :
3163 : /*
3164 : * Force dirty buffers out to disk, to ensure source database is
3165 : * up-to-date for the copy.
3166 : */
5764 tgl 3167 GIC 3 : FlushDatabaseBuffers(xlrec->src_db_id);
6591 tgl 3168 ECB :
337 tmunro 3169 : /* Close all sgmr fds in all backends. */
337 tmunro 3170 GIC 3 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3171 :
3172 : /*
3173 : * Copy this subdirectory to the new location
3174 : *
3175 : * We don't need to copy subdirectories
6591 tgl 3176 ECB : */
6459 tgl 3177 GIC 3 : copydir(src_path, dst_path, false);
349 alvherre 3178 EUB :
349 alvherre 3179 GIC 3 : pfree(src_path);
349 alvherre 3180 GBC 3 : pfree(dst_path);
3181 : }
376 rhaas 3182 GIC 27 : else if (info == XLOG_DBASE_CREATE_WAL_LOG)
3183 : {
3184 18 : xl_dbase_create_wal_log_rec *xlrec =
3185 18 : (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
3186 : char *dbpath;
3187 : char *parent_path;
3188 :
376 rhaas 3189 CBC 18 : dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
376 rhaas 3190 ECB :
255 alvherre 3191 : /* create the parent directory if needed and valid */
255 alvherre 3192 GIC 18 : parent_path = pstrdup(dbpath);
255 alvherre 3193 GBC 18 : get_parent_directory(parent_path);
3194 18 : recovery_create_dbdir(parent_path, true);
3195 :
3196 : /* Create the database directory with the version file. */
376 rhaas 3197 GIC 18 : CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
3198 : true);
349 alvherre 3199 GBC 18 : pfree(dbpath);
3200 : }
6591 tgl 3201 CBC 9 : else if (info == XLOG_DBASE_DROP)
3202 : {
6591 tgl 3203 GIC 9 : xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
3204 : char *dst_path;
3205 : int i;
3206 :
4859 simon 3207 9 : if (InHotStandby)
3208 : {
4831 simon 3209 ECB : /*
4790 bruce 3210 EUB : * Lock database while we resolve conflicts to ensure that
3211 : * InitPostgres() cannot fully re-execute concurrently. This
3212 : * avoids backends re-connecting automatically to same database,
3213 : * which can happen in some cases.
3214 : *
3215 : * This will lock out walsenders trying to connect to db-specific
2153 bruce 3216 ECB : * slots for logical decoding too, so it's safe for us to drop
3217 : * slots.
3218 : */
4831 simon 3219 CBC 9 : LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
4833 simon 3220 GIC 9 : ResolveRecoveryConflictWithDatabase(xlrec->db_id);
3221 : }
3222 :
3223 : /* Drop any database-specific replication slots */
2203 3224 9 : ReplicationSlotsDropDBSlots(xlrec->db_id);
3225 :
6220 tgl 3226 ECB : /* Drop pages for this database that are in the shared buffer cache */
6220 tgl 3227 GIC 9 : DropDatabaseBuffers(xlrec->db_id);
6220 tgl 3228 ECB :
5841 3229 : /* Also, clean out any fsync requests that might be pending in md.c */
1466 tmunro 3230 GIC 9 : ForgetDatabaseSyncRequests(xlrec->db_id);
5841 tgl 3231 ECB :
3232 : /* Clean out the xlog relcache too */
6220 tgl 3233 CBC 9 : XLogDropDatabase(xlrec->db_id);
6591 tgl 3234 ECB :
3235 : /* Close all sgmr fds in all backends. */
421 tmunro 3236 GIC 9 : WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_SMGRRELEASE));
3237 :
1235 fujii 3238 CBC 18 : for (i = 0; i < xlrec->ntablespaces; i++)
3239 : {
1235 fujii 3240 GIC 9 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
1235 fujii 3241 ECB :
3242 : /* And remove the physical files */
1235 fujii 3243 CBC 9 : if (!rmtree(dst_path, true))
1235 fujii 3244 UIC 0 : ereport(WARNING,
3245 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
1235 fujii 3246 ECB : dst_path)));
1235 fujii 3247 GIC 9 : pfree(dst_path);
1235 fujii 3248 ECB : }
3249 :
4831 simon 3250 CBC 9 : if (InHotStandby)
3251 : {
4831 simon 3252 ECB : /*
3253 : * Release locks prior to commit. XXX There is a race condition
3254 : * here that may allow backends to reconnect, but the window for
3255 : * this is small because the gap between here and commit is mostly
4790 bruce 3256 : * fairly small and it is unlikely that people will be dropping
3257 : * databases that we are trying to connect to anyway.
3258 : */
4831 simon 3259 GIC 9 : UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3260 : }
3261 : }
3262 : else
6797 tgl 3263 UIC 0 : elog(PANIC, "dbase_redo: unknown op code %u", info);
6797 tgl 3264 GIC 30 : }
|