From 94519621620de2a075683a8aa12fa40893973f02 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 29 Aug 2024 16:54:29 +0100 Subject: [PATCH 1/9] chore: improve coverage of dbpurge tests for DeleteOldWorkspaceAgentLogs --- coderd/database/dbpurge/dbpurge_test.go | 248 ++++++++++++++---------- 1 file changed, 149 insertions(+), 99 deletions(-) diff --git a/coderd/database/dbpurge/dbpurge_test.go b/coderd/database/dbpurge/dbpurge_test.go index 13a93a99d4471..9f7f241bd89aa 100644 --- a/coderd/database/dbpurge/dbpurge_test.go +++ b/coderd/database/dbpurge/dbpurge_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/goleak" "golang.org/x/exp/slices" @@ -181,7 +182,16 @@ func containsWorkspaceAgentStat(stats []database.GetWorkspaceAgentStatsRow, need //nolint:paralleltest // It uses LockIDDBPurge. func TestDeleteOldWorkspaceAgentLogs(t *testing.T) { - db, _ := dbtestutil.NewDB(t) + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) + defer cancel() + clk := quartz.NewMock(t) + now := dbtime.Now() + threshold := now.Add(-7 * 24 * time.Hour) + beforeThreshold := threshold.Add(-time.Hour) + afterThreshold := threshold.Add(time.Hour) + clk.Set(now).MustWait(ctx) + + db, _ := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure()) org := dbgen.Organization(t, db, database.Organization{}) user := dbgen.User(t, db, database.User{}) _ = dbgen.OrganizationMember(t, db, database.OrganizationMember{UserID: user.ID, OrganizationID: org.ID}) @@ -189,131 +199,171 @@ func TestDeleteOldWorkspaceAgentLogs(t *testing.T) { tmpl := dbgen.Template(t, db, database.Template{OrganizationID: org.ID, ActiveVersionID: tv.ID, CreatedBy: user.ID}) logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}) - now := dbtime.Now() - //nolint:paralleltest // It uses LockIDDBPurge. - t.Run("AgentHasNotConnectedSinceWeek_LogsExpired", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) - defer cancel() - clk := quartz.NewMock(t) - clk.Set(now).MustWait(ctx) - - // After dbpurge completes, the ticker is reset. Trap this call. - trapReset := clk.Trap().TickerReset() - defer trapReset.Close() - - // given: an agent with logs older than threshold - agent := mustCreateAgentWithLogs(ctx, t, db, user, org, tmpl, tv, now.Add(-8*24*time.Hour), t.Name()) - - // when dbpurge runs - closer := dbpurge.New(ctx, logger, db, clk) - defer closer.Close() - // Wait for the initial nanosecond tick. - clk.Advance(time.Nanosecond).MustWait(ctx) - - trapReset.MustWait(ctx).Release() // Wait for ticker.Reset() - d, w := clk.AdvanceNext() - require.Equal(t, 10*time.Minute, d) - - closer.Close() // doTick() has now run. - w.MustWait(ctx) - - // then the logs should be gone - agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{ - AgentID: agent, - CreatedAfter: 0, - }) - require.NoError(t, err) - require.Empty(t, agentLogs, "expected agent logs to be empty") - }) + // Given the following: + + // Workspace A was built once before the threshold, and never connected. + wsA := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) + wbA1 := mustCreateWorkspaceBuild(t, db, org, tv, wsA.ID, beforeThreshold, 1) + agentA1 := mustCreateAgent(t, db, wbA1) + mustCreateAgentLogs(ctx, t, db, agentA1.ID, nil, "agent a1 logs should be deleted") + + // Workspace B was built twice before the threshold. + wsB := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) + wbB1 := mustCreateWorkspaceBuild(t, db, org, tv, wsB.ID, beforeThreshold, 1) + wbB2 := mustCreateWorkspaceBuild(t, db, org, tv, wsB.ID, beforeThreshold, 2) + agentB1 := mustCreateAgent(t, db, wbB1) + agentB2 := mustCreateAgent(t, db, wbB2) + mustCreateAgentLogs(ctx, t, db, agentB1.ID, &beforeThreshold, "agent b1 logs should be deleted") + mustCreateAgentLogs(ctx, t, db, agentB2.ID, &beforeThreshold, "agent b2 logs should be retained") + + // Workspace C was built once before the threshold, and once after. + wsC := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) + wbC1 := mustCreateWorkspaceBuild(t, db, org, tv, wsC.ID, beforeThreshold, 1) + wbC2 := mustCreateWorkspaceBuild(t, db, org, tv, wsC.ID, afterThreshold, 2) + agentC1 := mustCreateAgent(t, db, wbC1) + agentC2 := mustCreateAgent(t, db, wbC2) + mustCreateAgentLogs(ctx, t, db, agentC1.ID, &beforeThreshold, "agent c1 logs should be deleted") + mustCreateAgentLogs(ctx, t, db, agentC2.ID, &afterThreshold, "agent c2 logs should be retained") + + // Workspace D was built twice after the threshold. + wsD := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) + wbD1 := mustCreateWorkspaceBuild(t, db, org, tv, wsD.ID, afterThreshold, 1) + wbD2 := mustCreateWorkspaceBuild(t, db, org, tv, wsD.ID, afterThreshold, 2) + agentD1 := mustCreateAgent(t, db, wbD1) + agentD2 := mustCreateAgent(t, db, wbD2) + mustCreateAgentLogs(ctx, t, db, agentD1.ID, &afterThreshold, "agent d1 logs should be retained") + mustCreateAgentLogs(ctx, t, db, agentD2.ID, &afterThreshold, "agent d2 logs should be retained") + + // Workspace E was build once after threshold but never connected. + wsE := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) + wbE1 := mustCreateWorkspaceBuild(t, db, org, tv, wsE.ID, beforeThreshold, 1) + agentE1 := mustCreateAgent(t, db, wbE1) + mustCreateAgentLogs(ctx, t, db, agentE1.ID, nil, "agent e1 logs should be retained") + + // when dbpurge runs + + // After dbpurge completes, the ticker is reset. Trap this call. + trapReset := clk.Trap().TickerReset() + defer trapReset.Close() - //nolint:paralleltest // It uses LockIDDBPurge. - t.Run("AgentConnectedSixDaysAgo_LogsValid", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) - defer cancel() - clk := quartz.NewMock(t) - clk.Set(now).MustWait(ctx) - - // After dbpurge completes, the ticker is reset. Trap this call. - trapReset := clk.Trap().TickerReset() - defer trapReset.Close() - - // given: an agent with logs newer than threshold - agent := mustCreateAgentWithLogs(ctx, t, db, user, org, tmpl, tv, now.Add(-6*24*time.Hour), t.Name()) - - // when dbpurge runs - closer := dbpurge.New(ctx, logger, db, clk) - defer closer.Close() - - // Wait for the initial nanosecond tick. - clk.Advance(time.Nanosecond).MustWait(ctx) - - trapReset.MustWait(ctx).Release() // Wait for ticker.Reset() - d, w := clk.AdvanceNext() - require.Equal(t, 10*time.Minute, d) - - closer.Close() // doTick() has now run. - w.MustWait(ctx) - - // then the logs should still be there - agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{ - AgentID: agent, - }) - require.NoError(t, err) - require.NotEmpty(t, agentLogs) - for _, al := range agentLogs { - require.Equal(t, t.Name(), al.Output) - } - }) -} + closer := dbpurge.New(ctx, logger, db, clk) + defer closer.Close() + // Wait for the initial nanosecond tick. + clk.Advance(time.Nanosecond).MustWait(ctx) -func mustCreateAgentWithLogs(ctx context.Context, t *testing.T, db database.Store, user database.User, org database.Organization, tmpl database.Template, tv database.TemplateVersion, agentLastConnectedAt time.Time, output string) uuid.UUID { - agent := mustCreateAgent(t, db, user, org, tmpl, tv) + trapReset.MustWait(ctx).Release() // Wait for ticker.Reset() + d, w := clk.AdvanceNext() + require.Equal(t, 10*time.Minute, d) + + closer.Close() // doTick() has now run. + w.MustWait(ctx) + + // then logs related to the following agents should be deleted: + // Agent A1 never connected and was created before the threshold. + assertNoWorkspaceAgentLogs(ctx, t, db, agentA1.ID) + // Agent B1 is not the latest build and the logs are from before threshold. + assertNoWorkspaceAgentLogs(ctx, t, db, agentB1.ID) + // Agent C1 is not the latest build and the logs are from before threshold. + assertNoWorkspaceAgentLogs(ctx, t, db, agentC1.ID) + + // then logs related to the following agents should be retained: + // Agent B2 is the latest build. + assertWorkspaceAgentLogs(ctx, t, db, agentB2.ID, "agent b2 logs should be retained") + // Agent C2 is the latest build. + assertWorkspaceAgentLogs(ctx, t, db, agentC2.ID, "agent c2 logs should be retained") + // Agents D1, D2, and E1 are all after threshold. + assertWorkspaceAgentLogs(ctx, t, db, agentD1.ID, "agent d1 logs should be retained") + assertWorkspaceAgentLogs(ctx, t, db, agentD2.ID, "agent d2 logs should be retained") + assertWorkspaceAgentLogs(ctx, t, db, agentE1.ID, "agent e1 logs should be retained") +} - err := db.UpdateWorkspaceAgentConnectionByID(ctx, database.UpdateWorkspaceAgentConnectionByIDParams{ - ID: agent.ID, - LastConnectedAt: sql.NullTime{Time: agentLastConnectedAt, Valid: true}, - }) - require.NoError(t, err) - _, err = db.InsertWorkspaceAgentLogs(ctx, database.InsertWorkspaceAgentLogsParams{ - AgentID: agent.ID, - CreatedAt: agentLastConnectedAt, - Output: []string{output}, - Level: []database.LogLevel{database.LogLevelDebug}, +func assertNoWorkspaceAgentLogs(ctx context.Context, t *testing.T, db database.Store, agentID uuid.UUID) { + t.Helper() + agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{ + AgentID: agentID, + CreatedAfter: 0, }) require.NoError(t, err) - // Make sure that agent logs have been collected. + assert.Empty(t, agentLogs) +} + +func assertWorkspaceAgentLogs(ctx context.Context, t *testing.T, db database.Store, agentID uuid.UUID, msg string) { + t.Helper() agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{ - AgentID: agent.ID, + AgentID: agentID, + CreatedAfter: 0, }) require.NoError(t, err) - require.NotZero(t, agentLogs, "agent logs must be present") - return agent.ID + assert.NotEmpty(t, agentLogs) + for _, al := range agentLogs { + assert.Equal(t, msg, al.Output) + } } -func mustCreateAgent(t *testing.T, db database.Store, user database.User, org database.Organization, tmpl database.Template, tv database.TemplateVersion) database.WorkspaceAgent { - workspace := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) +func mustCreateWorkspaceBuild(t *testing.T, db database.Store, org database.Organization, tv database.TemplateVersion, wsID uuid.UUID, createdAt time.Time, n int32) database.WorkspaceBuild { + t.Helper() job := dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{ + CreatedAt: createdAt, OrganizationID: org.ID, Type: database.ProvisionerJobTypeWorkspaceBuild, Provisioner: database.ProvisionerTypeEcho, StorageMethod: database.ProvisionerStorageMethodFile, }) - _ = dbgen.WorkspaceBuild(t, db, database.WorkspaceBuild{ - WorkspaceID: workspace.ID, + wb := dbgen.WorkspaceBuild(t, db, database.WorkspaceBuild{ + CreatedAt: createdAt, + WorkspaceID: wsID, JobID: job.ID, TemplateVersionID: tv.ID, Transition: database.WorkspaceTransitionStart, Reason: database.BuildReasonInitiator, + BuildNumber: n, }) + require.Equal(t, createdAt.UTC(), wb.CreatedAt.UTC()) + return wb +} + +func mustCreateAgent(t *testing.T, db database.Store, wb database.WorkspaceBuild) database.WorkspaceAgent { + t.Helper() resource := dbgen.WorkspaceResource(t, db, database.WorkspaceResource{ - JobID: job.ID, + JobID: wb.JobID, Transition: database.WorkspaceTransitionStart, + CreatedAt: wb.CreatedAt, }) - return dbgen.WorkspaceAgent(t, db, database.WorkspaceAgent{ - ResourceID: resource.ID, + wa := dbgen.WorkspaceAgent(t, db, database.WorkspaceAgent{ + ResourceID: resource.ID, + CreatedAt: wb.CreatedAt, + FirstConnectedAt: sql.NullTime{}, + DisconnectedAt: sql.NullTime{}, + LastConnectedAt: sql.NullTime{}, }) + require.Equal(t, wb.CreatedAt.UTC(), wa.CreatedAt.UTC()) + return wa +} + +func mustCreateAgentLogs(ctx context.Context, t *testing.T, db database.Store, agentID uuid.UUID, agentLastConnectedAt *time.Time, output string) uuid.UUID { + t.Helper() + if agentLastConnectedAt != nil { + require.NoError(t, db.UpdateWorkspaceAgentConnectionByID(ctx, database.UpdateWorkspaceAgentConnectionByIDParams{ + ID: agentID, + LastConnectedAt: sql.NullTime{Time: *agentLastConnectedAt, Valid: true}, + })) + } + _, err := db.InsertWorkspaceAgentLogs(ctx, database.InsertWorkspaceAgentLogsParams{ + AgentID: agentID, + // CreatedAt: agentLastConnectedAt, + Output: []string{output}, + Level: []database.LogLevel{database.LogLevelDebug}, + }) + require.NoError(t, err) + // Make sure that agent logs have been collected. + agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{ + AgentID: agentID, + }) + require.NoError(t, err) + require.NotEmpty(t, agentLogs, "agent logs must be present") + return agentID } //nolint:paralleltest // It uses LockIDDBPurge. From 0c14a3ec492dfc9ac9f1f1b0143d64da4ba46840 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 29 Aug 2024 16:57:42 +0100 Subject: [PATCH 2/9] feat(dbpurge): update DeleteOldWorkspaceAgentLogs query to keep latest builds and delete never-connected workspaces --- coderd/database/querier.go | 1 + coderd/database/queries.sql.go | 44 +++++++++++++++++++-- coderd/database/queries/workspaceagents.sql | 44 +++++++++++++++++++-- 3 files changed, 83 insertions(+), 6 deletions(-) diff --git a/coderd/database/querier.go b/coderd/database/querier.go index 8b0ac931d9c7b..c614a03834a9b 100644 --- a/coderd/database/querier.go +++ b/coderd/database/querier.go @@ -88,6 +88,7 @@ type sqlcQuerier interface { // connectivity issues (no provisioner daemon activity since registration). DeleteOldProvisionerDaemons(ctx context.Context) error // If an agent hasn't connected in the last 7 days, we purge it's logs. + // Exception: if the logs are related to the latest build, we keep those around. // Logs can take up a lot of space, so it's important we clean up frequently. DeleteOldWorkspaceAgentLogs(ctx context.Context, threshold time.Time) error DeleteOldWorkspaceAgentStats(ctx context.Context) error diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index d152bbefbab80..3f7f18f4e1412 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -10482,12 +10482,50 @@ func (q *sqlQuerier) UpsertWorkspaceAgentPortShare(ctx context.Context, arg Upse } const deleteOldWorkspaceAgentLogs = `-- name: DeleteOldWorkspaceAgentLogs :exec -DELETE FROM workspace_agent_logs WHERE agent_id IN - (SELECT id FROM workspace_agents WHERE last_connected_at IS NOT NULL - AND last_connected_at < $1 :: timestamptz) +WITH + latest_builds AS ( + SELECT + workspace_id, max(build_number) AS max_build_number + FROM + workspace_builds + GROUP BY + workspace_id + ), + old_agents AS ( + SELECT + wa.id, wa.last_connected_at, wb.build_number, wb.workspace_id + FROM + workspace_agents AS wa + JOIN + workspace_resources AS wr + ON + wa.resource_id = wr.id + JOIN + workspace_builds AS wb + ON + wb.job_id = wr.job_id + LEFT JOIN + latest_builds + ON + latest_builds.workspace_id = wb.workspace_id + AND + latest_builds.max_build_number = wb.build_number + WHERE + -- Filter out the latest builds for each workspace. + latest_builds.workspace_id IS NULL + AND CASE + -- If the last time the agent connected was before @threshold + WHEN wa.last_connected_at IS NOT NULL THEN + wa.last_connected_at < $1 :: timestamptz + -- The agent never connected, and was created before @threshold + ELSE wa.created_at < $1 :: timestamptz + END + ) +DELETE FROM workspace_agent_logs WHERE agent_id IN (SELECT id FROM old_agents) ` // If an agent hasn't connected in the last 7 days, we purge it's logs. +// Exception: if the logs are related to the latest build, we keep those around. // Logs can take up a lot of space, so it's important we clean up frequently. func (q *sqlQuerier) DeleteOldWorkspaceAgentLogs(ctx context.Context, threshold time.Time) error { _, err := q.db.ExecContext(ctx, deleteOldWorkspaceAgentLogs, threshold) diff --git a/coderd/database/queries/workspaceagents.sql b/coderd/database/queries/workspaceagents.sql index 2586cbca4841c..2c8031ff490bf 100644 --- a/coderd/database/queries/workspaceagents.sql +++ b/coderd/database/queries/workspaceagents.sql @@ -188,11 +188,49 @@ INSERT INTO SELECT * FROM workspace_agent_log_sources WHERE workspace_agent_id = ANY(@ids :: uuid [ ]); -- If an agent hasn't connected in the last 7 days, we purge it's logs. +-- Exception: if the logs are related to the latest build, we keep those around. -- Logs can take up a lot of space, so it's important we clean up frequently. -- name: DeleteOldWorkspaceAgentLogs :exec -DELETE FROM workspace_agent_logs WHERE agent_id IN - (SELECT id FROM workspace_agents WHERE last_connected_at IS NOT NULL - AND last_connected_at < @threshold :: timestamptz); +WITH + latest_builds AS ( + SELECT + workspace_id, max(build_number) AS max_build_number + FROM + workspace_builds + GROUP BY + workspace_id + ), + old_agents AS ( + SELECT + wa.id, wa.last_connected_at, wb.build_number, wb.workspace_id + FROM + workspace_agents AS wa + JOIN + workspace_resources AS wr + ON + wa.resource_id = wr.id + JOIN + workspace_builds AS wb + ON + wb.job_id = wr.job_id + LEFT JOIN + latest_builds + ON + latest_builds.workspace_id = wb.workspace_id + AND + latest_builds.max_build_number = wb.build_number + WHERE + -- Filter out the latest builds for each workspace. + latest_builds.workspace_id IS NULL + AND CASE + -- If the last time the agent connected was before @threshold + WHEN wa.last_connected_at IS NOT NULL THEN + wa.last_connected_at < @threshold :: timestamptz + -- The agent never connected, and was created before @threshold + ELSE wa.created_at < @threshold :: timestamptz + END + ) +DELETE FROM workspace_agent_logs WHERE agent_id IN (SELECT id FROM old_agents); -- name: GetWorkspaceAgentsInLatestBuildByWorkspaceID :many SELECT From 0af051e17753de4f6d5b581b3b2c45808d34aef1 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 29 Aug 2024 22:22:00 +0100 Subject: [PATCH 3/9] remove unnecessary columns from CTE --- coderd/database/queries.sql.go | 2 +- coderd/database/queries/workspaceagents.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 3f7f18f4e1412..fc388e55247d0 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -10493,7 +10493,7 @@ WITH ), old_agents AS ( SELECT - wa.id, wa.last_connected_at, wb.build_number, wb.workspace_id + wa.id FROM workspace_agents AS wa JOIN diff --git a/coderd/database/queries/workspaceagents.sql b/coderd/database/queries/workspaceagents.sql index 2c8031ff490bf..9c5860bf494a6 100644 --- a/coderd/database/queries/workspaceagents.sql +++ b/coderd/database/queries/workspaceagents.sql @@ -202,7 +202,7 @@ WITH ), old_agents AS ( SELECT - wa.id, wa.last_connected_at, wb.build_number, wb.workspace_id + wa.id FROM workspace_agents AS wa JOIN From dd97358e21ccf4ff35290ec7b72265000fcb62e0 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 29 Aug 2024 22:22:22 +0100 Subject: [PATCH 4/9] fix test --- coderd/database/dbpurge/dbpurge_test.go | 63 ++++++++++++++----------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/coderd/database/dbpurge/dbpurge_test.go b/coderd/database/dbpurge/dbpurge_test.go index 9f7f241bd89aa..487471d2c8e86 100644 --- a/coderd/database/dbpurge/dbpurge_test.go +++ b/coderd/database/dbpurge/dbpurge_test.go @@ -182,13 +182,12 @@ func containsWorkspaceAgentStat(stats []database.GetWorkspaceAgentStatsRow, need //nolint:paralleltest // It uses LockIDDBPurge. func TestDeleteOldWorkspaceAgentLogs(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) - defer cancel() + ctx := testutil.Context(t, testutil.WaitShort) clk := quartz.NewMock(t) now := dbtime.Now() threshold := now.Add(-7 * 24 * time.Hour) - beforeThreshold := threshold.Add(-time.Hour) - afterThreshold := threshold.Add(time.Hour) + beforeThreshold := threshold.Add(-24 * time.Hour) + afterThreshold := threshold.Add(24 * time.Hour) clk.Set(now).MustWait(ctx) db, _ := dbtestutil.NewDB(t, dbtestutil.WithDumpOnFailure()) @@ -202,44 +201,48 @@ func TestDeleteOldWorkspaceAgentLogs(t *testing.T) { // Given the following: - // Workspace A was built once before the threshold, and never connected. - wsA := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) + // Workspace A was built twice before the threshold, and never connected on + // either attempt. + wsA := dbgen.Workspace(t, db, database.Workspace{Name: "a", OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) wbA1 := mustCreateWorkspaceBuild(t, db, org, tv, wsA.ID, beforeThreshold, 1) + wbA2 := mustCreateWorkspaceBuild(t, db, org, tv, wsA.ID, beforeThreshold, 2) agentA1 := mustCreateAgent(t, db, wbA1) - mustCreateAgentLogs(ctx, t, db, agentA1.ID, nil, "agent a1 logs should be deleted") + agentA2 := mustCreateAgent(t, db, wbA2) + mustCreateAgentLogs(ctx, t, db, agentA1, nil, "agent a1 logs should be deleted") + mustCreateAgentLogs(ctx, t, db, agentA2, nil, "agent a2 logs should be retained") // Workspace B was built twice before the threshold. - wsB := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) + wsB := dbgen.Workspace(t, db, database.Workspace{Name: "b", OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) wbB1 := mustCreateWorkspaceBuild(t, db, org, tv, wsB.ID, beforeThreshold, 1) wbB2 := mustCreateWorkspaceBuild(t, db, org, tv, wsB.ID, beforeThreshold, 2) agentB1 := mustCreateAgent(t, db, wbB1) agentB2 := mustCreateAgent(t, db, wbB2) - mustCreateAgentLogs(ctx, t, db, agentB1.ID, &beforeThreshold, "agent b1 logs should be deleted") - mustCreateAgentLogs(ctx, t, db, agentB2.ID, &beforeThreshold, "agent b2 logs should be retained") + mustCreateAgentLogs(ctx, t, db, agentB1, &beforeThreshold, "agent b1 logs should be deleted") + mustCreateAgentLogs(ctx, t, db, agentB2, &beforeThreshold, "agent b2 logs should be retained") // Workspace C was built once before the threshold, and once after. - wsC := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) + wsC := dbgen.Workspace(t, db, database.Workspace{Name: "c", OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) wbC1 := mustCreateWorkspaceBuild(t, db, org, tv, wsC.ID, beforeThreshold, 1) wbC2 := mustCreateWorkspaceBuild(t, db, org, tv, wsC.ID, afterThreshold, 2) agentC1 := mustCreateAgent(t, db, wbC1) agentC2 := mustCreateAgent(t, db, wbC2) - mustCreateAgentLogs(ctx, t, db, agentC1.ID, &beforeThreshold, "agent c1 logs should be deleted") - mustCreateAgentLogs(ctx, t, db, agentC2.ID, &afterThreshold, "agent c2 logs should be retained") + mustCreateAgentLogs(ctx, t, db, agentC1, &beforeThreshold, "agent c1 logs should be deleted") + mustCreateAgentLogs(ctx, t, db, agentC2, &afterThreshold, "agent c2 logs should be retained") // Workspace D was built twice after the threshold. - wsD := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) + wsD := dbgen.Workspace(t, db, database.Workspace{Name: "d", OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) wbD1 := mustCreateWorkspaceBuild(t, db, org, tv, wsD.ID, afterThreshold, 1) wbD2 := mustCreateWorkspaceBuild(t, db, org, tv, wsD.ID, afterThreshold, 2) agentD1 := mustCreateAgent(t, db, wbD1) agentD2 := mustCreateAgent(t, db, wbD2) - mustCreateAgentLogs(ctx, t, db, agentD1.ID, &afterThreshold, "agent d1 logs should be retained") - mustCreateAgentLogs(ctx, t, db, agentD2.ID, &afterThreshold, "agent d2 logs should be retained") + mustCreateAgentLogs(ctx, t, db, agentD1, &afterThreshold, "agent d1 logs should be retained") + mustCreateAgentLogs(ctx, t, db, agentD2, &afterThreshold, "agent d2 logs should be retained") // Workspace E was build once after threshold but never connected. - wsE := dbgen.Workspace(t, db, database.Workspace{OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) + wsE := dbgen.Workspace(t, db, database.Workspace{Name: "e", OwnerID: user.ID, OrganizationID: org.ID, TemplateID: tmpl.ID}) wbE1 := mustCreateWorkspaceBuild(t, db, org, tv, wsE.ID, beforeThreshold, 1) agentE1 := mustCreateAgent(t, db, wbE1) - mustCreateAgentLogs(ctx, t, db, agentE1.ID, nil, "agent e1 logs should be retained") + mustCreateAgentLogs(ctx, t, db, agentE1, nil, "agent e1 logs should be retained") // when dbpurge runs @@ -260,7 +263,8 @@ func TestDeleteOldWorkspaceAgentLogs(t *testing.T) { w.MustWait(ctx) // then logs related to the following agents should be deleted: - // Agent A1 never connected and was created before the threshold. + // Agent A1 never connected, was created before the threshold, and is not the + // latest build. assertNoWorkspaceAgentLogs(ctx, t, db, agentA1.ID) // Agent B1 is not the latest build and the logs are from before threshold. assertNoWorkspaceAgentLogs(ctx, t, db, agentB1.ID) @@ -268,6 +272,8 @@ func TestDeleteOldWorkspaceAgentLogs(t *testing.T) { assertNoWorkspaceAgentLogs(ctx, t, db, agentC1.ID) // then logs related to the following agents should be retained: + // Agent A2 is the latest build. + assertWorkspaceAgentLogs(ctx, t, db, agentA2.ID, "agent a2 logs should be retained") // Agent B2 is the latest build. assertWorkspaceAgentLogs(ctx, t, db, agentB2.ID, "agent b2 logs should be retained") // Agent C2 is the latest build. @@ -331,7 +337,11 @@ func mustCreateAgent(t *testing.T, db database.Store, wb database.WorkspaceBuild CreatedAt: wb.CreatedAt, }) + ws, err := db.GetWorkspaceByID(context.Background(), wb.WorkspaceID) + require.NoError(t, err) + wa := dbgen.WorkspaceAgent(t, db, database.WorkspaceAgent{ + Name: fmt.Sprintf("%s%d", ws.Name, wb.BuildNumber), ResourceID: resource.ID, CreatedAt: wb.CreatedAt, FirstConnectedAt: sql.NullTime{}, @@ -342,28 +352,27 @@ func mustCreateAgent(t *testing.T, db database.Store, wb database.WorkspaceBuild return wa } -func mustCreateAgentLogs(ctx context.Context, t *testing.T, db database.Store, agentID uuid.UUID, agentLastConnectedAt *time.Time, output string) uuid.UUID { +func mustCreateAgentLogs(ctx context.Context, t *testing.T, db database.Store, agent database.WorkspaceAgent, agentLastConnectedAt *time.Time, output string) { t.Helper() if agentLastConnectedAt != nil { require.NoError(t, db.UpdateWorkspaceAgentConnectionByID(ctx, database.UpdateWorkspaceAgentConnectionByIDParams{ - ID: agentID, + ID: agent.ID, LastConnectedAt: sql.NullTime{Time: *agentLastConnectedAt, Valid: true}, })) } _, err := db.InsertWorkspaceAgentLogs(ctx, database.InsertWorkspaceAgentLogsParams{ - AgentID: agentID, - // CreatedAt: agentLastConnectedAt, - Output: []string{output}, - Level: []database.LogLevel{database.LogLevelDebug}, + AgentID: agent.ID, + CreatedAt: agent.CreatedAt, + Output: []string{output}, + Level: []database.LogLevel{database.LogLevelDebug}, }) require.NoError(t, err) // Make sure that agent logs have been collected. agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{ - AgentID: agentID, + AgentID: agent.ID, }) require.NoError(t, err) require.NotEmpty(t, agentLogs, "agent logs must be present") - return agentID } //nolint:paralleltest // It uses LockIDDBPurge. From 2017434b6d29c74d0a6f9cf1cabee6ed979eb178 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 29 Aug 2024 22:22:38 +0100 Subject: [PATCH 5/9] implement dbmem --- coderd/database/dbmem/dbmem.go | 91 ++++++++++++++++++++++++++++++---- 1 file changed, 81 insertions(+), 10 deletions(-) diff --git a/coderd/database/dbmem/dbmem.go b/coderd/database/dbmem/dbmem.go index 9037e2ddaef38..b1d2178e66a29 100644 --- a/coderd/database/dbmem/dbmem.go +++ b/coderd/database/dbmem/dbmem.go @@ -1710,19 +1710,90 @@ func (q *FakeQuerier) DeleteOldWorkspaceAgentLogs(_ context.Context, threshold t q.mutex.Lock() defer q.mutex.Unlock() - var validLogs []database.WorkspaceAgentLog - for _, log := range q.workspaceAgentLogs { - var toBeDeleted bool - for _, agent := range q.workspaceAgents { - if agent.ID == log.AgentID && agent.LastConnectedAt.Valid && agent.LastConnectedAt.Time.Before(threshold) { - toBeDeleted = true - break - } + /* + WITH + latest_builds AS ( + SELECT + workspace_id, max(build_number) AS max_build_number + FROM + workspace_builds + GROUP BY + workspace_id + ), + */ + latestBuilds := make(map[uuid.UUID]int32) + for _, wb := range q.workspaceBuilds { + if lastBuildNumber, found := latestBuilds[wb.WorkspaceID]; found && lastBuildNumber > wb.BuildNumber { + continue } + // not found or newer build number + latestBuilds[wb.WorkspaceID] = wb.BuildNumber + } - if !toBeDeleted { - validLogs = append(validLogs, log) + /* + old_agents AS ( + SELECT + wa.id + FROM + workspace_agents AS wa + JOIN + workspace_resources AS wr + ON + wa.resource_id = wr.id + JOIN + workspace_builds AS wb + ON + wb.job_id = wr.job_id + LEFT JOIN + latest_builds + ON + latest_builds.workspace_id = wb.workspace_id + AND + latest_builds.max_build_number = wb.build_number + WHERE + -- Filter out the latest builds for each workspace. + latest_builds.workspace_id IS NULL + AND CASE + -- If the last time the agent connected was before @threshold + WHEN wa.last_connected_at IS NOT NULL THEN + wa.last_connected_at < @threshold :: timestamptz + -- The agent never connected, and was created before @threshold + ELSE wa.created_at < @threshold :: timestamptz + END + ) + */ + oldAgents := make(map[uuid.UUID]struct{}) + for _, wa := range q.workspaceAgents { + for _, wr := range q.workspaceResources { + if wr.ID != wa.ResourceID { + continue + } + for _, wb := range q.workspaceBuilds { + if wb.JobID != wr.JobID { + continue + } + latestBuildNumber, found := latestBuilds[wb.WorkspaceID] + if !found { + panic("workspaceBuilds got modified somehow while q was locked! This is a bug in dbmem!") + } + if latestBuildNumber == wb.BuildNumber { + continue + } + if wa.LastConnectedAt.Valid && wa.LastConnectedAt.Time.Before(threshold) || wa.CreatedAt.Before(threshold) { + oldAgents[wa.ID] = struct{}{} + } + } + } + } + /* + DELETE FROM workspace_agent_logs WHERE agent_id IN (SELECT id FROM old_agents); + */ + var validLogs []database.WorkspaceAgentLog + for _, log := range q.workspaceAgentLogs { + if _, found := oldAgents[log.AgentID]; found { + continue } + validLogs = append(validLogs, log) } q.workspaceAgentLogs = validLogs return nil From 725eeb0ee651acf2835ab499269f5c72be0b5a98 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 29 Aug 2024 22:25:13 +0100 Subject: [PATCH 6/9] ensure that ticks from quartz are dbtimey --- coderd/database/dbpurge/dbpurge.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/coderd/database/dbpurge/dbpurge.go b/coderd/database/dbpurge/dbpurge.go index 08929a08e4483..4b24336b15722 100644 --- a/coderd/database/dbpurge/dbpurge.go +++ b/coderd/database/dbpurge/dbpurge.go @@ -11,6 +11,7 @@ import ( "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbauthz" + "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/quartz" ) @@ -47,7 +48,8 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz. return nil } - if err := tx.DeleteOldWorkspaceAgentLogs(ctx, start.Add(-maxAgentLogAge)); err != nil { + deleteOldWorkspaceAgentLogsBefore := start.Add(-maxAgentLogAge) + if err := tx.DeleteOldWorkspaceAgentLogs(ctx, deleteOldWorkspaceAgentLogsBefore); err != nil { return xerrors.Errorf("failed to delete old workspace agent logs: %w", err) } if err := tx.DeleteOldWorkspaceAgentStats(ctx); err != nil { @@ -78,7 +80,7 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz. return case tick := <-ticker.C: ticker.Stop() - doTick(tick) + doTick(dbtime.Time(tick).UTC()) } } }() From 591dd9106bb444021339a5ed7a23c5a9710d38b4 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Fri, 30 Aug 2024 11:52:10 +0100 Subject: [PATCH 7/9] extract awaitDoTick() test helper --- coderd/database/dbpurge/dbpurge_test.go | 56 +++++++++++++++---------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/coderd/database/dbpurge/dbpurge_test.go b/coderd/database/dbpurge/dbpurge_test.go index 487471d2c8e86..34e0e040698cf 100644 --- a/coderd/database/dbpurge/dbpurge_test.go +++ b/coderd/database/dbpurge/dbpurge_test.go @@ -43,20 +43,11 @@ func TestPurge(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) defer cancel() - clk := quartz.NewMock(t) - // We want to make sure dbpurge is actually started so that this test is meaningful. - trapStop := clk.Trap().TickerStop() - + clk := quartz.NewMock(t) + done := awaitDoTick(ctx, t, clk) purger := dbpurge.New(context.Background(), slogtest.Make(t, nil), dbmem.New(), clk) - - // Wait for the initial nanosecond tick. - clk.Advance(time.Nanosecond).MustWait(ctx) - // Wait for ticker.Stop call that happens in the goroutine. - trapStop.MustWait(ctx).Release() - // Stop the trap now to avoid blocking further. - trapStop.Close() - + <-done // wait for doTick() to run. require.NoError(t, purger.Close()) } @@ -247,20 +238,11 @@ func TestDeleteOldWorkspaceAgentLogs(t *testing.T) { // when dbpurge runs // After dbpurge completes, the ticker is reset. Trap this call. - trapReset := clk.Trap().TickerReset() - defer trapReset.Close() + done := awaitDoTick(ctx, t, clk) closer := dbpurge.New(ctx, logger, db, clk) defer closer.Close() - // Wait for the initial nanosecond tick. - clk.Advance(time.Nanosecond).MustWait(ctx) - - trapReset.MustWait(ctx).Release() // Wait for ticker.Reset() - d, w := clk.AdvanceNext() - require.Equal(t, 10*time.Minute, d) - - closer.Close() // doTick() has now run. - w.MustWait(ctx) + <-done // doTick() has now run. // then logs related to the following agents should be deleted: // Agent A1 never connected, was created before the threshold, and is not the @@ -284,6 +266,34 @@ func TestDeleteOldWorkspaceAgentLogs(t *testing.T) { assertWorkspaceAgentLogs(ctx, t, db, agentE1.ID, "agent e1 logs should be retained") } +func awaitDoTick(ctx context.Context, t *testing.T, clk *quartz.Mock) chan struct{} { + t.Helper() + ch := make(chan struct{}) + trapStop := clk.Trap().TickerStop() + trapReset := clk.Trap().TickerReset() + go func() { + defer close(ch) + defer trapStop.Close() + defer trapReset.Close() + // Wait for the initial nanosecond tick. + trapReset.MustWait(ctx).Release() + clk.Advance(time.Nanosecond).MustWait(ctx) + // Wait for the ticker stop event. + trapStop.MustWait(ctx).Release() + // doTick runs here. Wait for the next + // ticker reset event that signifies it's completed. + trapReset.MustWait(ctx).Release() + // Ensure that the duration is reset to the original delay. + d, w := clk.AdvanceNext() + assert.Equal(t, 10*time.Minute, d) + if !assert.NoError(t, w.Wait(ctx)) { + return + } + }() + + return ch +} + func assertNoWorkspaceAgentLogs(ctx context.Context, t *testing.T, db database.Store, agentID uuid.UUID) { t.Helper() agentLogs, err := db.GetWorkspaceAgentLogsAfter(ctx, database.GetWorkspaceAgentLogsAfterParams{ From 0f8ab25630db572c4786940af48179bd260c0027 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Fri, 30 Aug 2024 11:52:21 +0100 Subject: [PATCH 8/9] address race condition in initial tick --- coderd/database/dbpurge/dbpurge.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/coderd/database/dbpurge/dbpurge.go b/coderd/database/dbpurge/dbpurge.go index 4b24336b15722..e9d1802d9632f 100644 --- a/coderd/database/dbpurge/dbpurge.go +++ b/coderd/database/dbpurge/dbpurge.go @@ -31,7 +31,8 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz. //nolint:gocritic // The system purges old db records without user input. ctx = dbauthz.AsSystemRestricted(ctx) - ticker := clk.NewTicker(time.Nanosecond) + // Start the ticker with the initial delay. + ticker := clk.NewTicker(delay) doTick := func(start time.Time) { defer ticker.Reset(delay) // Start a transaction to grab advisory lock, we don't want to run @@ -74,6 +75,8 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz. go func() { defer close(closed) defer ticker.Stop() + // Force an initial tick immediately. + ticker.Reset(time.Nanosecond) for { select { case <-ctx.Done(): From 5c77b5e04358db66bd7dbe7a4bd19dae86e0d6de Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Fri, 30 Aug 2024 17:24:34 +0100 Subject: [PATCH 9/9] fix double-tick startup race --- coderd/database/dbpurge/dbpurge.go | 4 ++-- coderd/database/dbpurge/dbpurge_test.go | 19 ++++++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/coderd/database/dbpurge/dbpurge.go b/coderd/database/dbpurge/dbpurge.go index e9d1802d9632f..00244cfd63533 100644 --- a/coderd/database/dbpurge/dbpurge.go +++ b/coderd/database/dbpurge/dbpurge.go @@ -75,8 +75,8 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, clk quartz. go func() { defer close(closed) defer ticker.Stop() - // Force an initial tick immediately. - ticker.Reset(time.Nanosecond) + // Force an initial tick. + doTick(dbtime.Time(clk.Now()).UTC()) for { select { case <-ctx.Done(): diff --git a/coderd/database/dbpurge/dbpurge_test.go b/coderd/database/dbpurge/dbpurge_test.go index 34e0e040698cf..c4fdb27eb1e46 100644 --- a/coderd/database/dbpurge/dbpurge_test.go +++ b/coderd/database/dbpurge/dbpurge_test.go @@ -269,26 +269,27 @@ func TestDeleteOldWorkspaceAgentLogs(t *testing.T) { func awaitDoTick(ctx context.Context, t *testing.T, clk *quartz.Mock) chan struct{} { t.Helper() ch := make(chan struct{}) + trapNow := clk.Trap().Now() trapStop := clk.Trap().TickerStop() trapReset := clk.Trap().TickerReset() go func() { defer close(ch) - defer trapStop.Close() defer trapReset.Close() - // Wait for the initial nanosecond tick. - trapReset.MustWait(ctx).Release() - clk.Advance(time.Nanosecond).MustWait(ctx) - // Wait for the ticker stop event. - trapStop.MustWait(ctx).Release() + defer trapStop.Close() + defer trapNow.Close() + // Wait for the initial tick signified by a call to Now(). + trapNow.MustWait(ctx).Release() // doTick runs here. Wait for the next // ticker reset event that signifies it's completed. trapReset.MustWait(ctx).Release() - // Ensure that the duration is reset to the original delay. + // Ensure that the next tick happens in 10 minutes from start. d, w := clk.AdvanceNext() - assert.Equal(t, 10*time.Minute, d) - if !assert.NoError(t, w.Wait(ctx)) { + if !assert.Equal(t, 10*time.Minute, d) { return } + w.MustWait(ctx) + // Wait for the ticker stop event. + trapStop.MustWait(ctx).Release() }() return ch