Skip to content

Commit e826ae9

Browse files
committed
chore(dbpurge): refactor to use quartz functions instead of time.*
chore(dbpurge): use quartz.TickerFunc instead
1 parent 1a39e8b commit e826ae9

File tree

3 files changed

+96
-61
lines changed

3 files changed

+96
-61
lines changed

cli/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -985,7 +985,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
985985
defer shutdownConns()
986986

987987
// Ensures that old database entries are cleaned up over time!
988-
purger := dbpurge.New(ctx, logger.Named("dbpurge"), options.Database)
988+
purger := dbpurge.New(ctx, logger.Named("dbpurge"), options.Database, quartz.NewReal())
989989
defer purger.Close()
990990

991991
// Updates workspace usage

coderd/database/dbpurge/dbpurge.go

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/coder/coder/v2/coderd/database"
1313
"github.com/coder/coder/v2/coderd/database/dbauthz"
14+
"github.com/coder/quartz"
1415
)
1516

1617
const (
@@ -22,19 +23,16 @@ const (
2223
// It is the caller's responsibility to call Close on the returned instance.
2324
//
2425
// This is for cleaning up old, unused resources from the database that take up space.
25-
func New(ctx context.Context, logger slog.Logger, db database.Store) io.Closer {
26+
func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz.Clock) io.Closer {
2627
closed := make(chan struct{})
2728

2829
ctx, cancelFunc := context.WithCancel(ctx)
2930
//nolint:gocritic // The system purges old db records without user input.
3031
ctx = dbauthz.AsSystemRestricted(ctx)
3132

32-
// Use time.Nanosecond to force an initial tick. It will be reset to the
33-
// correct duration after executing once.
34-
ticker := time.NewTicker(time.Nanosecond)
33+
ticker := clk.NewTicker(time.Nanosecond)
3534
doTick := func(start time.Time) {
3635
defer ticker.Reset(delay)
37-
3836
// Start a transaction to grab advisory lock, we don't want to run
3937
// multiple purges at the same time (multiple replicas).
4038
if err := db.InTx(func(tx database.Store) error {
@@ -62,7 +60,7 @@ func New(ctx context.Context, logger slog.Logger, db database.Store) io.Closer {
6260
return xerrors.Errorf("failed to delete old notification messages: %w", err)
6361
}
6462

65-
logger.Info(ctx, "purged old database entries", slog.F("duration", time.Since(start)))
63+
logger.Info(ctx, "purged old database entries", slog.F("duration", clk.Since(start)))
6664

6765
return nil
6866
}, nil); err != nil {
@@ -78,12 +76,9 @@ func New(ctx context.Context, logger slog.Logger, db database.Store) io.Closer {
7876
select {
7977
case <-ctx.Done():
8078
return
81-
case now, ok := <-ticker.C:
82-
if !ok {
83-
return
84-
}
79+
case tick := <-ticker.C:
8580
ticker.Stop()
86-
doTick(now)
81+
doTick(tick)
8782
}
8883
}
8984
}()

coderd/database/dbpurge/dbpurge_test.go

Lines changed: 89 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"time"
1212

1313
"github.com/google/uuid"
14-
"github.com/stretchr/testify/assert"
1514
"github.com/stretchr/testify/require"
1615
"go.uber.org/goleak"
1716
"golang.org/x/exp/slices"
@@ -29,26 +28,48 @@ import (
2928
"github.com/coder/coder/v2/provisionerd/proto"
3029
"github.com/coder/coder/v2/provisionersdk"
3130
"github.com/coder/coder/v2/testutil"
31+
"github.com/coder/quartz"
3232
)
3333

3434
func TestMain(m *testing.M) {
3535
goleak.VerifyTestMain(m)
3636
}
3737

3838
// Ensures no goroutines leak.
39+
//
40+
//nolint:paralleltest // It uses LockIDDBPurge.
3941
func TestPurge(t *testing.T) {
40-
t.Parallel()
41-
purger := dbpurge.New(context.Background(), slogtest.Make(t, nil), dbmem.New())
42-
err := purger.Close()
43-
require.NoError(t, err)
42+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
43+
defer cancel()
44+
45+
clk := quartz.NewMock(t)
46+
47+
// We want to make sure dbpurge is actually started so that this test is meaningful.
48+
trapStop := clk.Trap().TickerStop()
49+
50+
purger := dbpurge.New(context.Background(), slogtest.Make(t, nil), dbmem.New(), clk)
51+
52+
// Wait for the initial nanosecond tick.
53+
clk.Advance(time.Nanosecond).MustWait(ctx)
54+
// Wait for ticker.Stop call that happens in the goroutine.
55+
trapStop.MustWait(ctx).Release()
56+
// Stop the trap now to avoid blocking further.
57+
trapStop.Close()
58+
59+
require.NoError(t, purger.Close())
4460
}
4561

4662
//nolint:paralleltest // It uses LockIDDBPurge.
4763
func TestDeleteOldWorkspaceAgentStats(t *testing.T) {
48-
db, _ := dbtestutil.NewDB(t)
49-
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
64+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
65+
defer cancel()
5066

5167
now := dbtime.Now()
68+
// TODO: must refactor DeleteOldWorkspaceAgentStats to allow passing in cutoff
69+
// before using quarts.NewMock()
70+
clk := quartz.NewReal()
71+
db, _ := dbtestutil.NewDB(t)
72+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
5273

5374
defer func() {
5475
if t.Failed() {
@@ -78,9 +99,6 @@ func TestDeleteOldWorkspaceAgentStats(t *testing.T) {
7899
}
79100
}()
80101

81-
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
82-
defer cancel()
83-
84102
// given
85103
// Note: We use increments of 2 hours to ensure we avoid any DST
86104
// conflicts, verifying DST behavior is beyond the scope of this
@@ -114,7 +132,7 @@ func TestDeleteOldWorkspaceAgentStats(t *testing.T) {
114132
})
115133

116134
// when
117-
closer := dbpurge.New(ctx, logger, db)
135+
closer := dbpurge.New(ctx, logger, db, clk)
118136
defer closer.Close()
119137

120138
// then
@@ -139,7 +157,7 @@ func TestDeleteOldWorkspaceAgentStats(t *testing.T) {
139157

140158
// Start a new purger to immediately trigger delete after rollup.
141159
_ = closer.Close()
142-
closer = dbpurge.New(ctx, logger, db)
160+
closer = dbpurge.New(ctx, logger, db, clk)
143161
defer closer.Close()
144162

145163
// then
@@ -177,50 +195,75 @@ func TestDeleteOldWorkspaceAgentLogs(t *testing.T) {
177195
t.Run("AgentHasNotConnectedSinceWeek_LogsExpired", func(t *testing.T) {
178196
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
179197
defer cancel()
198+
clk := quartz.NewMock(t)
199+
clk.Set(now).MustWait(ctx)
180200

181-
// given
182-
agent1 := mustCreateAgentWithLogs(ctx, t, db, user, org, tmpl, tv, now.Add(-8*24*time.Hour), t.Name()+"-1")
201+
// After dbpurge completes, the ticker is reset. Trap this call.
202+
trapReset := clk.Trap().TickerReset()
203+
defer trapReset.Close()
183204

184-
// when
185-
closer := dbpurge.New(ctx, logger, db)
186-
defer closer.Close()
205+
// given: an agent with logs older than threshold
206+
agent := mustCreateAgentWithLogs(ctx, t, db, user, org, tmpl, tv, now.Add(-8*24*time.Hour), t.Name())
187207

188-
// then
189-
assert.Eventually(t, func() bool {
190-
agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
191-
AgentID: agent1,
192-
})
193-
if err != nil {
194-
return false
195-
}
196-
assert.NoError(t, err)
197-
assert.NotContains(t, agentLogs, t.Name())
198-
return !containsAgentLog(agentLogs, t.Name())
199-
}, testutil.WaitShort, testutil.IntervalFast)
208+
// when dbpurge runs
209+
closer := dbpurge.New(ctx, logger, db, clk)
210+
defer closer.Close()
211+
// Wait for the initial nanosecond tick.
212+
clk.Advance(time.Nanosecond).MustWait(ctx)
213+
214+
trapReset.MustWait(ctx).Release() // Wait for ticker.Reset()
215+
d, w := clk.AdvanceNext()
216+
require.Equal(t, 10*time.Minute, d)
217+
218+
closer.Close() // doTick() has now run.
219+
w.MustWait(ctx)
220+
221+
// then the logs should be gone
222+
agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
223+
AgentID: agent,
224+
CreatedAfter: 0,
225+
})
226+
require.NoError(t, err)
227+
require.Empty(t, agentLogs, "expected agent logs to be empty")
200228
})
201229

202230
//nolint:paralleltest // It uses LockIDDBPurge.
203231
t.Run("AgentConnectedSixDaysAgo_LogsValid", func(t *testing.T) {
204232
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
205233
defer cancel()
234+
clk := quartz.NewMock(t)
235+
clk.Set(now).MustWait(ctx)
206236

207-
// given
237+
// After dbpurge completes, the ticker is reset. Trap this call.
238+
trapReset := clk.Trap().TickerReset()
239+
defer trapReset.Close()
240+
241+
// given: an agent with logs newer than threshold
208242
agent := mustCreateAgentWithLogs(ctx, t, db, user, org, tmpl, tv, now.Add(-6*24*time.Hour), t.Name())
209243

210-
// when
211-
closer := dbpurge.New(ctx, logger, db)
244+
// when dbpurge runs
245+
closer := dbpurge.New(ctx, logger, db, clk)
212246
defer closer.Close()
213247

214-
// then
215-
require.Eventually(t, func() bool {
216-
agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
217-
AgentID: agent,
218-
})
219-
if err != nil {
220-
return false
221-
}
222-
return containsAgentLog(agentLogs, t.Name())
223-
}, testutil.WaitShort, testutil.IntervalFast)
248+
// Wait for the initial nanosecond tick.
249+
clk.Advance(time.Nanosecond).MustWait(ctx)
250+
251+
trapReset.MustWait(ctx).Release() // Wait for ticker.Reset()
252+
d, w := clk.AdvanceNext()
253+
require.Equal(t, 10*time.Minute, d)
254+
255+
closer.Close() // doTick() has now run.
256+
w.MustWait(ctx)
257+
258+
// then the logs should still be there
259+
agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{
260+
AgentID: agent,
261+
})
262+
require.NoError(t, err)
263+
require.NotEmpty(t, agentLogs)
264+
for _, al := range agentLogs {
265+
require.Equal(t, t.Name(), al.Output)
266+
}
224267
})
225268
}
226269

@@ -273,14 +316,11 @@ func mustCreateAgent(t *testing.T, db database.Store, user database.User, org da
273316
})
274317
}
275318

276-
func containsAgentLog(daemons []database.WorkspaceAgentLog, output string) bool {
277-
return slices.ContainsFunc(daemons, func(d database.WorkspaceAgentLog) bool {
278-
return d.Output == output
279-
})
280-
}
281-
282319
//nolint:paralleltest // It uses LockIDDBPurge.
283320
func TestDeleteOldProvisionerDaemons(t *testing.T) {
321+
// TODO: must refactor DeleteOldProvisionerDaemons to allow passing in cutoff
322+
// before using quartz.NewMock
323+
clk := quartz.NewReal()
284324
db, _ := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure())
285325
defaultOrg := dbgen.Organization(t, db, database.Organization{})
286326
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
@@ -347,7 +387,7 @@ func TestDeleteOldProvisionerDaemons(t *testing.T) {
347387
require.NoError(t, err)
348388

349389
// when
350-
closer := dbpurge.New(ctx, logger, db)
390+
closer := dbpurge.New(ctx, logger, db, clk)
351391
defer closer.Close()
352392

353393
// then

0 commit comments

Comments
 (0)