Skip to content

Commit 034a9fc

Browse files
committed
Handle DROP DATABASE getting interrupted
Until now, when DROP DATABASE got interrupted in the wrong moment, the removal of the pg_database row would also roll back, even though some irreversible steps have already been taken. E.g. DropDatabaseBuffers() might have thrown out dirty buffers, or files could have been unlinked. But we continued to allow connections to such a corrupted database. To fix this, mark databases invalid with an in-place update, just before starting to perform irreversible steps. As we can't add a new column in the back branches, we use pg_database.datconnlimit = -2 for this purpose. An invalid database cannot be connected to anymore, but can still be dropped. Unfortunately we can't easily add output to psql's \l to indicate that some database is invalid, it doesn't fit in any of the existing columns. Add tests verifying that a interrupted DROP DATABASE is handled correctly in the backend and in various tools. Reported-by: Evgeny Morozov <postgresql3@realityexists.net> Author: Andres Freund <andres@anarazel.de> Reviewed-by: Daniel Gustafsson <daniel@yesql.se> Reviewed-by: Thomas Munro <thomas.munro@gmail.com> Discussion: https://postgr.es/m/20230509004637.cgvmfwrbht7xm7p6@awork3.anarazel.de Discussion: https://postgr.es/m/20230314174521.74jl6ffqsee5mtug@awork3.anarazel.de Backpatch: 11-, bug present in all supported versions
1 parent 7aec84e commit 034a9fc

File tree

16 files changed

+380
-27
lines changed

16 files changed

+380
-27
lines changed

doc/src/sgml/catalogs.sgml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2647,7 +2647,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
26472647
<entry></entry>
26482648
<entry>
26492649
Sets maximum number of concurrent connections that can be made
2650-
to this database. -1 means no limit.
2650+
to this database. -1 means no limit, -2 indicates the database is
2651+
invalid.
26512652
</entry>
26522653
</row>
26532654

src/backend/commands/dbcommands.c

Lines changed: 85 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
139139
int encoding = -1;
140140
bool dbistemplate = false;
141141
bool dballowconnections = true;
142-
int dbconnlimit = -1;
142+
int dbconnlimit = DATCONNLIMIT_UNLIMITED;
143143
int notherbackends;
144144
int npreparedxacts;
145145
createdb_failure_params fparms;
@@ -288,7 +288,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
288288
if (dconnlimit && dconnlimit->arg)
289289
{
290290
dbconnlimit = defGetInt32(dconnlimit);
291-
if (dbconnlimit < -1)
291+
if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
292292
ereport(ERROR,
293293
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
294294
errmsg("invalid connection limit: %d", dbconnlimit)));
@@ -336,6 +336,16 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
336336
errmsg("template database \"%s\" does not exist",
337337
dbtemplate)));
338338

339+
/*
340+
* If the source database was in the process of being dropped, we can't
341+
* use it as a template.
342+
*/
343+
if (database_is_invalid_oid(src_dboid))
344+
ereport(ERROR,
345+
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
346+
errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
347+
errhint("Use DROP DATABASE to drop invalid databases."));
348+
339349
/*
340350
* Permission check: to copy a DB that's not marked datistemplate, you
341351
* must be superuser or the owner thereof.
@@ -796,6 +806,7 @@ dropdb(const char *dbname, bool missing_ok)
796806
bool db_istemplate;
797807
Relation pgdbrel;
798808
HeapTuple tup;
809+
Form_pg_database datform;
799810
int notherbackends;
800811
int npreparedxacts;
801812
int nslots,
@@ -903,17 +914,6 @@ dropdb(const char *dbname, bool missing_ok)
903914
"There are %d subscriptions.",
904915
nsubscriptions, nsubscriptions)));
905916

906-
/*
907-
* Remove the database's tuple from pg_database.
908-
*/
909-
tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
910-
if (!HeapTupleIsValid(tup))
911-
elog(ERROR, "cache lookup failed for database %u", db_id);
912-
913-
CatalogTupleDelete(pgdbrel, &tup->t_self);
914-
915-
ReleaseSysCache(tup);
916-
917917
/*
918918
* Delete any comments or security labels associated with the database.
919919
*/
@@ -930,6 +930,32 @@ dropdb(const char *dbname, bool missing_ok)
930930
*/
931931
dropDatabaseDependencies(db_id);
932932

