Skip to content

Commit 218b101

Browse files
committed
Fix race condition in invalidating obsolete replication slots
The code added to mark replication slots invalid in commit c655077 had the race condition that a slot can be dropped or advanced concurrently with checkpointer trying to invalidate it. Rewrite the code to close those races. The changes to ReplicationSlotAcquire's API added with c655077 are not necessary anymore. To avoid an ABI break in released branches, this commit leaves that unchanged; it'll be changed in a master-only commit separately. Backpatch to 13, where this code first appeared. Reported-by: Andres Freund <andres@anarazel.de> Author: Andres Freund <andres@anarazel.de> Author: Álvaro Herrera <alvherre@alvh.no-ip.org> Discussion: https://postgr.es/m/20210408001037.wfmk6jud36auhfqm@alap3.anarazel.de
1 parent 6e43f1c commit 218b101

File tree

1 file changed

+145
-78
lines changed

1 file changed

+145
-78
lines changed

src/backend/replication/slot.c

Lines changed: 145 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,117 +1124,184 @@ ReplicationSlotReserveWal(void)
11241124
}
11251125

11261126
/*
1127-
* Mark any slot that points to an LSN older than the given segment
1128-
* as invalid; it requires WAL that's about to be removed.
1127+
* Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
1128+
* and mark it invalid, if necessary and possible.
11291129
*
1130-
* NB - this runs as part of checkpoint, so avoid raising errors if possible.
1130+
* Returns whether ReplicationSlotControlLock was released in the interim (and
1131+
* in that case we're not holding the lock at return, otherwise we are).
1132+
*
1133+
* This is inherently racy, because we release the LWLock
1134+
* for syscalls, so caller must restart if we return true.
11311135
*/
1132-
void
1133-
InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
1136+
static bool
1137+
InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
11341138
{
1135-
XLogRecPtr oldestLSN;
1136-
1137-
XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1139+
int last_signaled_pid = 0;
1140+
bool released_lock = false;
11381141

1139-
restart:
1140-
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1141-
for (int i = 0; i < max_replication_slots; i++)
1142+
for (;;)
11421143
{
1143-
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1144-
XLogRecPtr restart_lsn = InvalidXLogRecPtr;
1144+
XLogRecPtr restart_lsn;
11451145
NameData slotname;
1146-
int wspid;
1147-
int last_signaled_pid = 0;
1146+
int active_pid = 0;
1147+
1148+
Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
11481149

11491150
if (!s->in_use)
1150-
continue;
1151+
{
1152+
if (released_lock)
1153+
LWLockRelease(ReplicationSlotControlLock);
1154+
break;
1155+
}
11511156

1157+
/*
1158+
* Check if the slot needs to be invalidated. If it needs to be
1159+
* invalidated, and is not currently acquired, acquire it and mark it
1160+
* as having been invalidated. We do this with the spinlock held to
1161+
* avoid race conditions -- for example the restart_lsn could move
1162+
* forward, or the slot could be dropped.
1163+
*/
11521164
SpinLockAcquire(&s->mutex);
1153-
slotname = s->data.name;
1165+
11541166
restart_lsn = s->data.restart_lsn;
1155-
SpinLockRelease(&s->mutex);
11561167

1168+
/*
1169+
* If the slot is already invalid or is fresh enough, we don't need to
1170+
* do anything.
1171+
*/
11571172
if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
1158-
continue;
1159-
LWLockRelease(ReplicationSlotControlLock);
1160-
CHECK_FOR_INTERRUPTS();
1173+
{
1174+
SpinLockRelease(&s->mutex);
1175+
if (released_lock)
1176+
LWLockRelease(ReplicationSlotControlLock);
1177+
break;
1178+
}
1179+
1180+
slotname = s->data.name;
1181+
active_pid = s->active_pid;
1182+
1183+
/*
1184+
* If the slot can be acquired, do so and mark it invalidated
1185+
* immediately. Otherwise we'll signal the owning process, below, and
1186+
* retry.
1187+
*/
1188+
if (active_pid == 0)
1189+
{
1190+
MyReplicationSlot = s;
1191+
s->active_pid = MyProcPid;
1192+
s->data.invalidated_at = restart_lsn;
1193+
s->data.restart_lsn = InvalidXLogRecPtr;
1194+
}
11611195

1162-
/* Get ready to sleep on the slot in case it is active */
1163-
ConditionVariablePrepareToSleep(&s->active_cv);
1196+
SpinLockRelease(&s->mutex);
11641197

1165-
for (;;)
1198+
if (active_pid != 0)
11661199
{
11671200
/*
1168-
* Try to mark this slot as used by this process.
1169-
*
1170-
* Note that ReplicationSlotAcquireInternal(SAB_Inquire)
1171-
* should not cancel the prepared condition variable
1172-
* if this slot is active in other process. Because in this case
1173-
* we have to wait on that CV for the process owning
1174-
* the slot to be terminated, later.
1201+
* Prepare the sleep on the slot's condition variable before
1202+
* releasing the lock, to close a possible race condition if the
1203+
* slot is released before the sleep below.
11751204
*/
1176-
wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
1205+
ConditionVariablePrepareToSleep(&s->active_cv);
11771206

1178-
/*
1179-
* Exit the loop if we successfully acquired the slot or
1180-
* the slot was dropped during waiting for the owning process
1181-
* to be terminated. For example, the latter case is likely to
1182-
* happen when the slot is temporary because it's automatically
1183-
* dropped by the termination of the owning process.
1184-
*/
1185-
if (wspid <= 0)
1186-
break;
1207+
LWLockRelease(ReplicationSlotControlLock);
1208+
released_lock = true;
11871209

11881210
/*
1189-
* Signal to terminate the process that owns the slot.
1211+
* Signal to terminate the process that owns the slot, if we
1212+
* haven't already signalled it. (Avoidance of repeated
1213+
* signalling is the only reason for there to be a loop in this
1214+
* routine; otherwise we could rely on caller's restart loop.)
11901215
*
1191-
* There is the race condition where other process may own
1192-
* the slot after the process using it was terminated and before
1193-
* this process owns it. To handle this case, we signal again
1194-
* if the PID of the owning process is changed than the last.
1195-
*
1196-
* XXX This logic assumes that the same PID is not reused
1197-
* very quickly.
1216+
* There is the race condition that other process may own the slot
1217+
* after its current owner process is terminated and before this
1218+
* process owns it. To handle that, we signal only if the PID of
1219+
* the owning process has changed from the previous time. (This
1220+
* logic assumes that the same PID is not reused very quickly.)
11981221
*/
1199-
if (last_signaled_pid != wspid)
1222+
if (last_signaled_pid != active_pid)
12001223
{
12011224
ereport(LOG,
1202-
(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
1203-
wspid, NameStr(slotname))));
1204-
(void) kill(wspid, SIGTERM);
1205-
last_signaled_pid = wspid;
1225+
(errmsg("terminating process %d to release replication slot \"%s\"",
1226+
active_pid, NameStr(slotname))));
1227+
1228+
(void) kill(active_pid, SIGTERM);
1229+
last_signaled_pid = active_pid;
12061230
}
12071231

1208-
ConditionVariableTimedSleep(&s->active_cv, 10,
1209-
WAIT_EVENT_REPLICATION_SLOT_DROP);
1232+
/* Wait until the slot is released. */
1233+
ConditionVariableSleep(&s->active_cv,
1234+
WAIT_EVENT_REPLICATION_SLOT_DROP);
1235+
1236+
/*
1237+
* Re-acquire lock and start over; we expect to invalidate the slot
1238+
* next time (unless another process acquires the slot in the
1239+
* meantime).
1240+
*/
1241+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1242+
continue;
12101243
}
1211-
ConditionVariableCancelSleep();
1244+
else
1245+
{
1246+
/*
1247+
* We hold the slot now and have already invalidated it; flush it
1248+
* to ensure that state persists.
1249+
*
1250+
* Don't want to hold ReplicationSlotControlLock across file
1251+
* system operations, so release it now but be sure to tell caller
1252+
* to restart from scratch.
1253+
*/
1254+
LWLockRelease(ReplicationSlotControlLock);
1255+
released_lock = true;
12121256

1213-
/*
1214-
* Do nothing here and start from scratch if the slot has
1215-
* already been dropped.
1216-
*/
1217-
if (wspid == -1)
1218-
goto restart;
1257+
/* Make sure the invalidated state persists across server restart */
1258+
ReplicationSlotMarkDirty();
1259+
ReplicationSlotSave();
1260+
ReplicationSlotRelease();
12191261

1220-
ereport(LOG,
1221-
(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
1222-
NameStr(slotname),
1223-
(uint32) (restart_lsn >> 32),
1224-
(uint32) restart_lsn)));
1262+
ereport(LOG,
1263+
(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
1264+
NameStr(slotname),
1265+
(uint32) (restart_lsn >> 32),
1266+
(uint32) restart_lsn)));
12251267

