@@ -1124,117 +1124,184 @@ ReplicationSlotReserveWal(void)
1124
1124
}
1125
1125
1126
1126
/*
1127
- * Mark any slot that points to an LSN older than the given segment
1128
- * as invalid; it requires WAL that's about to be removed .
1127
+ * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
1128
+ * and marks it invalid, if necessary and possible.
1129
1129
*
1130
- * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1130
+ * Returns whether ReplicationSlotControlLock was released in the interim (and
1131
+ * in that case we're not holding the lock at return, otherwise we are).
1132
+ *
1133
+ * This is inherently racy, because we release the LWLock
1134
+ * for syscalls, so caller must restart if we return true.
1131
1135
*/
1132
- void
1133
- InvalidateObsoleteReplicationSlots ( XLogSegNo oldestSegno )
1136
+ static bool
1137
+ InvalidatePossiblyObsoleteSlot ( ReplicationSlot * s , XLogRecPtr oldestLSN )
1134
1138
{
1135
- XLogRecPtr oldestLSN ;
1136
-
1137
- XLogSegNoOffsetToRecPtr (oldestSegno , 0 , wal_segment_size , oldestLSN );
1139
+ int last_signaled_pid = 0 ;
1140
+ bool released_lock = false;
1138
1141
1139
- restart :
1140
- LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
1141
- for (int i = 0 ; i < max_replication_slots ; i ++ )
1142
+ for (;;)
1142
1143
{
1143
- ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
1144
- XLogRecPtr restart_lsn = InvalidXLogRecPtr ;
1144
+ XLogRecPtr restart_lsn ;
1145
1145
NameData slotname ;
1146
- int wspid ;
1147
- int last_signaled_pid = 0 ;
1146
+ int active_pid = 0 ;
1147
+
1148
+ Assert (LWLockHeldByMeInMode (ReplicationSlotControlLock , LW_SHARED ));
1148
1149
1149
1150
if (!s -> in_use )
1150
- continue ;
1151
+ {
1152
+ if (released_lock )
1153
+ LWLockRelease (ReplicationSlotControlLock );
1154
+ break ;
1155
+ }
1151
1156
1157
+ /*
1158
+ * Check if the slot needs to be invalidated. If it needs to be
1159
+ * invalidated, and is not currently acquired, acquire it and mark it
1160
+ * as having been invalidated. We do this with the spinlock held to
1161
+ * avoid race conditions -- for example the restart_lsn could move
1162
+ * forward, or the slot could be dropped.
1163
+ */
1152
1164
SpinLockAcquire (& s -> mutex );
1153
- slotname = s -> data . name ;
1165
+
1154
1166
restart_lsn = s -> data .restart_lsn ;
1155
- SpinLockRelease (& s -> mutex );
1156
1167
1168
+ /*
1169
+ * If the slot is already invalid or is fresh enough, we don't need to
1170
+ * do anything.
1171
+ */
1157
1172
if (XLogRecPtrIsInvalid (restart_lsn ) || restart_lsn >= oldestLSN )
1158
- continue ;
1159
- LWLockRelease (ReplicationSlotControlLock );
1160
- CHECK_FOR_INTERRUPTS ();
1173
+ {
1174
+ SpinLockRelease (& s -> mutex );
1175
+ if (released_lock )
1176
+ LWLockRelease (ReplicationSlotControlLock );
1177
+ break ;
1178
+ }
1179
+
1180
+ slotname = s -> data .name ;
1181
+ active_pid = s -> active_pid ;
1182
+
1183
+ /*
1184
+ * If the slot can be acquired, do so and mark it invalidated
1185
+ * immediately. Otherwise we'll signal the owning process, below, and
1186
+ * retry.
1187
+ */
1188
+ if (active_pid == 0 )
1189
+ {
1190
+ MyReplicationSlot = s ;
1191
+ s -> active_pid = MyProcPid ;
1192
+ s -> data .invalidated_at = restart_lsn ;
1193
+ s -> data .restart_lsn = InvalidXLogRecPtr ;
1194
+ }
1161
1195
1162
- /* Get ready to sleep on the slot in case it is active */
1163
- ConditionVariablePrepareToSleep (& s -> active_cv );
1196
+ SpinLockRelease (& s -> mutex );
1164
1197
1165
- for (;; )
1198
+ if ( active_pid != 0 )
1166
1199
{
1167
1200
/*
1168
- * Try to mark this slot as used by this process.
1169
- *
1170
- * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
1171
- * should not cancel the prepared condition variable
1172
- * if this slot is active in other process. Because in this case
1173
- * we have to wait on that CV for the process owning
1174
- * the slot to be terminated, later.
1201
+ * Prepare the sleep on the slot's condition variable before
1202
+ * releasing the lock, to close a possible race condition if the
1203
+ * slot is released before the sleep below.
1175
1204
*/
1176
- wspid = ReplicationSlotAcquireInternal ( s , NULL , SAB_Inquire );
1205
+ ConditionVariablePrepareToSleep ( & s -> active_cv );
1177
1206
1178
- /*
1179
- * Exit the loop if we successfully acquired the slot or
1180
- * the slot was dropped during waiting for the owning process
1181
- * to be terminated. For example, the latter case is likely to
1182
- * happen when the slot is temporary because it's automatically
1183
- * dropped by the termination of the owning process.
1184
- */
1185
- if (wspid <= 0 )
1186
- break ;
1207
+ LWLockRelease (ReplicationSlotControlLock );
1208
+ released_lock = true;
1187
1209
1188
1210
/*
1189
- * Signal to terminate the process that owns the slot.
1211
+ * Signal to terminate the process that owns the slot, if we
1212
+ * haven't already signalled it. (Avoidance of repeated
1213
+ * signalling is the only reason for there to be a loop in this
1214
+ * routine; otherwise we could rely on caller's restart loop.)
1190
1215
*
1191
- * There is the race condition where other process may own
1192
- * the slot after the process using it was terminated and before
1193
- * this process owns it. To handle this case, we signal again
1194
- * if the PID of the owning process is changed than the last.
1195
- *
1196
- * XXX This logic assumes that the same PID is not reused
1197
- * very quickly.
1216
+ * There is a race condition where another process may own the slot
1217
+ * after its current owner process is terminated and before this
1218
+ * process owns it. To handle that, we signal only if the PID of
1219
+ * the owning process has changed from the previous time. (This
1220
+ * logic assumes that the same PID is not reused very quickly.)
1198
1221
*/
1199
- if (last_signaled_pid != wspid )
1222
+ if (last_signaled_pid != active_pid )
1200
1223
{
1201
1224
ereport (LOG ,
1202
- (errmsg ("terminating process %d because replication slot \"%s\" is too far behind" ,
1203
- wspid , NameStr (slotname ))));
1204
- (void ) kill (wspid , SIGTERM );
1205
- last_signaled_pid = wspid ;
1225
+ (errmsg ("terminating process %d to release replication slot \"%s\"" ,
1226
+ active_pid , NameStr (slotname ))));
1227
+
1228
+ (void ) kill (active_pid , SIGTERM );
1229
+ last_signaled_pid = active_pid ;
1206
1230
}
1207
1231
1208
- ConditionVariableTimedSleep (& s -> active_cv , 10 ,
1209
- WAIT_EVENT_REPLICATION_SLOT_DROP );
1232
+ /* Wait until the slot is released. */
1233
+ ConditionVariableSleep (& s -> active_cv ,
1234
+ WAIT_EVENT_REPLICATION_SLOT_DROP );
1235
+
1236
+ /*
1237
+ * Re-acquire lock and start over; we expect to invalidate the slot
1238
+ * next time (unless another process acquires the slot in the
1239
+ * meantime).
1240
+ */
1241
+ LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
1242
+ continue ;
1210
1243
}
1211
- ConditionVariableCancelSleep ();
1244
+ else
1245
+ {
1246
+ /*
1247
+ * We hold the slot now and have already invalidated it; flush it
1248
+ * to ensure that state persists.
1249
+ *
1250
+ * Don't want to hold ReplicationSlotControlLock across file
1251
+ * system operations, so release it now but be sure to tell caller
1252
+ * to restart from scratch.
1253
+ */
1254
+ LWLockRelease (ReplicationSlotControlLock );
1255
+ released_lock = true;
1212
1256
1213
- /*
1214
- * Do nothing here and start from scratch if the slot has
1215
- * already been dropped.
1216
- */
1217
- if (wspid == -1 )
1218
- goto restart ;
1257
+ /* Make sure the invalidated state persists across server restart */
1258
+ ReplicationSlotMarkDirty ();
1259
+ ReplicationSlotSave ();
1260
+ ReplicationSlotRelease ();
1219
1261
1220
- ereport (LOG ,
1221
- (errmsg ("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size" ,
1222
- NameStr (slotname ),
1223
- (uint32 ) (restart_lsn >> 32 ),
1224
- (uint32 ) restart_lsn )));
1262
+ ereport (LOG ,
1263
+ (errmsg ("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size" ,
1264
+ NameStr (slotname ),
1265
+ (uint32 ) (restart_lsn >> 32 ),
1266
+ (uint32 ) restart_lsn )));
1225
1267
1226
- SpinLockAcquire ( & s -> mutex );
1227
- s -> data . invalidated_at = s -> data . restart_lsn ;
1228
- s -> data . restart_lsn = InvalidXLogRecPtr ;
1229
- SpinLockRelease ( & s -> mutex );
1268
+ /* done with this slot for now */
1269
+ break ;
1270
+ }
1271
+ }
1230
1272
1231
- /* Make sure the invalidated state persists across server restart */
1232
- ReplicationSlotMarkDirty ();
1233
- ReplicationSlotSave ();
1234
- ReplicationSlotRelease ();
1273
+ Assert (released_lock == !LWLockHeldByMe (ReplicationSlotControlLock ));
1235
1274
1236
- /* if we did anything, start from scratch */
1237
- goto restart ;
1275
+ return released_lock ;
1276
+ }
1277
+
1278
+ /*
1279
+ * Mark any slot that points to an LSN older than the given segment
1280
+ * as invalid; it requires WAL that's about to be removed.
1281
+ *
1282
+ * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1283
+ */
1284
+ void
1285
+ InvalidateObsoleteReplicationSlots (XLogSegNo oldestSegno )
1286
+ {
1287
+ XLogRecPtr oldestLSN ;
1288
+
1289
+ XLogSegNoOffsetToRecPtr (oldestSegno , 0 , wal_segment_size , oldestLSN );
1290
+
1291
+ restart :
1292
+ LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
1293
+ for (int i = 0 ; i < max_replication_slots ; i ++ )
1294
+ {
1295
+ ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
1296
+
1297
+ if (!s -> in_use )
1298
+ continue ;
1299
+
1300
+ if (InvalidatePossiblyObsoleteSlot (s , oldestLSN ))
1301
+ {
1302
+ /* if the lock was released, start from scratch */
1303
+ goto restart ;
1304
+ }
1238
1305
}
1239
1306
LWLockRelease (ReplicationSlotControlLock );
1240
1307
}
0 commit comments