933+
tup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
934+
if (!HeapTupleIsValid(tup))
935+
elog(ERROR, "cache lookup failed for database %u", db_id);
936+
datform = (Form_pg_database) GETSTRUCT(tup);
937+
938+
/*
939+
* Except for the deletion of the catalog row, subsequent actions are not
940+
* transactional (consider DropDatabaseBuffers() discarding modified
941+
* buffers). But we might crash or get interrupted below. To prevent
942+
* accesses to a database with invalid contents, mark the database as
943+
* invalid using an in-place update.
944+
*
945+
* We need to flush the WAL before continuing, to guarantee the
946+
* modification is durable before performing irreversible filesystem
947+
* operations.
948+
*/
949+
datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
950+
heap_inplace_update(pgdbrel, tup);
951+
XLogFlush(XactLastRecEnd);
952+
953+
/*
954+
* Also delete the tuple - transactionally. If this transaction commits,
955+
* the row will be gone, but if we fail, dropdb() can be invoked again.
956+
*/
957+
CatalogTupleDelete(pgdbrel, &tup->t_self);
958+
933959
/*
934960
* Drop db-specific replication slots.
935961
*/
@@ -1427,7 +1453,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
14271453
ListCell *option;
14281454
bool dbistemplate = false;
14291455
bool dballowconnections = true;
1430-
int dbconnlimit = -1;
1456+
int dbconnlimit = DATCONNLIMIT_UNLIMITED;
14311457
DefElem *distemplate = NULL;
14321458
DefElem *dallowconnections = NULL;
14331459
DefElem *dconnlimit = NULL;
@@ -1510,7 +1536,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
15101536
if (dconnlimit && dconnlimit->arg)
15111537
{
15121538
dbconnlimit = defGetInt32(dconnlimit);
1513-
if (dbconnlimit < -1)
1539+
if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
15141540
ereport(ERROR,
15151541
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
15161542
errmsg("invalid connection limit: %d", dbconnlimit)));
@@ -1537,6 +1563,14 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
15371563
datform = (Form_pg_database) GETSTRUCT(tuple);
15381564
dboid = datform->oid;
15391565

1566+
if (database_is_invalid_form(datform))
1567+
{
1568+
ereport(FATAL,
1569+
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1570+
errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
1571+
errhint("Use DROP DATABASE to drop invalid databases."));
1572+
}
1573+
15401574
if (!pg_database_ownercheck(dboid, GetUserId()))
15411575
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
15421576
stmt->dbname);
@@ -2093,6 +2127,42 @@ get_database_name(Oid dbid)
20932127
return result;
20942128
}
20952129

