Commit ff539da

Cleanup slots during drop database
Automatically drop all logical replication slots associated with a database when the database is dropped. Previously we threw an ERROR if any slot existed for the database; now we throw an ERROR only if a slot is active in the database being dropped.

Craig Ringer
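The user-visible effect, sketched as a psql session (the database, slot, and plugin names below follow the commit's tests):

    CREATE DATABASE dropme;
    \c dropme
    SELECT pg_create_logical_replication_slot('dropme_slot', 'test_decoding');
    \c postgres
    DROP DATABASE dropme;   -- now succeeds; dropme_slot is dropped with the database
    -- Had a client still been streaming from the slot, this would instead fail with:
    -- ERROR:  database "dropme" is used by an active logical replication slot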

7 files changed: 182 additions, 14 deletions

doc/src/sgml/func.sgml (+2 −1)

@@ -18876,7 +18876,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
        <entry>
         Drops the physical or logical replication slot
         named <parameter>slot_name</parameter>. Same as replication protocol
-        command <literal>DROP_REPLICATION_SLOT</>.
+        command <literal>DROP_REPLICATION_SLOT</>. For logical slots, this must
+        be called when connected to the same database the slot was created on.
        </entry>
       </row>
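A minimal psql sketch of the documented restriction (slot and database names follow this commit's tests):

    -- A logical slot created in otherdb cannot be dropped from another database:
    \c postgres
    SELECT pg_drop_replication_slot('otherdb_slot');   -- fails for a logical slot
    -- Connected to the slot's own database, the drop succeeds:
    \c otherdb
    SELECT pg_drop_replication_slot('otherdb_slot');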

doc/src/sgml/protocol.sgml (+2)

@@ -2034,6 +2034,8 @@ The commands accepted in walsender mode are:
      <para>
       Drops a replication slot, freeing any reserved server-side resources. If
       the slot is currently in use by an active connection, this command fails.
+      If the slot is a logical slot that was created in a database other than
+      the database the walsender is connected to, this command fails.
      </para>
      <variablelist>
       <varlistentry>
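For example, on a walsender connection to the postgres database (one opened with replication=database in the connection string), dropping a logical slot that was created in otherdb now fails; a sketch, with names from this commit's tests:

    DROP_REPLICATION_SLOT otherdb_slot;
    -- fails unless the walsender is connected to otherdb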

src/backend/commands/dbcommands.c (+23 −9)

@@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok)
 				 errmsg("cannot drop the currently open database")));
 
 	/*
-	 * Check whether there are, possibly unconnected, logical slots that refer
-	 * to the to-be-dropped database. The database lock we are holding
-	 * prevents the creation of new slots using the database.
+	 * Check whether there are active logical slots that refer to the
+	 * to-be-dropped database. The database lock we are holding prevents the
+	 * creation of new slots using the database or existing slots becoming
+	 * active.
 	 */
-	if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active))
+	(void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
+	if (nslots_active)
+	{
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
-				 errmsg("database \"%s\" is used by a logical replication slot",
+				 errmsg("database \"%s\" is used by an active logical replication slot",
 						dbname),
-				 errdetail_plural("There is %d slot, %d of them active.",
-								  "There are %d slots, %d of them active.",
-								  nslots,
-								  nslots, nslots_active)));
+				 errdetail_plural("There is %d active slot.",
+								  "There are %d active slots.",
+								  nslots_active, nslots_active)));
+	}
 
 	/*
 	 * Check for other backends in the target database. (Because we hold the
@@ -914,6 +917,11 @@ dropdb(const char *dbname, bool missing_ok)
 	 */
 	dropDatabaseDependencies(db_id);
 
+	/*
+	 * Drop db-specific replication slots.
+	 */
+	ReplicationSlotsDropDBSlots(db_id);
+
 	/*
 	 * Drop pages for this database that are in the shared buffer cache. This
 	 * is important to ensure that no remaining backend tries to write out a
@@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record)
 		 * InitPostgres() cannot fully re-execute concurrently. This
 		 * avoids backends re-connecting automatically to same database,
 		 * which can happen in some cases.
+		 *
+		 * This will lock out walsenders trying to connect to db-specific
+		 * slots for logical decoding too, so it's safe for us to drop slots.
 		 */
 		LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
 		ResolveRecoveryConflictWithDatabase(xlrec->db_id);
 	}
 
+	/* Drop any database-specific replication slots */
+	ReplicationSlotsDropDBSlots(xlrec->db_id);
+
 	/* Drop pages for this database that are in the shared buffer cache */
 	DropDatabaseBuffers(xlrec->db_id);
src/backend/replication/slot.c (+88)

@@ -796,6 +796,94 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 	return false;
 }
 
+/*
+ * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
+ * passed database oid. The caller should hold an exclusive lock on the
+ * pg_database oid for the database to prevent creation of new slots on the db
+ * or replay from existing slots.
+ *
+ * This routine isn't as efficient as it could be - but we don't drop
+ * databases often, especially databases with lots of slots.
+ *
+ * Another session that concurrently acquires an existing slot on the target
+ * DB (most likely to drop it) may cause this function to ERROR. If that
+ * happens it may have dropped some but not all slots.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int			i;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+		NameData	slotname;
+		int			active_pid;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotControlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* only logical slots are database specific, skip */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* not our database, skip */
+		if (s->data.database != dboid)
+			continue;
+
+		/* Claim the slot, as if ReplicationSlotAcquire()ing. */
+		SpinLockAcquire(&s->mutex);
+		strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
+		NameStr(slotname)[NAMEDATALEN - 1] = '\0';
+		active_pid = s->active_pid;
+		if (active_pid == 0)
+		{
+			MyReplicationSlot = s;
+			s->active_pid = MyProcPid;
+		}
+		SpinLockRelease(&s->mutex);
+
+		/*
+		 * We might fail here if the slot was active. Even though we hold an
+		 * exclusive lock on the database object a logical slot for that DB
+		 * can still be active if it's being dropped by a backend connected
+		 * to another DB or is otherwise acquired.
+		 *
+		 * It's an unlikely race that'll only arise from concurrent user
+		 * action, so we'll just bail out.
+		 */
+		if (active_pid)
+			elog(ERROR, "replication slot %s is in use by pid %d",
+				 NameStr(slotname), active_pid);
+
+		/*
+		 * To avoid largely duplicating ReplicationSlotDropAcquired() or
+		 * complicating it with already_locked flags for ProcArrayLock,
+		 * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
+		 * just release our ReplicationSlotControlLock to drop the slot.
+		 *
+		 * For safety we'll restart our scan from the beginning each time we
+		 * release the lock.
+		 */
+		LWLockRelease(ReplicationSlotControlLock);
+		ReplicationSlotDropAcquired();
+		goto restart;
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* recompute limits once after all slots are dropped */
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+}
+
 
 /*
  * Check whether the server's configuration supports using replication

src/include/replication/slot.h (+1)

@@ -177,6 +177,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);

src/test/recovery/t/006_logical_decoding.pl (+38 −2)

@@ -7,7 +7,7 @@
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 5;
+use Test::More tests => 16;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -54,7 +54,7 @@
 is($stdout_sql, $expected, 'got expected output from SQL decoding session');
 
 my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
-diag "waiting to replay $endpos";
+print "waiting to replay $endpos\n";
 
 my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
 chomp($stdout_recv);
@@ -64,5 +64,41 @@
 chomp($stdout_recv);
 is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
 
+$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
+	'replaying logical slot from another database fails');
+
+$node_master->safe_psql('otherdb', qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]);
+
+# make sure you can't drop a slot while active
+my $pg_recvlogical = IPC::Run::start(['pg_recvlogical', '-d', $node_master->connstr('otherdb'), '-S', 'otherdb_slot', '-f', '-', '--start']);
+$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)");
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 3,
+	'dropping a DB with active logical slots fails');
+$pg_recvlogical->kill_kill;
+isnt($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
+	'logical slot still exists');
+
+$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)");
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 0,
+	'dropping a DB with inactive logical slots succeeds');
+is($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
+	'logical slot was actually dropped with DB');
+
+# Restarting a node with wal_level = logical that has existing
+# slots must succeed, but decoding from those slots must fail.
+$node_master->safe_psql('postgres', 'ALTER SYSTEM SET wal_level = replica');
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'logical', 'wal_level is still logical before restart');
+$node_master->restart;
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'replica', 'wal_level is replica');
+isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
+	'restored slot catalog_xmin is nonzero');
+is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
+	'reading from slot with wal_level < logical fails');
+is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
+	'can drop logical slot while wal_level = replica');
+is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+
 # done with the node
 $node_master->stop;

src/test/recovery/t/010_logical_decoding_timelines.pl (+28 −2)

@@ -15,12 +15,15 @@
 # This module uses the first approach to show that timeline following
 # on a logical slot works.
 #
+# (For convenience, it also tests some recovery-related operations
+# on logical slots.)
+#
 use strict;
 use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 10;
+use Test::More tests => 13;
 use RecursiveCopy;
 use File::Copy;
 use IPC::Run ();
@@ -50,6 +53,16 @@
 $node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
 $node_master->safe_psql('postgres',
 	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+# We also want to verify that DROP DATABASE on a standby with a logical
+# slot works. This isn't strictly related to timeline following, but
+# the only way to get a logical slot on a standby right now is to use
+# the same physical copy trick, so:
+$node_master->safe_psql('postgres', 'CREATE DATABASE dropme;');
+$node_master->safe_psql('dropme',
+	"SELECT pg_create_logical_replication_slot('dropme_slot', 'test_decoding');"
+);
+
 $node_master->safe_psql('postgres', 'CHECKPOINT;');
 
 my $backup_name = 'b1';
@@ -68,6 +81,17 @@
 
 $node_replica->start;
 
+# If we drop 'dropme' on the master, the standby should drop the
+# db and associated slot.
+is($node_master->psql('postgres', 'DROP DATABASE dropme'), 0,
+	'dropped DB with logical slot OK on master');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert'));
+is($node_replica->safe_psql('postgres', q[SELECT 1 FROM pg_database WHERE datname = 'dropme']), '',
+	'dropped DB dropme on standby');
+is($node_replica->slot('dropme_slot')->{'slot_name'}, undef,
+	'logical slot was actually dropped on standby');
+
+# Back to testing failover...
 $node_master->safe_psql('postgres',
 	"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
 );
@@ -99,10 +123,13 @@
 cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
 	'xmin on physical slot must not be lower than catalog_xmin');
 
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+
 # Boom, crash
 $node_master->stop('immediate');
 
 $node_replica->promote;
+print "waiting for replica to come up\n";
 $node_replica->poll_query_until('postgres',
 	"SELECT NOT pg_is_in_recovery();");
 
@@ -154,5 +181,4 @@ BEGIN
 chomp($stdout);
 is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
 
-# We don't need the standby anymore
 $node_replica->teardown_node();
