Skip to content

Commit ae0aa5f

Browse files
committed
Delete old replicas on a CRON
1 parent 1e85039 commit ae0aa5f

File tree

6 files changed

+48
-48
lines changed

6 files changed

+48
-48
lines changed

coderd/database/databasefake/databasefake.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3238,15 +3238,3 @@ func (q *fakeQuerier) GetReplicasUpdatedAfter(_ context.Context, updatedAt time.
32383238
}
32393239
return replicas, nil
32403240
}
3241-
3242-
func (q *fakeQuerier) GetReplicaByID(_ context.Context, id uuid.UUID) (database.Replica, error) {
3243-
q.mutex.RLock()
3244-
defer q.mutex.RUnlock()
3245-
3246-
for _, replica := range q.replicas {
3247-
if replica.ID == id {
3248-
return replica, nil
3249-
}
3250-
}
3251-
return database.Replica{}, sql.ErrNoRows
3252-
}

coderd/database/querier.go

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 0 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/replicas.sql

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
-- name: GetReplicasUpdatedAfter :many
22
SELECT * FROM replicas WHERE updated_at > $1 AND stopped_at IS NULL;
33

4-
-- name: GetReplicaByID :one
5-
SELECT * FROM replicas WHERE id = $1;
6-
74
-- name: InsertReplica :one
85
INSERT INTO replicas (
96
id,

enterprise/replicasync/replicasync.go

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ var (
2626
)
2727

2828
type Options struct {
29-
UpdateInterval time.Duration
30-
PeerTimeout time.Duration
31-
RelayAddress string
32-
RegionID int32
33-
TLSConfig *tls.Config
29+
CleanupInterval time.Duration
30+
UpdateInterval time.Duration
31+
PeerTimeout time.Duration
32+
RelayAddress string
33+
RegionID int32
34+
TLSConfig *tls.Config
3435
}
3536

3637
// New registers the replica with the database and periodically updates to ensure
@@ -45,6 +46,11 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, pubsub data
4546
if options.UpdateInterval == 0 {
4647
options.UpdateInterval = 5 * time.Second
4748
}
49+
if options.CleanupInterval == 0 {
50+
// The cleanup interval can be quite long, because it's
51+
// primary purpose is to clean up dead replicas.
52+
options.CleanupInterval = 30 * time.Minute
53+
}
4854
hostname, err := os.Hostname()
4955
if err != nil {
5056
return nil, xerrors.Errorf("get hostname: %w", err)
@@ -123,16 +129,31 @@ type Manager struct {
123129
callback func()
124130
}
125131

132+
// updateInterval is used to determine a replicas state.
133+
// If the replica was updated > the time, it's considered healthy.
134+
// If the replica was updated < the time, it's considered stale.
135+
func (m *Manager) updateInterval() time.Time {
136+
return database.Now().Add(-3 * m.options.UpdateInterval)
137+
}
138+
126139
// loop runs the replica update sequence on an update interval.
127140
func (m *Manager) loop(ctx context.Context) {
128141
defer m.closeWait.Done()
129-
ticker := time.NewTicker(m.options.UpdateInterval)
130-
defer ticker.Stop()
142+
updateTicker := time.NewTicker(m.options.UpdateInterval)
143+
defer updateTicker.Stop()
144+
deleteTicker := time.NewTicker(m.options.CleanupInterval)
145+
defer deleteTicker.Stop()
131146
for {
132147
select {
133148
case <-ctx.Done():
134149
return
135-
case <-ticker.C:
150+
case <-deleteTicker.C:
151+
err := m.db.DeleteReplicasUpdatedBefore(ctx, m.updateInterval())
152+
if err != nil {
153+
m.logger.Warn(ctx, "delete old replicas", slog.Error(err))
154+
}
155+
continue
156+
case <-updateTicker.C:
136157
}
137158
err := m.syncReplicas(ctx)
138159
if err != nil && !errors.Is(err, context.Canceled) {
@@ -204,7 +225,7 @@ func (m *Manager) syncReplicas(ctx context.Context) error {
204225
defer m.closeWait.Done()
205226
// Expect replicas to update once every three times the interval...
206227
// If they don't, assume death!
207-
replicas, err := m.db.GetReplicasUpdatedAfter(ctx, database.Now().Add(-3*m.options.UpdateInterval))
228+
replicas, err := m.db.GetReplicasUpdatedAfter(ctx, m.updateInterval())
208229
if err != nil {
209230
return xerrors.Errorf("get replicas: %w", err)
210231
}

enterprise/replicasync/replicasync_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,24 @@ func TestReplica(t *testing.T) {
178178
}, testutil.WaitShort, testutil.IntervalFast)
179179
_ = server.Close()
180180
})
181+
t.Run("DeletesOld", func(t *testing.T) {
182+
t.Parallel()
183+
db, pubsub := dbtestutil.NewDB(t)
184+
_, err := db.InsertReplica(context.Background(), database.InsertReplicaParams{
185+
ID: uuid.New(),
186+
UpdatedAt: database.Now().Add(-time.Hour),
187+
})
188+
require.NoError(t, err)
189+
server, err := replicasync.New(context.Background(), slogtest.Make(t, nil), db, pubsub, &replicasync.Options{
190+
RelayAddress: "google.com",
191+
CleanupInterval: time.Millisecond,
192+
})
193+
require.NoError(t, err)
194+
defer server.Close()
195+
require.Eventually(t, func() bool {
196+
return len(server.Regional()) == 0
197+
}, testutil.WaitShort, testutil.IntervalFast)
198+
})
181199
t.Run("TwentyConcurrent", func(t *testing.T) {
182200
// Ensures that twenty concurrent replicas can spawn and all
183201
// discover each other in parallel!

0 commit comments

Comments
 (0)