2130+
2131+
/*
2132+
* While dropping a database the pg_database row is marked invalid, but the
2133+
* catalog contents still exist. Connections to such a database are not
2134+
* allowed.
2135+
*/
2136+
bool
2137+
database_is_invalid_form(Form_pg_database datform)
2138+
{
2139+
return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
2140+
}
2141+
2142+
2143+
/*
2144+
* Convenience wrapper around database_is_invalid_form()
2145+
*/
2146+
bool
2147+
database_is_invalid_oid(Oid dboid)
2148+
{
2149+
HeapTuple dbtup;
2150+
Form_pg_database dbform;
2151+
bool invalid;
2152+
2153+
dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
2154+
if (!HeapTupleIsValid(dbtup))
2155+
elog(ERROR, "cache lookup failed for database %u", dboid);
2156+
dbform = (Form_pg_database) GETSTRUCT(dbtup);
2157+
2158+
invalid = database_is_invalid_form(dbform);
2159+
2160+
ReleaseSysCache(dbtup);
2161+
2162+
return invalid;
2163+
}
2164+
2165+
20962166
/*
20972167
* recovery_create_dbdir()
20982168
*

src/backend/commands/vacuum.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1547,6 +1547,20 @@ vac_truncate_clog(TransactionId frozenXID,
15471547
Assert(TransactionIdIsNormal(datfrozenxid));
15481548
Assert(MultiXactIdIsValid(datminmxid));
15491549

1550+
/*
1551+
* If database is in the process of getting dropped, or has been
1552+
* interrupted while doing so, no connections to it are possible
1553+
* anymore. Therefore we don't need to take it into account here.
1554+
* Which is good, because it can't be processed by autovacuum either.
1555+
*/
1556+
if (database_is_invalid_form((Form_pg_database) dbform))
1557+
{
1558+
elog(DEBUG2,
1559+
"skipping invalid database \"%s\" while computing relfrozenxid",
1560+
NameStr(dbform->datname));
1561+
continue;
1562+
}
1563+
15501564
/*
15511565
* If things are working properly, no database should have a
15521566
* datfrozenxid or datminmxid that is "in the future". However, such

src/backend/postmaster/autovacuum.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1894,6 +1894,18 @@ get_database_list(void)
18941894
avw_dbase *avdb;
18951895
MemoryContext oldcxt;
18961896

1897+
/*
1898+
* If database has partially been dropped, we can't, nor need to,
1899+
* vacuum it.
1900+
*/
1901+
if (database_is_invalid_form(pgdatabase))
1902+
{
1903+
elog(DEBUG2,
1904+
"autovacuum: skipping invalid database \"%s\"",
1905+
NameStr(pgdatabase->datname));
1906+
continue;
1907+
}
1908+
18971909
/*
18981910
* Allocate our results in the caller's context, not the
18991911
* transaction's. We do this inside the loop, and restore the original

src/backend/utils/init/postinit.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
974974
if (!bootstrap)
975975
{
976976
HeapTuple tuple;
977+
Form_pg_database datform;
977978

978979
tuple = GetDatabaseTuple(dbname);
979980
if (!HeapTupleIsValid(tuple) ||
@@ -983,6 +984,15 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
983984
(errcode(ERRCODE_UNDEFINED_DATABASE),
984985
errmsg("database \"%s\" does not exist", dbname),
985986
errdetail("It seems to have just been dropped or renamed.")));
987+
988+
datform = (Form_pg_database) GETSTRUCT(tuple);
989+
if (database_is_invalid_form(datform))
990+
{
991+
ereport(FATAL,
992+
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
993+
errmsg("cannot connect to invalid database \"%s\"", dbname),
994+
errhint("Use DROP DATABASE to drop invalid databases."));
995+
}
986996
}
987997

988998
/*

src/bin/pg_dump/pg_dumpall.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1320,7 +1320,7 @@ dropDBs(PGconn *conn)
13201320
res = executeQuery(conn,
13211321
"SELECT datname "
13221322
"FROM pg_database d "
1323-
"WHERE datallowconn "
1323+
"WHERE datallowconn AND datconnlimit != -2 "
13241324
"ORDER BY datname");
13251325

13261326
if (PQntuples(res) > 0)
@@ -1473,7 +1473,7 @@ dumpDatabases(PGconn *conn)
14731473
res = executeQuery(conn,
14741474
"SELECT datname "
14751475
"FROM pg_database d "
1476-
"WHERE datallowconn "
1476+
"WHERE datallowconn AND datconnlimit != -2 "
14771477
"ORDER BY (datname <> 'template1'), datname");
14781478

14791479
if (PQntuples(res) > 0)

src/bin/pg_dump/t/002_pg_dump.pl

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1428,6 +1428,17 @@
14281428
},
14291429
},
14301430

1431+
'CREATE DATABASE regression_invalid...' => {
1432+
create_order => 1,
1433+
create_sql => q(
1434+
CREATE DATABASE regression_invalid;
1435+
UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid'),
1436+
regexp => qr/^CREATE DATABASE regression_invalid/m,
1437+
not_like => {
1438+
pg_dumpall_dbprivs => 1,
1439+
},
1440+
},
1441+
14311442
'CREATE ACCESS METHOD gist2' => {
14321443
create_order => 52,
14331444
create_sql =>
@@ -3495,7 +3506,7 @@
34953506
34963507
# Start with number of command_fails_like()*2 tests below (each
34973508
# command_fails_like is actually 2 tests)
3498-
my $num_tests = 12;
3509+
my $num_tests = 14;
34993510
35003511
foreach my $run (sort keys %pgdump_runs)
35013512
{
@@ -3623,6 +3634,14 @@
36233634
qr/\Qpg_dump: error: connection to database "qqq" failed: FATAL: database "qqq" does not exist\E/,
36243635
'connecting to a non-existent database');
36253636
3637+
#########################################
3638+
# Test connecting to an invalid database
3639+
3640+
command_fails_like(
3641+
[ 'pg_dump', '-p', "$port", '-d', 'regression_invalid' ],
3642+
qr/pg_dump: error: connection to database .* failed: FATAL: cannot connect to invalid database "regression_invalid"/,
3643+
'connecting to an invalid database');
3644+
36263645
#########################################
36273646
# Test connecting with an unprivileged user
36283647

src/bin/scripts/clusterdb.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,9 @@ cluster_all_databases(ConnParams *cparams, const char *progname,
236236
int i;
237237

238238
conn = connectMaintenanceDatabase(cparams, progname, echo);
239-
result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
239+
result = executeQuery(conn,
240+
"SELECT datname FROM pg_database WHERE datallowconn AND datconnlimit <> -2 ORDER BY 1;",
241+
progname, echo);
240242
PQfinish(conn);
241243

242244
for (i = 0; i < PQntuples(result); i++)

src/bin/scripts/reindexdb.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,9 @@ reindex_all_databases(ConnParams *cparams,
360360
int i;
361361

362362
conn = connectMaintenanceDatabase(cparams, progname, echo);
363-
result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
363+
result = executeQuery(conn,
364+
"SELECT datname FROM pg_database WHERE datallowconn AND datconnlimit <> -2 ORDER BY 1;",
365+
progname, echo);
364366
PQfinish(conn);
365367

366368
for (i = 0; i < PQntuples(result); i++)

src/bin/scripts/t/011_clusterdb_all.pl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use PostgresNode;
55
use TestLib;
6-
use Test::More tests => 2;
6+
use Test::More tests => 4;
77

88
my $node = get_new_node('main');
99
$node->init;
@@ -17,3 +17,16 @@
1717
[ 'clusterdb', '-a' ],
1818
qr/statement: CLUSTER.*statement: CLUSTER/s,
1919
'cluster all databases');
20+
21+
$node->safe_psql(
22+
'postgres', q(
23+
CREATE DATABASE regression_invalid;
24+
UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
25+
));
26+
$node->command_ok([ 'clusterdb', '-a' ],
27+
'invalid database not targeted by clusterdb -a');
28+
29+
# Doesn't quite belong here, but don't want to waste time by creating an
30+
# invalid database in 010_clusterdb.pl as well.
31+
$node->command_fails([ 'clusterdb', '-d', 'regression_invalid'],
32+
'clusterdb cannot target invalid database');

src/bin/scripts/t/050_dropdb.pl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use PostgresNode;
55
use TestLib;
6-
use Test::More tests => 11;
6+
use Test::More tests => 12;
77

88
program_help_ok('dropdb');
99
program_version_ok('dropdb');
@@ -21,3 +21,12 @@
2121

2222
$node->command_fails([ 'dropdb', 'nonexistent' ],
2323
'fails with nonexistent database');
24+
25+
# check that invalid database can be dropped with dropdb
26+
$node->safe_psql(
27+
'postgres', q(
28+
CREATE DATABASE regression_invalid;
29+
UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
30+
));
31+
$node->command_ok([ 'dropdb', 'regression_invalid' ],
32+
'invalid database can be dropped');

src/bin/scripts/t/091_reindexdb_all.pl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
use warnings;
33

44
use PostgresNode;
5-
use Test::More tests => 2;
5+
use Test::More tests => 4;
66

77
my $node = get_new_node('main');
88
$node->init;
@@ -14,3 +14,16 @@
1414
[ 'reindexdb', '-a' ],
1515
qr/statement: REINDEX.*statement: REINDEX/s,
1616
'reindex all databases');
17+
18+
$node->safe_psql(
19+
'postgres', q(
20+
CREATE DATABASE regression_invalid;
21+
UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
22+
));
23+
$node->command_ok([ 'reindexdb', '-a' ],
24+
'invalid database not targeted by reindexdb -a');
25+
26+
# Doesn't quite belong here, but don't want to waste time by creating an
27+
# invalid database in 090_reindexdb.pl as well.
28+
$node->command_fails([ 'reindexdb', '-d', 'regression_invalid'],
29+
'reindexdb cannot target invalid database');

0 commit comments

Comments
 (0)