1226-
SpinLockAcquire(&s->mutex);
1227-
s->data.invalidated_at = s->data.restart_lsn;
1228-
s->data.restart_lsn = InvalidXLogRecPtr;
1229-
SpinLockRelease(&s->mutex);
1268+
/* done with this slot for now */
1269+
break;
1270+
}
1271+
}
12301272

1231-
/* Make sure the invalidated state persists across server restart */
1232-
ReplicationSlotMarkDirty();
1233-
ReplicationSlotSave();
1234-
ReplicationSlotRelease();
1273+
Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
12351274

1236-
/* if we did anything, start from scratch */
1237-
goto restart;
1275+
return released_lock;
1276+
}
1277+
1278+
/*
1279+
* Mark any slot that points to an LSN older than the given segment
1280+
* as invalid; it requires WAL that's about to be removed.
1281+
*
1282+
* NB - this runs as part of checkpoint, so avoid raising errors if possible.
1283+
*/
1284+
void
1285+
InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
1286+
{
1287+
XLogRecPtr oldestLSN;
1288+
1289+
XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1290+
1291+
restart:
1292+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1293+
for (int i = 0; i < max_replication_slots; i++)
1294+
{
1295+
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1296+
1297+
if (!s->in_use)
1298+
continue;
1299+
1300+
if (InvalidatePossiblyObsoleteSlot(s, oldestLSN))
1301+
{
1302+
/* if the lock was released, start from scratch */
1303+
goto restart;
1304+
}
12381305
}
12391306
LWLockRelease(ReplicationSlotControlLock);
12401307
}

0 commit comments

Comments
 (0)