Skip to content

Commit ba9d038

Browse files
authored
feat: add periodic cleanup of PG Coordinator state (coder#8142)
* PG Coordinator cleans orphaned state
  Signed-off-by: Spike Curtis <spike@coder.com>
* Don't need pubsub
  Signed-off-by: Spike Curtis <spike@coder.com>
---------
Signed-off-by: Spike Curtis <spike@coder.com>
1 parent c594f02 commit ba9d038

File tree

9 files changed

+135
-0
lines changed

9 files changed

+135
-0
lines changed

coderd/database/dbauthz/dbauthz.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,13 @@ func (q *querier) AcquireProvisionerJob(ctx context.Context, arg database.Acquir
683683
return q.db.AcquireProvisionerJob(ctx, arg)
684684
}
685685

686+
func (q *querier) CleanTailnetCoordinators(ctx context.Context) error {
687+
if err := q.authorizeContext(ctx, rbac.ActionDelete, rbac.ResourceTailnetCoordinator); err != nil {
688+
return err
689+
}
690+
return q.db.CleanTailnetCoordinators(ctx)
691+
}
692+
686693
// DeleteAPIKeyByID deletes the API key identified by id through the deleteQ
// helper, which is given both the fetch (GetAPIKeyByID) and delete
// (DeleteAPIKeyByID) functions of the wrapped store.
// NOTE(review): presumably deleteQ fetches the key to authorize the delete
// before performing it — confirm against deleteQ.
func (q *querier) DeleteAPIKeyByID(ctx context.Context, id string) error {
	deleteKey := deleteQ(q.log, q.auth, q.db.GetAPIKeyByID, q.db.DeleteAPIKeyByID)
	return deleteKey(ctx, id)
}

coderd/database/dbfake/dbfake.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,10 @@ func (q *fakeQuerier) AcquireProvisionerJob(_ context.Context, arg database.Acqu
10351035
return database.ProvisionerJob{}, sql.ErrNoRows
10361036
}
10371037

1038+
func (*fakeQuerier) CleanTailnetCoordinators(_ context.Context) error {
1039+
return ErrUnimplemented
1040+
}
1041+
10381042
func (q *fakeQuerier) DeleteAPIKeyByID(_ context.Context, id string) error {
10391043
q.mutex.Lock()
10401044
defer q.mutex.Unlock()

coderd/database/dbmetrics/dbmetrics.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/dbmock/dbmock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/querier.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/tailnet.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,8 @@ DO UPDATE SET
7777
id = $1,
7878
heartbeat_at = now() at time zone 'utc'
7979
RETURNING *;
80+
81+
-- name: CleanTailnetCoordinators :exec
-- Deletes coordinator rows whose heartbeat_at is more than 24 hours in the past.
DELETE FROM tailnet_coordinators
WHERE heartbeat_at < now() - INTERVAL '24 HOURS';

enterprise/tailnet/pgcoord.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const (
3131
numQuerierWorkers = 10
3232
numBinderWorkers = 10
3333
dbMaxBackoff = 10 * time.Second
34+
cleanupPeriod = time.Hour
3435
)
3536

3637
// pgCoord is a postgres-backed coordinator
@@ -1041,6 +1042,9 @@ type heartbeats struct {
10411042
lock sync.RWMutex
10421043
coordinators map[uuid.UUID]time.Time
10431044
timer *time.Timer
1045+
1046+
// overwritten in tests, but otherwise constant
1047+
cleanupPeriod time.Duration
10441048
}
10451049

10461050
func newHeartbeats(
@@ -1058,9 +1062,11 @@ func newHeartbeats(
10581062
update: update,
10591063
firstHeartbeat: firstHeartbeat,
10601064
coordinators: make(map[uuid.UUID]time.Time),
1065+
cleanupPeriod: cleanupPeriod,
10611066
}
10621067
go h.subscribe()
10631068
go h.sendBeats()
1069+
go h.cleanupLoop()
10641070
return h
10651071
}
10661072

@@ -1211,3 +1217,31 @@ func (h *heartbeats) sendDelete() {
12111217
}
12121218
h.logger.Debug(h.ctx, "deleted coordinator")
12131219
}
1220+
1221+
func (h *heartbeats) cleanupLoop() {
1222+
h.cleanup()
1223+
tkr := time.NewTicker(h.cleanupPeriod)
1224+
defer tkr.Stop()
1225+
for {
1226+
select {
1227+
case <-h.ctx.Done():
1228+
h.logger.Debug(h.ctx, "ending cleanupLoop", slog.Error(h.ctx.Err()))
1229+
return
1230+
case <-tkr.C:
1231+
h.cleanup()
1232+
}
1233+
}
1234+
}
1235+
1236+
// cleanup issues a DB command to clean out any old expired coordinators state. The cleanup is idempotent, so no need
1237+
// to synchronize with other coordinators.
1238+
func (h *heartbeats) cleanup() {
1239+
err := h.store.CleanTailnetCoordinators(h.ctx)
1240+
if err != nil {
1241+
// the records we are attempting to clean up do no serious harm other than
1242+
// accumulating in the tables, so we don't bother retrying if it fails.
1243+
h.logger.Error(h.ctx, "failed to cleanup old coordinators", slog.Error(err))
1244+
return
1245+
}
1246+
h.logger.Debug(h.ctx, "cleaned up old coordinators")
1247+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package tailnet
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/golang/mock/gomock"
9+
10+
"cdr.dev/slog"
11+
"cdr.dev/slog/sloggers/slogtest"
12+
13+
"github.com/coder/coder/coderd/database/dbmock"
14+
"github.com/coder/coder/testutil"
15+
)
16+
17+
// TestHeartbeat_Cleanup is internal so that we can overwrite the cleanup period and not wait an hour for the timed
18+
// cleanup.
19+
func TestHeartbeat_Cleanup(t *testing.T) {
20+
t.Parallel()
21+
22+
ctrl := gomock.NewController(t)
23+
mStore := dbmock.NewMockStore(ctrl)
24+
25+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
26+
defer cancel()
27+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
28+
29+
waitForCleanup := make(chan struct{})
30+
mStore.EXPECT().CleanTailnetCoordinators(gomock.Any()).MinTimes(2).DoAndReturn(func(_ context.Context) error {
31+
<-waitForCleanup
32+
return nil
33+
})
34+
35+
uut := &heartbeats{
36+
ctx: ctx,
37+
logger: logger,
38+
store: mStore,
39+
cleanupPeriod: time.Millisecond,
40+
}
41+
go uut.cleanupLoop()
42+
43+
for i := 0; i < 2; i++ {
44+
select {
45+
case <-ctx.Done():
46+
t.Fatal("timeout")
47+
case waitForCleanup <- struct{}{}:
48+
// ok
49+
}
50+
}
51+
close(waitForCleanup)
52+
}

0 commit comments

Comments
 (0)