From 0400a53d1322830456a11ffc6d84a4d800c0b7f5 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 29 May 2024 15:51:12 +0000 Subject: [PATCH 1/6] chore: remove stats batcher --- cli/server.go | 11 - coderd/agentapi/stats.go | 6 - coderd/agentapi/stats_test.go | 45 --- coderd/batchstats/batcher.go | 302 ------------------ coderd/batchstats/batcher_internal_test.go | 228 ------------- coderd/coderd.go | 7 - coderd/coderdtest/coderdtest.go | 18 +- coderd/insights_test.go | 99 +++--- .../prometheusmetrics_test.go | 22 +- coderd/workspacestats/reporter.go | 43 ++- 10 files changed, 91 insertions(+), 690 deletions(-) delete mode 100644 coderd/batchstats/batcher.go delete mode 100644 coderd/batchstats/batcher_internal_test.go diff --git a/cli/server.go b/cli/server.go index 3706b2ee1bc92..ab456220e911d 100644 --- a/cli/server.go +++ b/cli/server.go @@ -62,7 +62,6 @@ import ( "github.com/coder/coder/v2/cli/config" "github.com/coder/coder/v2/coderd" "github.com/coder/coder/v2/coderd/autobuild" - "github.com/coder/coder/v2/coderd/batchstats" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/awsiamrds" "github.com/coder/coder/v2/coderd/database/dbmem" @@ -869,16 +868,6 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. options.SwaggerEndpoint = vals.Swagger.Enable.Value() } - batcher, closeBatcher, err := batchstats.New(ctx, - batchstats.WithLogger(options.Logger.Named("batchstats")), - batchstats.WithStore(options.Database), - ) - if err != nil { - return xerrors.Errorf("failed to create agent stats batcher: %w", err) - } - options.StatsBatcher = batcher - defer closeBatcher() - // We use a separate coderAPICloser so the Enterprise API // can have its own close functions. This is cleaner // than abstracting the Coder API itself. diff --git a/coderd/agentapi/stats.go b/coderd/agentapi/stats.go index ee17897572f3d..a167fb5d6f275 100644 --- a/coderd/agentapi/stats.go +++ b/coderd/agentapi/stats.go @@ -7,8 +7,6 @@ import ( "golang.org/x/xerrors" "google.golang.org/protobuf/types/known/durationpb" - "github.com/google/uuid" - "cdr.dev/slog" agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/coderd/database" @@ -16,10 +14,6 @@ import ( "github.com/coder/coder/v2/coderd/workspacestats" ) -type StatsBatcher interface { - Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats) error -} - type StatsAPI struct { AgentFn func(context.Context) (database.WorkspaceAgent, error) Database database.Store diff --git a/coderd/agentapi/stats_test.go b/coderd/agentapi/stats_test.go index c304dea93ecc9..5c372eecdefe4 100644 --- a/coderd/agentapi/stats_test.go +++ b/coderd/agentapi/stats_test.go @@ -3,7 +3,6 @@ package agentapi_test import ( "context" "database/sql" - "sync" "sync/atomic" "testing" "time" @@ -27,33 +26,6 @@ import ( "github.com/coder/coder/v2/testutil" ) -type statsBatcher struct { - mu sync.Mutex - - called int64 - lastTime time.Time - lastAgentID uuid.UUID - lastTemplateID uuid.UUID - lastUserID uuid.UUID - lastWorkspaceID uuid.UUID - lastStats *agentproto.Stats -} - -var _ agentapi.StatsBatcher = &statsBatcher{} - -func (b *statsBatcher) Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats) error { - b.mu.Lock() - defer b.mu.Unlock() - b.called++ - b.lastTime = now - b.lastAgentID = agentID - b.lastTemplateID = templateID - b.lastUserID = userID - b.lastWorkspaceID = workspaceID - b.lastStats = st - return nil -} - func TestUpdateStates(t *testing.T) { t.Parallel() @@ -94,7 +66,6 @@ func TestUpdateStates(t *testing.T) { panic("not implemented") }, } - batcher = &statsBatcher{} updateAgentMetricsFnCalled = false req = &agentproto.UpdateStatsRequest{ @@ -134,7 +105,6 @@ func TestUpdateStates(t *testing.T) { StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{ Database: dbM, Pubsub: ps, - StatsBatcher: batcher, TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore), UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) { updateAgentMetricsFnCalled = true @@ -187,16 +157,6 @@ func TestUpdateStates(t *testing.T) { require.Equal(t, &agentproto.UpdateStatsResponse{ ReportInterval: durationpb.New(10 * time.Second), }, resp) - - batcher.mu.Lock() - defer batcher.mu.Unlock() - require.Equal(t, int64(1), batcher.called) - require.Equal(t, now, batcher.lastTime) - require.Equal(t, agent.ID, batcher.lastAgentID) - require.Equal(t, template.ID, batcher.lastTemplateID) - require.Equal(t, user.ID, batcher.lastUserID) - require.Equal(t, workspace.ID, batcher.lastWorkspaceID) - require.Equal(t, req.Stats, batcher.lastStats) ctx := testutil.Context(t, testutil.WaitShort) select { case <-ctx.Done(): @@ -222,7 +182,6 @@ func TestUpdateStates(t *testing.T) { panic("not implemented") }, } - batcher = &statsBatcher{} req = &agentproto.UpdateStatsRequest{ Stats: &agentproto.Stats{ @@ -240,7 +199,6 @@ func TestUpdateStates(t *testing.T) { StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{ Database: dbM, Pubsub: ps, - StatsBatcher: batcher, TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore), // Ignored when nil. UpdateAgentMetricsFn: nil, @@ -285,7 +243,6 @@ func TestUpdateStates(t *testing.T) { StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{ Database: dbM, Pubsub: ps, - StatsBatcher: nil, // should not be called TemplateScheduleStore: nil, // should not be called UpdateAgentMetricsFn: nil, // should not be called }), @@ -336,7 +293,6 @@ func TestUpdateStates(t *testing.T) { panic("not implemented") }, } - batcher = &statsBatcher{} updateAgentMetricsFnCalled = false req = &agentproto.UpdateStatsRequest{ @@ -357,7 +313,6 @@ func TestUpdateStates(t *testing.T) { StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{ Database: dbM, Pubsub: ps, - StatsBatcher: batcher, TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore), UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) { updateAgentMetricsFnCalled = true diff --git a/coderd/batchstats/batcher.go b/coderd/batchstats/batcher.go deleted file mode 100644 index bbff38b0413c0..0000000000000 --- a/coderd/batchstats/batcher.go +++ /dev/null @@ -1,302 +0,0 @@ -package batchstats - -import ( - "context" - "encoding/json" - "os" - "sync" - "sync/atomic" - "time" - - "github.com/google/uuid" - "golang.org/x/xerrors" - - "cdr.dev/slog" - "cdr.dev/slog/sloggers/sloghuman" - agentproto "github.com/coder/coder/v2/agent/proto" - "github.com/coder/coder/v2/coderd/database" - "github.com/coder/coder/v2/coderd/database/dbauthz" - "github.com/coder/coder/v2/coderd/database/dbtime" -) - -const ( - defaultBufferSize = 1024 - defaultFlushInterval = time.Second -) - -// Batcher holds a buffer of agent stats and periodically flushes them to -// its configured store. It also updates the workspace's last used time. -type Batcher struct { - store database.Store - log slog.Logger - - mu sync.Mutex - // TODO: make this a buffered chan instead? - buf *database.InsertWorkspaceAgentStatsParams - // NOTE: we batch this separately as it's a jsonb field and - // pq.Array + unnest doesn't play nicely with this. - connectionsByProto []map[string]int64 - batchSize int - - // tickCh is used to periodically flush the buffer. - tickCh <-chan time.Time - ticker *time.Ticker - interval time.Duration - // flushLever is used to signal the flusher to flush the buffer immediately. - flushLever chan struct{} - flushForced atomic.Bool - // flushed is used during testing to signal that a flush has completed. - flushed chan<- int -} - -// Option is a functional option for configuring a Batcher. -type Option func(b *Batcher) - -// WithStore sets the store to use for storing stats. -func WithStore(store database.Store) Option { - return func(b *Batcher) { - b.store = store - } -} - -// WithBatchSize sets the number of stats to store in a batch. -func WithBatchSize(size int) Option { - return func(b *Batcher) { - b.batchSize = size - } -} - -// WithInterval sets the interval for flushes. -func WithInterval(d time.Duration) Option { - return func(b *Batcher) { - b.interval = d - } -} - -// WithLogger sets the logger to use for logging. -func WithLogger(log slog.Logger) Option { - return func(b *Batcher) { - b.log = log - } -} - -// New creates a new Batcher and starts it. -func New(ctx context.Context, opts ...Option) (*Batcher, func(), error) { - b := &Batcher{} - b.log = slog.Make(sloghuman.Sink(os.Stderr)) - b.flushLever = make(chan struct{}, 1) // Buffered so that it doesn't block. - for _, opt := range opts { - opt(b) - } - - if b.store == nil { - return nil, nil, xerrors.Errorf("no store configured for batcher") - } - - if b.interval == 0 { - b.interval = defaultFlushInterval - } - - if b.batchSize == 0 { - b.batchSize = defaultBufferSize - } - - if b.tickCh == nil { - b.ticker = time.NewTicker(b.interval) - b.tickCh = b.ticker.C - } - - b.initBuf(b.batchSize) - - cancelCtx, cancelFunc := context.WithCancel(ctx) - done := make(chan struct{}) - go func() { - b.run(cancelCtx) - close(done) - }() - - closer := func() { - cancelFunc() - if b.ticker != nil { - b.ticker.Stop() - } - <-done - } - - return b, closer, nil -} - -// Add adds a stat to the batcher for the given workspace and agent. -func (b *Batcher) Add( - now time.Time, - agentID uuid.UUID, - templateID uuid.UUID, - userID uuid.UUID, - workspaceID uuid.UUID, - st *agentproto.Stats, -) error { - b.mu.Lock() - defer b.mu.Unlock() - - now = dbtime.Time(now) - - b.buf.ID = append(b.buf.ID, uuid.New()) - b.buf.CreatedAt = append(b.buf.CreatedAt, now) - b.buf.AgentID = append(b.buf.AgentID, agentID) - b.buf.UserID = append(b.buf.UserID, userID) - b.buf.TemplateID = append(b.buf.TemplateID, templateID) - b.buf.WorkspaceID = append(b.buf.WorkspaceID, workspaceID) - - // Store the connections by proto separately as it's a jsonb field. We marshal on flush. - // b.buf.ConnectionsByProto = append(b.buf.ConnectionsByProto, st.ConnectionsByProto) - b.connectionsByProto = append(b.connectionsByProto, st.ConnectionsByProto) - - b.buf.ConnectionCount = append(b.buf.ConnectionCount, st.ConnectionCount) - b.buf.RxPackets = append(b.buf.RxPackets, st.RxPackets) - b.buf.RxBytes = append(b.buf.RxBytes, st.RxBytes) - b.buf.TxPackets = append(b.buf.TxPackets, st.TxPackets) - b.buf.TxBytes = append(b.buf.TxBytes, st.TxBytes) - b.buf.SessionCountVSCode = append(b.buf.SessionCountVSCode, st.SessionCountVscode) - b.buf.SessionCountJetBrains = append(b.buf.SessionCountJetBrains, st.SessionCountJetbrains) - b.buf.SessionCountReconnectingPTY = append(b.buf.SessionCountReconnectingPTY, st.SessionCountReconnectingPty) - b.buf.SessionCountSSH = append(b.buf.SessionCountSSH, st.SessionCountSsh) - b.buf.ConnectionMedianLatencyMS = append(b.buf.ConnectionMedianLatencyMS, st.ConnectionMedianLatencyMs) - - // If the buffer is over 80% full, signal the flusher to flush immediately. - // We want to trigger flushes early to reduce the likelihood of - // accidentally growing the buffer over batchSize. - filled := float64(len(b.buf.ID)) / float64(b.batchSize) - if filled >= 0.8 && !b.flushForced.Load() { - b.flushLever <- struct{}{} - b.flushForced.Store(true) - } - return nil -} - -// Run runs the batcher. -func (b *Batcher) run(ctx context.Context) { - // nolint:gocritic // This is only ever used for one thing - inserting agent stats. - authCtx := dbauthz.AsSystemRestricted(ctx) - for { - select { - case <-b.tickCh: - b.flush(authCtx, false, "scheduled") - case <-b.flushLever: - // If the flush lever is depressed, flush the buffer immediately. - b.flush(authCtx, true, "reaching capacity") - case <-ctx.Done(): - b.log.Debug(ctx, "context done, flushing before exit") - - // We must create a new context here as the parent context is done. - ctxTimeout, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() //nolint:revive // We're returning, defer is fine. - - // nolint:gocritic // This is only ever used for one thing - inserting agent stats. - b.flush(dbauthz.AsSystemRestricted(ctxTimeout), true, "exit") - return - } - } -} - -// flush flushes the batcher's buffer. -func (b *Batcher) flush(ctx context.Context, forced bool, reason string) { - b.mu.Lock() - b.flushForced.Store(true) - start := time.Now() - count := len(b.buf.ID) - defer func() { - b.flushForced.Store(false) - b.mu.Unlock() - if count > 0 { - elapsed := time.Since(start) - b.log.Debug(ctx, "flush complete", - slog.F("count", count), - slog.F("elapsed", elapsed), - slog.F("forced", forced), - slog.F("reason", reason), - ) - } - // Notify that a flush has completed. This only happens in tests. - if b.flushed != nil { - select { - case <-ctx.Done(): - close(b.flushed) - default: - b.flushed <- count - } - } - }() - - if len(b.buf.ID) == 0 { - return - } - - // marshal connections by proto - payload, err := json.Marshal(b.connectionsByProto) - if err != nil { - b.log.Error(ctx, "unable to marshal agent connections by proto, dropping data", slog.Error(err)) - b.buf.ConnectionsByProto = json.RawMessage(`[]`) - } else { - b.buf.ConnectionsByProto = payload - } - - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. - err = b.store.InsertWorkspaceAgentStats(ctx, *b.buf) - elapsed := time.Since(start) - if err != nil { - if database.IsQueryCanceledError(err) { - b.log.Debug(ctx, "query canceled, skipping insert of workspace agent stats", slog.F("elapsed", elapsed)) - return - } - b.log.Error(ctx, "error inserting workspace agent stats", slog.Error(err), slog.F("elapsed", elapsed)) - return - } - - b.resetBuf() -} - -// initBuf resets the buffer. b MUST be locked. -func (b *Batcher) initBuf(size int) { - b.buf = &database.InsertWorkspaceAgentStatsParams{ - ID: make([]uuid.UUID, 0, b.batchSize), - CreatedAt: make([]time.Time, 0, b.batchSize), - UserID: make([]uuid.UUID, 0, b.batchSize), - WorkspaceID: make([]uuid.UUID, 0, b.batchSize), - TemplateID: make([]uuid.UUID, 0, b.batchSize), - AgentID: make([]uuid.UUID, 0, b.batchSize), - ConnectionsByProto: json.RawMessage("[]"), - ConnectionCount: make([]int64, 0, b.batchSize), - RxPackets: make([]int64, 0, b.batchSize), - RxBytes: make([]int64, 0, b.batchSize), - TxPackets: make([]int64, 0, b.batchSize), - TxBytes: make([]int64, 0, b.batchSize), - SessionCountVSCode: make([]int64, 0, b.batchSize), - SessionCountJetBrains: make([]int64, 0, b.batchSize), - SessionCountReconnectingPTY: make([]int64, 0, b.batchSize), - SessionCountSSH: make([]int64, 0, b.batchSize), - ConnectionMedianLatencyMS: make([]float64, 0, b.batchSize), - } - - b.connectionsByProto = make([]map[string]int64, 0, size) -} - -func (b *Batcher) resetBuf() { - b.buf.ID = b.buf.ID[:0] - b.buf.CreatedAt = b.buf.CreatedAt[:0] - b.buf.UserID = b.buf.UserID[:0] - b.buf.WorkspaceID = b.buf.WorkspaceID[:0] - b.buf.TemplateID = b.buf.TemplateID[:0] - b.buf.AgentID = b.buf.AgentID[:0] - b.buf.ConnectionsByProto = json.RawMessage(`[]`) - b.buf.ConnectionCount = b.buf.ConnectionCount[:0] - b.buf.RxPackets = b.buf.RxPackets[:0] - b.buf.RxBytes = b.buf.RxBytes[:0] - b.buf.TxPackets = b.buf.TxPackets[:0] - b.buf.TxBytes = b.buf.TxBytes[:0] - b.buf.SessionCountVSCode = b.buf.SessionCountVSCode[:0] - b.buf.SessionCountJetBrains = b.buf.SessionCountJetBrains[:0] - b.buf.SessionCountReconnectingPTY = b.buf.SessionCountReconnectingPTY[:0] - b.buf.SessionCountSSH = b.buf.SessionCountSSH[:0] - b.buf.ConnectionMedianLatencyMS = b.buf.ConnectionMedianLatencyMS[:0] - b.connectionsByProto = b.connectionsByProto[:0] -} diff --git a/coderd/batchstats/batcher_internal_test.go b/coderd/batchstats/batcher_internal_test.go deleted file mode 100644 index 8954fa5455fcd..0000000000000 --- a/coderd/batchstats/batcher_internal_test.go +++ /dev/null @@ -1,228 +0,0 @@ -package batchstats - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "cdr.dev/slog" - "cdr.dev/slog/sloggers/slogtest" - - agentproto "github.com/coder/coder/v2/agent/proto" - "github.com/coder/coder/v2/coderd/database" - "github.com/coder/coder/v2/coderd/database/dbgen" - "github.com/coder/coder/v2/coderd/database/dbtestutil" - "github.com/coder/coder/v2/coderd/database/dbtime" - "github.com/coder/coder/v2/coderd/database/pubsub" - "github.com/coder/coder/v2/coderd/rbac" - "github.com/coder/coder/v2/cryptorand" -) - -func TestBatchStats(t *testing.T) { - t.Parallel() - - // Given: a fresh batcher with no data - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - log := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) - store, ps := dbtestutil.NewDB(t) - - // Set up some test dependencies. - deps1 := setupDeps(t, store, ps) - deps2 := setupDeps(t, store, ps) - tick := make(chan time.Time) - flushed := make(chan int, 1) - - b, closer, err := New(ctx, - WithStore(store), - WithLogger(log), - func(b *Batcher) { - b.tickCh = tick - b.flushed = flushed - }, - ) - require.NoError(t, err) - t.Cleanup(closer) - - // Given: no data points are added for workspace - // When: it becomes time to report stats - t1 := dbtime.Now() - // Signal a tick and wait for a flush to complete. - tick <- t1 - f := <-flushed - require.Equal(t, 0, f, "expected no data to be flushed") - t.Logf("flush 1 completed") - - // Then: it should report no stats. - stats, err := store.GetWorkspaceAgentStats(ctx, t1) - require.NoError(t, err, "should not error getting stats") - require.Empty(t, stats, "should have no stats for workspace") - - // Given: a single data point is added for workspace - t2 := t1.Add(time.Second) - t.Logf("inserting 1 stat") - require.NoError(t, b.Add(t2.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t))) - - // When: it becomes time to report stats - // Signal a tick and wait for a flush to complete. - tick <- t2 - f = <-flushed // Wait for a flush to complete. - require.Equal(t, 1, f, "expected one stat to be flushed") - t.Logf("flush 2 completed") - - // Then: it should report a single stat. - stats, err = store.GetWorkspaceAgentStats(ctx, t2) - require.NoError(t, err, "should not error getting stats") - require.Len(t, stats, 1, "should have stats for workspace") - - // Given: a lot of data points are added for both workspaces - // (equal to batch size) - t3 := t2.Add(time.Second) - done := make(chan struct{}) - - go func() { - defer close(done) - t.Logf("inserting %d stats", defaultBufferSize) - for i := 0; i < defaultBufferSize; i++ { - if i%2 == 0 { - require.NoError(t, b.Add(t3.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t))) - } else { - require.NoError(t, b.Add(t3.Add(time.Millisecond), deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randStats(t))) - } - } - }() - - // When: the buffer comes close to capacity - // Then: The buffer will force-flush once. - f = <-flushed - t.Logf("flush 3 completed") - require.Greater(t, f, 819, "expected at least 819 stats to be flushed (>=80% of buffer)") - // And we should finish inserting the stats - <-done - - stats, err = store.GetWorkspaceAgentStats(ctx, t3) - require.NoError(t, err, "should not error getting stats") - require.Len(t, stats, 2, "should have stats for both workspaces") - - // Ensures that a subsequent flush pushes all the remaining data - t4 := t3.Add(time.Second) - tick <- t4 - f2 := <-flushed - t.Logf("flush 4 completed") - expectedCount := defaultBufferSize - f - require.Equal(t, expectedCount, f2, "did not flush expected remaining rows") - - // Ensure that a subsequent flush does not push stale data. - t5 := t4.Add(time.Second) - tick <- t5 - f = <-flushed - require.Zero(t, f, "expected zero stats to have been flushed") - t.Logf("flush 5 completed") - - stats, err = store.GetWorkspaceAgentStats(ctx, t5) - require.NoError(t, err, "should not error getting stats") - require.Len(t, stats, 0, "should have no stats for workspace") - - // Ensure that buf never grew beyond what we expect - require.Equal(t, defaultBufferSize, cap(b.buf.ID), "buffer grew beyond expected capacity") -} - -// randStats returns a random agentproto.Stats -func randStats(t *testing.T, opts ...func(*agentproto.Stats)) *agentproto.Stats { - t.Helper() - s := &agentproto.Stats{ - ConnectionsByProto: map[string]int64{ - "ssh": mustRandInt64n(t, 9) + 1, - "vscode": mustRandInt64n(t, 9) + 1, - "jetbrains": mustRandInt64n(t, 9) + 1, - "reconnecting_pty": mustRandInt64n(t, 9) + 1, - }, - ConnectionCount: mustRandInt64n(t, 99) + 1, - ConnectionMedianLatencyMs: float64(mustRandInt64n(t, 99) + 1), - RxPackets: mustRandInt64n(t, 99) + 1, - RxBytes: mustRandInt64n(t, 99) + 1, - TxPackets: mustRandInt64n(t, 99) + 1, - TxBytes: mustRandInt64n(t, 99) + 1, - SessionCountVscode: mustRandInt64n(t, 9) + 1, - SessionCountJetbrains: mustRandInt64n(t, 9) + 1, - SessionCountReconnectingPty: mustRandInt64n(t, 9) + 1, - SessionCountSsh: mustRandInt64n(t, 9) + 1, - Metrics: []*agentproto.Stats_Metric{}, - } - for _, opt := range opts { - opt(s) - } - return s -} - -// deps is a set of test dependencies. -type deps struct { - Agent database.WorkspaceAgent - Template database.Template - User database.User - Workspace database.Workspace -} - -// setupDeps sets up a set of test dependencies. -// It creates an organization, user, template, workspace, and agent -// along with all the other miscellaneous plumbing required to link -// them together. -func setupDeps(t *testing.T, store database.Store, ps pubsub.Pubsub) deps { - t.Helper() - - org := dbgen.Organization(t, store, database.Organization{}) - user := dbgen.User(t, store, database.User{}) - _, err := store.InsertOrganizationMember(context.Background(), database.InsertOrganizationMemberParams{ - OrganizationID: org.ID, - UserID: user.ID, - Roles: []string{rbac.RoleOrgMember(org.ID)}, - }) - require.NoError(t, err) - tv := dbgen.TemplateVersion(t, store, database.TemplateVersion{ - OrganizationID: org.ID, - CreatedBy: user.ID, - }) - tpl := dbgen.Template(t, store, database.Template{ - CreatedBy: user.ID, - OrganizationID: org.ID, - ActiveVersionID: tv.ID, - }) - ws := dbgen.Workspace(t, store, database.Workspace{ - TemplateID: tpl.ID, - OwnerID: user.ID, - OrganizationID: org.ID, - LastUsedAt: time.Now().Add(-time.Hour), - }) - pj := dbgen.ProvisionerJob(t, store, ps, database.ProvisionerJob{ - InitiatorID: user.ID, - OrganizationID: org.ID, - }) - _ = dbgen.WorkspaceBuild(t, store, database.WorkspaceBuild{ - TemplateVersionID: tv.ID, - WorkspaceID: ws.ID, - JobID: pj.ID, - }) - res := dbgen.WorkspaceResource(t, store, database.WorkspaceResource{ - Transition: database.WorkspaceTransitionStart, - JobID: pj.ID, - }) - agt := dbgen.WorkspaceAgent(t, store, database.WorkspaceAgent{ - ResourceID: res.ID, - }) - return deps{ - Agent: agt, - Template: tpl, - User: user, - Workspace: ws, - } -} - -// mustRandInt64n returns a random int64 in the range [0, n). -func mustRandInt64n(t *testing.T, n int64) int64 { - t.Helper() - i, err := cryptorand.Intn(int(n)) - require.NoError(t, err) - return int64(i) -} diff --git a/coderd/coderd.go b/coderd/coderd.go index 25763530db702..2182894232601 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -43,7 +43,6 @@ import ( "github.com/coder/coder/v2/coderd/appearance" "github.com/coder/coder/v2/coderd/audit" "github.com/coder/coder/v2/coderd/awsidentity" - "github.com/coder/coder/v2/coderd/batchstats" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbrollup" @@ -189,7 +188,6 @@ type Options struct { HTTPClient *http.Client UpdateAgentMetrics func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) - StatsBatcher *batchstats.Batcher WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions @@ -352,10 +350,6 @@ func New(options *Options) *API { options.UserQuietHoursScheduleStore.Store(&v) } - if options.StatsBatcher == nil { - panic("developer error: options.StatsBatcher is nil") - } - siteCacheDir := options.CacheDir if siteCacheDir != "" { siteCacheDir = filepath.Join(siteCacheDir, "site") @@ -556,7 +550,6 @@ func New(options *Options) *API { Logger: options.Logger.Named("workspacestats"), Pubsub: options.Pubsub, TemplateScheduleStore: options.TemplateScheduleStore, - StatsBatcher: options.StatsBatcher, UpdateAgentMetricsFn: options.UpdateAgentMetrics, AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, }) diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index 6153f1a68abcb..91ce5ea82e05c 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -48,13 +48,11 @@ import ( "tailscale.com/types/nettype" "cdr.dev/slog" - "cdr.dev/slog/sloggers/sloghuman" "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/v2/coderd" "github.com/coder/coder/v2/coderd/audit" "github.com/coder/coder/v2/coderd/autobuild" "github.com/coder/coder/v2/coderd/awsidentity" - "github.com/coder/coder/v2/coderd/batchstats" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbrollup" @@ -143,8 +141,7 @@ type Options struct { SwaggerEndpoint bool // Logger should only be overridden if you expect errors // as part of your test. - Logger *slog.Logger - StatsBatcher *batchstats.Batcher + Logger *slog.Logger WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions AllowWorkspaceRenames bool @@ -268,18 +265,6 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can if options.FilesRateLimit == 0 { options.FilesRateLimit = -1 } - if options.StatsBatcher == nil { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - batcher, closeBatcher, err := batchstats.New(ctx, - batchstats.WithStore(options.Database), - // Avoid cluttering up test output. - batchstats.WithLogger(slog.Make(sloghuman.Sink(io.Discard))), - ) - require.NoError(t, err, "create stats batcher") - options.StatsBatcher = batcher - t.Cleanup(closeBatcher) - } var templateScheduleStore atomic.Pointer[schedule.TemplateScheduleStore] if options.TemplateScheduleStore == nil { @@ -490,7 +475,6 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can HealthcheckFunc: options.HealthcheckFunc, HealthcheckTimeout: options.HealthcheckTimeout, HealthcheckRefresh: options.HealthcheckRefresh, - StatsBatcher: options.StatsBatcher, WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions, AllowWorkspaceRenames: options.AllowWorkspaceRenames, NewTicker: options.NewTicker, diff --git a/coderd/insights_test.go b/coderd/insights_test.go index 22e7ed6947bac..4c3bb406dc830 100644 --- a/coderd/insights_test.go +++ b/coderd/insights_test.go @@ -21,7 +21,6 @@ import ( "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/v2/agent/agenttest" agentproto "github.com/coder/coder/v2/agent/proto" - "github.com/coder/coder/v2/coderd/batchstats" "github.com/coder/coder/v2/coderd/coderdtest" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbauthz" @@ -680,18 +679,10 @@ func TestTemplateInsights_Golden(t *testing.T) { ctx := testutil.Context(t, testutil.WaitSuperLong) - // Use agent stats batcher to insert agent stats, similar to live system. - // NOTE(mafredri): Ideally we would pass batcher as a coderd option and - // insert using the agentClient, but we have a circular dependency on - // the database. - batcher, batcherCloser, err := batchstats.New( - ctx, - batchstats.WithStore(db), - batchstats.WithLogger(logger.Named("batchstats")), - batchstats.WithInterval(time.Hour), - ) - require.NoError(t, err) - defer batcherCloser() // Flushes the stats, this is to ensure they're written. + reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{ + Database: db, + AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, + }) for workspace, data := range testData { for _, stat := range data.agentStats { @@ -701,13 +692,26 @@ func TestTemplateInsights_Golden(t *testing.T) { connectionCount = 0 } for createdAt.Before(stat.endedAt) { - err = batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{ - ConnectionCount: connectionCount, - SessionCountVscode: stat.sessionCountVSCode, - SessionCountJetbrains: stat.sessionCountJetBrains, - SessionCountReconnectingPty: stat.sessionCountReconnectingPTY, - SessionCountSsh: stat.sessionCountSSH, - }) + err := reporter.ReportAgentStats( + ctx, + createdAt, + database.Workspace{ + ID: workspace.id, + OwnerID: workspace.user.(*testUser).sdk.ID, + TemplateID: workspace.template.id, + }, + database.WorkspaceAgent{ + ID: workspace.agentID, + }, + workspace.template.name, + &agentproto.Stats{ + ConnectionCount: connectionCount, + SessionCountVscode: stat.sessionCountVSCode, + SessionCountJetbrains: stat.sessionCountJetBrains, + SessionCountReconnectingPty: stat.sessionCountReconnectingPTY, + SessionCountSsh: stat.sessionCountSSH, + }, + ) require.NoError(t, err, "want no error inserting agent stats") createdAt = createdAt.Add(30 * time.Second) } @@ -737,12 +741,8 @@ func TestTemplateInsights_Golden(t *testing.T) { }) } } - reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{ - Database: db, - AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, - }) //nolint:gocritic // This is a test. - err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats) + err := reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats) require.NoError(t, err, "want no error inserting app stats") return client, events @@ -1579,18 +1579,10 @@ func TestUserActivityInsights_Golden(t *testing.T) { ctx := testutil.Context(t, testutil.WaitSuperLong) - // Use agent stats batcher to insert agent stats, similar to live system. - // NOTE(mafredri): Ideally we would pass batcher as a coderd option and - // insert using the agentClient, but we have a circular dependency on - // the database. - batcher, batcherCloser, err := batchstats.New( - ctx, - batchstats.WithStore(db), - batchstats.WithLogger(logger.Named("batchstats")), - batchstats.WithInterval(time.Hour), - ) - require.NoError(t, err) - defer batcherCloser() // Flushes the stats, this is to ensure they're written. + reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{ + Database: db, + AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, + }) for workspace, data := range testData { for _, stat := range data.agentStats { @@ -1600,13 +1592,26 @@ func TestUserActivityInsights_Golden(t *testing.T) { connectionCount = 0 } for createdAt.Before(stat.endedAt) { - err = batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{ - ConnectionCount: connectionCount, - SessionCountVscode: stat.sessionCountVSCode, - SessionCountJetbrains: stat.sessionCountJetBrains, - SessionCountReconnectingPty: stat.sessionCountReconnectingPTY, - SessionCountSsh: stat.sessionCountSSH, - }) + err := reporter.ReportAgentStats( + ctx, + createdAt, + database.Workspace{ + ID: workspace.id, + OwnerID: workspace.user.(*testUser).sdk.ID, + TemplateID: workspace.template.id, + }, + database.WorkspaceAgent{ + ID: workspace.agentID, + }, + workspace.template.name, + &agentproto.Stats{ + ConnectionCount: connectionCount, + SessionCountVscode: stat.sessionCountVSCode, + SessionCountJetbrains: stat.sessionCountJetBrains, + SessionCountReconnectingPty: stat.sessionCountReconnectingPTY, + SessionCountSsh: stat.sessionCountSSH, + }, + ) require.NoError(t, err, "want no error inserting agent stats") createdAt = createdAt.Add(30 * time.Second) } @@ -1636,12 +1641,8 @@ func TestUserActivityInsights_Golden(t *testing.T) { }) } } - reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{ - Database: db, - AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, - }) //nolint:gocritic // This is a test. - err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats) + err := reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats) require.NoError(t, err, "want no error inserting app stats") return client, events diff --git a/coderd/prometheusmetrics/prometheusmetrics_test.go b/coderd/prometheusmetrics/prometheusmetrics_test.go index 9c4c9fca0b66f..1b4f24d30300e 100644 --- a/coderd/prometheusmetrics/prometheusmetrics_test.go +++ b/coderd/prometheusmetrics/prometheusmetrics_test.go @@ -21,7 +21,6 @@ import ( "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/v2/coderd/agentmetrics" - "github.com/coder/coder/v2/coderd/batchstats" "github.com/coder/coder/v2/coderd/coderdtest" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbgen" @@ -389,19 +388,6 @@ func TestAgentStats(t *testing.T) { t.Cleanup(cancelFunc) db, pubsub := dbtestutil.NewDB(t) - log := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - - batcher, closeBatcher, err := batchstats.New(ctx, - // We had previously set the batch size to 1 here, but that caused - // intermittent test flakes due to a race between the batcher completing - // its flush and the test asserting that the metrics were collected. - // Instead, we close the batcher after all stats have been posted, which - // forces a flush. - batchstats.WithStore(db), - batchstats.WithLogger(log), - ) - require.NoError(t, err, "create stats batcher failed") - t.Cleanup(closeBatcher) tLogger := slogtest.Make(t, nil) // Build sample workspaces with test agents and fake agent client @@ -409,7 +395,6 @@ func TestAgentStats(t *testing.T) { Database: db, IncludeProvisionerDaemon: true, Pubsub: pubsub, - StatsBatcher: batcher, Logger: &tLogger, }) @@ -424,7 +409,7 @@ func TestAgentStats(t *testing.T) { // given var i int64 for i = 0; i < 3; i++ { - _, err = agent1.PostStats(ctx, &agentsdk.Stats{ + _, err := agent1.PostStats(ctx, &agentsdk.Stats{ TxBytes: 1 + i, RxBytes: 2 + i, SessionCountVSCode: 3 + i, SessionCountJetBrains: 4 + i, SessionCountReconnectingPTY: 5 + i, SessionCountSSH: 6 + i, ConnectionCount: 7 + i, ConnectionMedianLatencyMS: 8000, @@ -449,11 +434,6 @@ func TestAgentStats(t *testing.T) { require.NoError(t, err) } - // Ensure that all stats are flushed to the database - // before we query them. We do not expect any more stats - // to be posted after this. - closeBatcher() - // when // // Set initialCreateAfter to some time in the past, so that AgentStats would include all above PostStats, diff --git a/coderd/workspacestats/reporter.go b/coderd/workspacestats/reporter.go index ec2c6a44fcb24..c18ac0ccae96b 100644 --- a/coderd/workspacestats/reporter.go +++ b/coderd/workspacestats/reporter.go @@ -2,6 +2,7 @@ package workspacestats import ( "context" + "encoding/json" "sync/atomic" "time" @@ -31,7 +32,6 @@ type ReporterOptions struct { Logger slog.Logger Pubsub pubsub.Pubsub TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore] - StatsBatcher StatsBatcher UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) AppStatBatchSize int @@ -146,11 +146,46 @@ func (r *Reporter) ReportAgentStats(ctx context.Context, now time.Time, workspac var errGroup errgroup.Group errGroup.Go(func() error { - err := r.opts.StatsBatcher.Add(now, workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, stats) + start := time.Now() + connectionsByProto := json.RawMessage(`[]`) + payload, err := json.Marshal(stats.ConnectionsByProto) if err != nil { - r.opts.Logger.Error(ctx, "add agent stats to batcher", slog.Error(err)) - return xerrors.Errorf("insert workspace agent stats batch: %w", err) + r.opts.Logger.Error(ctx, "unable to marshal agent connections by proto, dropping data", slog.Error(err)) + } else { + connectionsByProto = payload } + + params := database.InsertWorkspaceAgentStatsParams{ + ID: []uuid.UUID{uuid.New()}, + CreatedAt: []time.Time{dbtime.Time(now)}, + AgentID: []uuid.UUID{workspaceAgent.ID}, + ConnectionsByProto: connectionsByProto, + UserID: []uuid.UUID{workspace.OwnerID}, + TemplateID: []uuid.UUID{workspace.TemplateID}, + WorkspaceID: []uuid.UUID{workspace.ID}, + ConnectionCount: []int64{stats.ConnectionCount}, + RxPackets: []int64{stats.RxPackets}, + RxBytes: []int64{stats.RxBytes}, + TxPackets: []int64{stats.TxPackets}, + TxBytes: []int64{stats.TxBytes}, + SessionCountVSCode: []int64{stats.SessionCountVscode}, + SessionCountJetBrains: []int64{stats.SessionCountJetbrains}, + SessionCountReconnectingPTY: []int64{stats.SessionCountReconnectingPty}, + SessionCountSSH: []int64{stats.SessionCountSsh}, + ConnectionMedianLatencyMS: []float64{stats.ConnectionMedianLatencyMs}, + } + + err = r.opts.Database.InsertWorkspaceAgentStats(ctx, params) + elapsed := time.Since(start) + if err != nil { + if database.IsQueryCanceledError(err) { + r.opts.Logger.Debug(ctx, "query canceled, skipping insert of workspace agent stats", slog.F("elapsed", elapsed)) + return nil + } + r.opts.Logger.Error(ctx, "error inserting workspace agent stats", slog.Error(err), slog.F("elapsed", elapsed)) + return nil + } + return nil }) errGroup.Go(func() error { From ca851baa75493a048d93795bc154eed31418486d Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 29 May 2024 16:44:49 +0000 Subject: [PATCH 2/6] fix tests --- coderd/agentapi/stats_test.go | 9 +++++++++ coderd/workspacestats/reporter.go | 8 +++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/coderd/agentapi/stats_test.go b/coderd/agentapi/stats_test.go index 5c372eecdefe4..ac9e795ca34d3 100644 --- a/coderd/agentapi/stats_test.go +++ b/coderd/agentapi/stats_test.go @@ -144,6 +144,9 @@ func TestUpdateStates(t *testing.T) { // User gets fetched to hit the UpdateAgentMetricsFn. dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil) + // Agent stats get inserted. + dbM.EXPECT().InsertWorkspaceAgentStats(gomock.Any(), gomock.Any()) + // Ensure that pubsub notifications are sent. notifyDescription := make(chan []byte) ps.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, description []byte) { @@ -221,6 +224,9 @@ func TestUpdateStates(t *testing.T) { LastUsedAt: now, }).Return(nil) + // Agent stats get inserted. + dbM.EXPECT().InsertWorkspaceAgentStats(gomock.Any(), gomock.Any()) + _, err := api.UpdateStats(context.Background(), req) require.NoError(t, err) }) @@ -353,6 +359,9 @@ func TestUpdateStates(t *testing.T) { // User gets fetched to hit the UpdateAgentMetricsFn. dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil) + // Agent stats get inserted. + dbM.EXPECT().InsertWorkspaceAgentStats(gomock.Any(), gomock.Any()) + resp, err := api.UpdateStats(context.Background(), req) require.NoError(t, err) require.Equal(t, &agentproto.UpdateStatsResponse{ diff --git a/coderd/workspacestats/reporter.go b/coderd/workspacestats/reporter.go index c18ac0ccae96b..12837cace6f29 100644 --- a/coderd/workspacestats/reporter.go +++ b/coderd/workspacestats/reporter.go @@ -148,7 +148,7 @@ func (r *Reporter) ReportAgentStats(ctx context.Context, now time.Time, workspac errGroup.Go(func() error { start := time.Now() connectionsByProto := json.RawMessage(`[]`) - payload, err := json.Marshal(stats.ConnectionsByProto) + payload, err := json.Marshal([]map[string]int64{stats.ConnectionsByProto}) if err != nil { r.opts.Logger.Error(ctx, "unable to marshal agent connections by proto, dropping data", slog.Error(err)) } else { @@ -174,16 +174,14 @@ func (r *Reporter) ReportAgentStats(ctx context.Context, now time.Time, workspac SessionCountSSH: []int64{stats.SessionCountSsh}, ConnectionMedianLatencyMS: []float64{stats.ConnectionMedianLatencyMs}, } - - err = r.opts.Database.InsertWorkspaceAgentStats(ctx, params) elapsed := time.Since(start) + err = r.opts.Database.InsertWorkspaceAgentStats(ctx, params) if err != nil { if database.IsQueryCanceledError(err) { r.opts.Logger.Debug(ctx, "query canceled, skipping insert of workspace agent stats", slog.F("elapsed", elapsed)) return nil } - r.opts.Logger.Error(ctx, "error inserting workspace agent stats", slog.Error(err), slog.F("elapsed", elapsed)) - return nil + return xerrors.Errorf("insert workspace agent stats: %w", err) } return nil From cd70514c0a60c2940c279061dfdab436f2d8b159 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 29 May 2024 17:37:09 +0000 Subject: [PATCH 3/6] system call --- coderd/workspacestats/reporter.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/coderd/workspacestats/reporter.go b/coderd/workspacestats/reporter.go index 12837cace6f29..1c95e9deb77c9 100644 --- a/coderd/workspacestats/reporter.go +++ b/coderd/workspacestats/reporter.go @@ -14,6 +14,7 @@ import ( agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/prometheusmetrics" @@ -175,7 +176,8 @@ func (r *Reporter) ReportAgentStats(ctx context.Context, now time.Time, workspac ConnectionMedianLatencyMS: []float64{stats.ConnectionMedianLatencyMs}, } elapsed := time.Since(start) - err = r.opts.Database.InsertWorkspaceAgentStats(ctx, params) + // nolint: gocritic // system function + err = r.opts.Database.InsertWorkspaceAgentStats(dbauthz.AsSystemRestricted(ctx), params) if err != nil { if database.IsQueryCanceledError(err) { r.opts.Logger.Debug(ctx, "query canceled, skipping insert of workspace agent stats", slog.F("elapsed", elapsed)) From e7b1de86b589eba8b69202267a52fef3cd40a283 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Wed, 29 May 2024 18:05:48 +0000 Subject: [PATCH 4/6] fix deps --- coderd/apikey/apikey_test.go | 2 +- coderd/insights_test.go | 4 ++++ coderd/prometheusmetrics/insights/metricscollector_test.go | 2 ++ coderd/workspaceapps/proxy.go | 2 +- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/coderd/apikey/apikey_test.go b/coderd/apikey/apikey_test.go index 734a187219bf5..41f64fe0d866f 100644 --- a/coderd/apikey/apikey_test.go +++ b/coderd/apikey/apikey_test.go @@ -128,7 +128,7 @@ func TestGenerate(t *testing.T) { // Assert that the hashed secret is correct. hashed := sha256.Sum256([]byte(keytokens[1])) - assert.ElementsMatch(t, hashed, key.HashedSecret[:]) + assert.ElementsMatch(t, hashed, key.HashedSecret) assert.Equal(t, tc.params.UserID, key.UserID) assert.WithinDuration(t, dbtime.Now(), key.CreatedAt, time.Second*5) diff --git a/coderd/insights_test.go b/coderd/insights_test.go index 4c3bb406dc830..1b91c8033739b 100644 --- a/coderd/insights_test.go +++ b/coderd/insights_test.go @@ -681,6 +681,8 @@ func TestTemplateInsights_Golden(t *testing.T) { reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{ Database: db, + Logger: logger.Named("workspacestats"), + Pubsub: ps, AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, }) @@ -1581,6 +1583,8 @@ func TestUserActivityInsights_Golden(t *testing.T) { reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{ Database: db, + Logger: logger.Named("workspacestats"), + Pubsub: ps, AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, }) diff --git a/coderd/prometheusmetrics/insights/metricscollector_test.go b/coderd/prometheusmetrics/insights/metricscollector_test.go index 91ef3c7ee88fa..e0a0d05350b37 100644 --- a/coderd/prometheusmetrics/insights/metricscollector_test.go +++ b/coderd/prometheusmetrics/insights/metricscollector_test.go @@ -112,6 +112,8 @@ func TestCollectInsights(t *testing.T) { // Fake app usage reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{ Database: db, + Logger: logger.Named("workspacestats"), + Pubsub: ps, AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize, }) refTime := time.Now().Add(-3 * time.Minute).Truncate(time.Minute) diff --git a/coderd/workspaceapps/proxy.go b/coderd/workspaceapps/proxy.go index 7bf470a3cc416..69f1aadca49b2 100644 --- a/coderd/workspaceapps/proxy.go +++ b/coderd/workspaceapps/proxy.go @@ -573,7 +573,7 @@ func (s *Server) proxyWorkspaceApp(rw http.ResponseWriter, r *http.Request, appT } // This strips the session token from a workspace app request. - cookieHeaders := r.Header.Values("Cookie")[:] + cookieHeaders := r.Header.Values("Cookie") r.Header.Del("Cookie") for _, cookieHeader := range cookieHeaders { r.Header.Add("Cookie", httpapi.StripCoderCookies(cookieHeader)) From 188103c999a8f5d2b951fa2de8005cc8287932f7 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Thu, 30 May 2024 15:09:15 +0000 Subject: [PATCH 5/6] remove workspace usage tracker --- cli/server.go | 8 - coderd/coderd.go | 16 +- coderd/coderdtest/coderdtest.go | 8 - coderd/workspaces.go | 10 +- coderd/workspacestats/reporter.go | 12 ++ coderd/workspaceusage/tracker.go | 235 -------------------------- coderd/workspaceusage/tracker_test.go | 225 ------------------------ 7 files changed, 23 insertions(+), 491 deletions(-) delete mode 100644 coderd/workspaceusage/tracker.go delete mode 100644 coderd/workspaceusage/tracker_test.go diff --git a/cli/server.go b/cli/server.go index ab456220e911d..1b85277bec1c0 100644 --- a/cli/server.go +++ b/cli/server.go @@ -86,7 +86,6 @@ import ( stringutil "github.com/coder/coder/v2/coderd/util/strings" "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/workspaceapps/appurl" - "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/drpc" "github.com/coder/coder/v2/cryptorand" @@ -964,13 +963,6 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. purger := dbpurge.New(ctx, logger.Named("dbpurge"), options.Database) defer purger.Close() - // Updates workspace usage - tracker := workspaceusage.New(options.Database, - workspaceusage.WithLogger(logger.Named("workspace_usage_tracker")), - ) - options.WorkspaceUsageTracker = tracker - defer tracker.Close() - // Wrap the server in middleware that redirects to the access URL if // the request is not to a local IP. var handler http.Handler = coderAPI.RootHandler diff --git a/coderd/coderd.go b/coderd/coderd.go index 2182894232601..48ea77929ce72 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -68,7 +68,6 @@ import ( "github.com/coder/coder/v2/coderd/util/slice" "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/workspacestats" - "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/drpc" "github.com/coder/coder/v2/codersdk/healthsdk" @@ -203,8 +202,6 @@ type Options struct { // DatabaseRolluper rolls up template usage stats from raw agent and app // stats. This is used to provide insights in the WebUI. DatabaseRolluper *dbrollup.Rolluper - // WorkspaceUsageTracker tracks workspace usage by the CLI. - WorkspaceUsageTracker *workspaceusage.Tracker } // @title Coder API @@ -377,12 +374,6 @@ func New(options *Options) *API { options.DatabaseRolluper = dbrollup.New(options.Logger.Named("dbrollup"), options.Database) } - if options.WorkspaceUsageTracker == nil { - options.WorkspaceUsageTracker = workspaceusage.New(options.Database, - workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), - ) - } - ctx, cancel := context.WithCancel(context.Background()) r := chi.NewRouter() @@ -428,8 +419,7 @@ func New(options *Options) *API { options.Database, options.Pubsub, ), - dbRolluper: options.DatabaseRolluper, - workspaceUsageTracker: options.WorkspaceUsageTracker, + dbRolluper: options.DatabaseRolluper, } var customRoleHandler CustomRoleHandler = &agplCustomRoleHandler{} @@ -1293,8 +1283,7 @@ type API struct { Acquirer *provisionerdserver.Acquirer // dbRolluper rolls up template usage stats from raw agent and app // stats. This is used to provide insights in the WebUI. - dbRolluper *dbrollup.Rolluper - workspaceUsageTracker *workspaceusage.Tracker + dbRolluper *dbrollup.Rolluper } // Close waits for all WebSocket connections to drain before returning. @@ -1333,7 +1322,6 @@ func (api *API) Close() error { _ = (*coordinator).Close() } _ = api.agentProvider.Close() - api.workspaceUsageTracker.Close() return nil } diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index 91ce5ea82e05c..2dbf9a6666869 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -69,7 +69,6 @@ import ( "github.com/coder/coder/v2/coderd/util/ptr" "github.com/coder/coder/v2/coderd/workspaceapps" "github.com/coder/coder/v2/coderd/workspaceapps/appurl" - "github.com/coder/coder/v2/coderd/workspaceusage" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/drpc" @@ -320,12 +319,6 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can if options.WorkspaceUsageTrackerTick == nil { options.WorkspaceUsageTrackerTick = make(chan time.Time, 1) // buffering just in case } - // Close is called by API.Close() - wuTracker := workspaceusage.New( - options.Database, - workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")), - workspaceusage.WithTickFlush(options.WorkspaceUsageTrackerTick, options.WorkspaceUsageTrackerFlush), - ) var mutex sync.RWMutex var handler http.Handler @@ -479,7 +472,6 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can AllowWorkspaceRenames: options.AllowWorkspaceRenames, NewTicker: options.NewTicker, DatabaseRolluper: options.DatabaseRolluper, - WorkspaceUsageTracker: wuTracker, } } diff --git a/coderd/workspaces.go b/coderd/workspaces.go index 7d0344be4e321..667f21ae03bb1 100644 --- a/coderd/workspaces.go +++ b/coderd/workspaces.go @@ -1115,7 +1115,15 @@ func (api *API) postWorkspaceUsage(rw http.ResponseWriter, r *http.Request) { return } - api.workspaceUsageTracker.Add(workspace.ID) + err := api.statsReporter.ReportWorksaceUsage(r.Context(), workspace.ID) + if err != nil { + httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Failed to report workspace usage", + Detail: err.Error(), + }) + return + } + rw.WriteHeader(http.StatusNoContent) } diff --git a/coderd/workspacestats/reporter.go b/coderd/workspacestats/reporter.go index 1c95e9deb77c9..fbd74c94ea956 100644 --- a/coderd/workspacestats/reporter.go +++ b/coderd/workspacestats/reporter.go @@ -227,3 +227,15 @@ func (r *Reporter) ReportAgentStats(ctx context.Context, now time.Time, workspac return nil } + +func (r *Reporter) ReportWorksaceUsage(ctx context.Context, workspaceID uuid.UUID) error { + err := r.opts.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ + ID: workspaceID, + LastUsedAt: dbtime.Now(), + }) + if err != nil { + return xerrors.Errorf("update workspace last_used_at: %w", err) + } + + return nil +} diff --git a/coderd/workspaceusage/tracker.go b/coderd/workspaceusage/tracker.go deleted file mode 100644 index 118b021d71d52..0000000000000 --- a/coderd/workspaceusage/tracker.go +++ /dev/null @@ -1,235 +0,0 @@ -package workspaceusage - -import ( - "bytes" - "context" - "flag" - "os" - "sort" - "sync" - "time" - - "github.com/google/uuid" - - "github.com/coder/coder/v2/coderd/database" - "github.com/coder/coder/v2/coderd/database/dbauthz" - - "cdr.dev/slog" - "cdr.dev/slog/sloggers/sloghuman" -) - -var DefaultFlushInterval = 60 * time.Second - -// Store is a subset of database.Store -type Store interface { - BatchUpdateWorkspaceLastUsedAt(context.Context, database.BatchUpdateWorkspaceLastUsedAtParams) error -} - -// Tracker tracks and de-bounces updates to workspace usage activity. -// It keeps an internal map of workspace IDs that have been used and -// periodically flushes this to its configured Store. -type Tracker struct { - log slog.Logger // you know, for logs - flushLock sync.Mutex // protects m - flushErrors int // tracks the number of consecutive errors flushing - m *uuidSet // stores workspace ids - s Store // for flushing data - tickCh <-chan time.Time // controls flush interval - stopTick func() // stops flushing - stopCh chan struct{} // signals us to stop - stopOnce sync.Once // because you only stop once - doneCh chan struct{} // signifies that we have stopped - flushCh chan int // used for testing. -} - -// New returns a new Tracker. It is the caller's responsibility -// to call Close(). -func New(s Store, opts ...Option) *Tracker { - tr := &Tracker{ - log: slog.Make(sloghuman.Sink(os.Stderr)), - m: &uuidSet{}, - s: s, - tickCh: nil, - stopTick: nil, - stopCh: make(chan struct{}), - doneCh: make(chan struct{}), - flushCh: nil, - } - for _, opt := range opts { - opt(tr) - } - if tr.tickCh == nil && tr.stopTick == nil { - tick := time.NewTicker(DefaultFlushInterval) - tr.tickCh = tick.C - tr.stopTick = tick.Stop - } - go tr.loop() - return tr -} - -type Option func(*Tracker) - -// WithLogger sets the logger to be used by Tracker. -func WithLogger(log slog.Logger) Option { - return func(h *Tracker) { - h.log = log - } -} - -// WithFlushInterval allows configuring the flush interval of Tracker. -func WithFlushInterval(d time.Duration) Option { - return func(h *Tracker) { - ticker := time.NewTicker(d) - h.tickCh = ticker.C - h.stopTick = ticker.Stop - } -} - -// WithTickFlush allows passing two channels: one that reads -// a time.Time, and one that returns the number of marked workspaces -// every time Tracker flushes. -// For testing only and will panic if used outside of tests. -func WithTickFlush(tickCh <-chan time.Time, flushCh chan int) Option { - if flag.Lookup("test.v") == nil { - panic("developer error: WithTickFlush is not to be used outside of tests.") - } - return func(h *Tracker) { - h.tickCh = tickCh - h.stopTick = func() {} - h.flushCh = flushCh - } -} - -// Add marks the workspace with the given ID as having been used recently. -// Tracker will periodically flush this to its configured Store. -func (tr *Tracker) Add(workspaceID uuid.UUID) { - tr.m.Add(workspaceID) -} - -// flush updates last_used_at of all current workspace IDs. -// If this is held while a previous flush is in progress, it will -// deadlock until the previous flush has completed. -func (tr *Tracker) flush(now time.Time) { - // Copy our current set of IDs - ids := tr.m.UniqueAndClear() - count := len(ids) - if tr.flushCh != nil { // only used for testing - defer func() { - tr.flushCh <- count - }() - } - if count == 0 { - tr.log.Debug(context.Background(), "nothing to flush") - return - } - - // Set a short-ish timeout for this. We don't want to hang forever. - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - // nolint: gocritic // system function - authCtx := dbauthz.AsSystemRestricted(ctx) - tr.flushLock.Lock() - defer tr.flushLock.Unlock() - // nolint:gocritic // (#13146) Will be moved soon as part of refactor. - if err := tr.s.BatchUpdateWorkspaceLastUsedAt(authCtx, database.BatchUpdateWorkspaceLastUsedAtParams{ - LastUsedAt: now, - IDs: ids, - }); err != nil { - // A single failure to flush is likely not a huge problem. If the workspace is still connected at - // the next iteration, either another coderd instance will likely have this data or the CLI - // will tell us again that the workspace is in use. - tr.flushErrors++ - if tr.flushErrors > 1 { - tr.log.Error(ctx, "multiple failures updating workspaces last_used_at", slog.F("count", count), slog.F("consecutive_errors", tr.flushErrors), slog.Error(err)) - // TODO: if this keeps failing, it indicates a fundamental problem with the database connection. - // How to surface it correctly to admins besides just screaming into the logs? - } else { - tr.log.Warn(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.Error(err)) - } - return - } - tr.flushErrors = 0 - tr.log.Info(ctx, "updated workspaces last_used_at", slog.F("count", count), slog.F("now", now)) -} - -// loop periodically flushes every tick. -// If loop is called after Close, it will exit immediately and log an error. -func (tr *Tracker) loop() { - select { - case <-tr.doneCh: - tr.log.Error(context.Background(), "developer error: Loop called after Close") - return - default: - } - defer func() { - close(tr.doneCh) - tr.log.Debug(context.Background(), "workspace usage tracker loop exited") - }() - for { - select { - case <-tr.stopCh: - return - case now, ok := <-tr.tickCh: - if !ok { - return - } - // NOTE: we do not update last_used_at with the time at which each workspace was added. - // Instead, we update with the time of the flush. If the BatchUpdateWorkspacesLastUsedAt - // query can be rewritten to update each id with a corresponding last_used_at timestamp - // then we could capture the exact usage time of each workspace. For now however, as - // we perform this query at a regular interval, the time of the flush is 'close enough' - // for the purposes of both dormancy (and for autostop, in future). - tr.flush(now.UTC()) - } - } -} - -// Close stops Tracker and returns once Loop has exited. -// After calling Close(), Loop must not be called. -func (tr *Tracker) Close() error { - tr.stopOnce.Do(func() { - tr.stopCh <- struct{}{} - tr.stopTick() - <-tr.doneCh - }) - return nil -} - -// uuidSet is a set of UUIDs. Safe for concurrent usage. -// The zero value can be used. -type uuidSet struct { - l sync.Mutex - m map[uuid.UUID]struct{} -} - -func (s *uuidSet) Add(id uuid.UUID) { - s.l.Lock() - defer s.l.Unlock() - if s.m == nil { - s.m = make(map[uuid.UUID]struct{}) - } - s.m[id] = struct{}{} -} - -// UniqueAndClear returns the unique set of entries in s and -// resets the internal map. -func (s *uuidSet) UniqueAndClear() []uuid.UUID { - s.l.Lock() - defer s.l.Unlock() - if s.m == nil { - s.m = make(map[uuid.UUID]struct{}) - return []uuid.UUID{} - } - l := make([]uuid.UUID, 0) - for k := range s.m { - l = append(l, k) - } - // For ease of testing, sort the IDs lexically - sort.Slice(l, func(i, j int) bool { - // For some unfathomable reason, byte arrays are not comparable? - // See https://github.com/golang/go/issues/61004 - return bytes.Compare(l[i][:], l[j][:]) < 0 - }) - clear(s.m) - return l -} diff --git a/coderd/workspaceusage/tracker_test.go b/coderd/workspaceusage/tracker_test.go deleted file mode 100644 index ae9a9d2162d1c..0000000000000 --- a/coderd/workspaceusage/tracker_test.go +++ /dev/null @@ -1,225 +0,0 @@ -package workspaceusage_test - -import ( - "bytes" - "sort" - "sync" - "testing" - "time" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "go.uber.org/goleak" - "go.uber.org/mock/gomock" - - "cdr.dev/slog" - "cdr.dev/slog/sloggers/slogtest" - "github.com/coder/coder/v2/coderd/coderdtest" - "github.com/coder/coder/v2/coderd/database" - "github.com/coder/coder/v2/coderd/database/dbfake" - "github.com/coder/coder/v2/coderd/database/dbmock" - "github.com/coder/coder/v2/coderd/database/dbtestutil" - "github.com/coder/coder/v2/coderd/database/dbtime" - "github.com/coder/coder/v2/coderd/database/pubsub" - "github.com/coder/coder/v2/coderd/workspaceusage" - "github.com/coder/coder/v2/codersdk" - "github.com/coder/coder/v2/testutil" -) - -func TestTracker(t *testing.T) { - t.Parallel() - - ctrl := gomock.NewController(t) - mDB := dbmock.NewMockStore(ctrl) - log := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - - tickCh := make(chan time.Time) - flushCh := make(chan int, 1) - wut := workspaceusage.New(mDB, - workspaceusage.WithLogger(log), - workspaceusage.WithTickFlush(tickCh, flushCh), - ) - defer wut.Close() - - // 1. No marked workspaces should imply no flush. - now := dbtime.Now() - tickCh <- now - count := <-flushCh - require.Equal(t, 0, count, "expected zero flushes") - - // 2. One marked workspace should cause a flush. - ids := []uuid.UUID{uuid.New()} - now = dbtime.Now() - wut.Add(ids[0]) - mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ - LastUsedAt: now, - IDs: ids, - }).Times(1) - tickCh <- now - count = <-flushCh - require.Equal(t, 1, count, "expected one flush with one id") - - // 3. Lots of marked workspaces should also cause a flush. - for i := 0; i < 31; i++ { - ids = append(ids, uuid.New()) - } - - // Sort ids so mDB know what to expect. - sort.Slice(ids, func(i, j int) bool { - return bytes.Compare(ids[i][:], ids[j][:]) < 0 - }) - - now = dbtime.Now() - mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{ - LastUsedAt: now, - IDs: ids, - }) - for _, id := range ids { - wut.Add(id) - } - tickCh <- now - count = <-flushCh - require.Equal(t, len(ids), count, "incorrect number of ids flushed") - - // 4. Try to cause a race condition! - now = dbtime.Now() - // Difficult to know what to EXPECT here, so we won't check strictly here. - mDB.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), gomock.Any()).MinTimes(1).MaxTimes(len(ids)) - // Try to force a race condition. - var wg sync.WaitGroup - count = 0 - for i := 0; i < len(ids); i++ { - wg.Add(1) - go func() { - defer wg.Done() - tickCh <- now - }() - wut.Add(ids[i]) - } - - for i := 0; i < len(ids); i++ { - count += <-flushCh - } - - wg.Wait() - require.Equal(t, len(ids), count, "incorrect number of ids flushed") - - // 5. Closing multiple times should not be a problem. - wut.Close() - wut.Close() -} - -// This test performs a more 'integration-style' test with multiple instances. -func TestTracker_MultipleInstances(t *testing.T) { - t.Parallel() - if !dbtestutil.WillUsePostgres() { - t.Skip("this test only makes sense with postgres") - } - - // Given we have two coderd instances connected to the same database - var ( - ctx = testutil.Context(t, testutil.WaitLong) - db, _ = dbtestutil.NewDB(t) - // real pubsub is not safe for concurrent use, and this test currently - // does not depend on pubsub - ps = pubsub.NewInMemory() - wuTickA = make(chan time.Time) - wuFlushA = make(chan int, 1) - wuTickB = make(chan time.Time) - wuFlushB = make(chan int, 1) - clientA = coderdtest.New(t, &coderdtest.Options{ - WorkspaceUsageTrackerTick: wuTickA, - WorkspaceUsageTrackerFlush: wuFlushA, - Database: db, - Pubsub: ps, - }) - clientB = coderdtest.New(t, &coderdtest.Options{ - WorkspaceUsageTrackerTick: wuTickB, - WorkspaceUsageTrackerFlush: wuFlushB, - Database: db, - Pubsub: ps, - }) - owner = coderdtest.CreateFirstUser(t, clientA) - now = dbtime.Now() - ) - - clientB.SetSessionToken(clientA.SessionToken()) - - // Create a number of workspaces - numWorkspaces := 10 - w := make([]dbfake.WorkspaceResponse, numWorkspaces) - for i := 0; i < numWorkspaces; i++ { - wr := dbfake.WorkspaceBuild(t, db, database.Workspace{ - OwnerID: owner.UserID, - OrganizationID: owner.OrganizationID, - LastUsedAt: now, - }).WithAgent().Do() - w[i] = wr - } - - // Use client A to update LastUsedAt of the first three - require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[0].Workspace.ID)) - require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[1].Workspace.ID)) - require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[2].Workspace.ID)) - // Use client B to update LastUsedAt of the next three - require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[3].Workspace.ID)) - require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[4].Workspace.ID)) - require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[5].Workspace.ID)) - // The next two will have updated from both instances - require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[6].Workspace.ID)) - require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[6].Workspace.ID)) - require.NoError(t, clientA.PostWorkspaceUsage(ctx, w[7].Workspace.ID)) - require.NoError(t, clientB.PostWorkspaceUsage(ctx, w[7].Workspace.ID)) - // The last two will not report any usage. - - // Tick both with different times and wait for both flushes to complete - nowA := now.Add(time.Minute) - nowB := now.Add(2 * time.Minute) - var wg sync.WaitGroup - var flushedA, flushedB int - wg.Add(1) - go func() { - defer wg.Done() - wuTickA <- nowA - flushedA = <-wuFlushA - }() - wg.Add(1) - go func() { - defer wg.Done() - wuTickB <- nowB - flushedB = <-wuFlushB - }() - wg.Wait() - - // We expect 5 flushed IDs each - require.Equal(t, 5, flushedA) - require.Equal(t, 5, flushedB) - - // Fetch updated workspaces - updated := make([]codersdk.Workspace, numWorkspaces) - for i := 0; i < numWorkspaces; i++ { - ws, err := clientA.Workspace(ctx, w[i].Workspace.ID) - require.NoError(t, err) - updated[i] = ws - } - // We expect the first three to have the timestamp of flushA - require.Equal(t, nowA.UTC(), updated[0].LastUsedAt.UTC()) - require.Equal(t, nowA.UTC(), updated[1].LastUsedAt.UTC()) - require.Equal(t, nowA.UTC(), updated[2].LastUsedAt.UTC()) - // We expect the next three to have the timestamp of flushB - require.Equal(t, nowB.UTC(), updated[3].LastUsedAt.UTC()) - require.Equal(t, nowB.UTC(), updated[4].LastUsedAt.UTC()) - require.Equal(t, nowB.UTC(), updated[5].LastUsedAt.UTC()) - // The next two should have the timestamp of flushB as it is newer than flushA - require.Equal(t, nowB.UTC(), updated[6].LastUsedAt.UTC()) - require.Equal(t, nowB.UTC(), updated[7].LastUsedAt.UTC()) - // And the last two should be untouched - require.Equal(t, w[8].Workspace.LastUsedAt.UTC(), updated[8].LastUsedAt.UTC()) - require.Equal(t, w[8].Workspace.LastUsedAt.UTC(), updated[8].LastUsedAt.UTC()) - require.Equal(t, w[9].Workspace.LastUsedAt.UTC(), updated[9].LastUsedAt.UTC()) - require.Equal(t, w[9].Workspace.LastUsedAt.UTC(), updated[9].LastUsedAt.UTC()) -} - -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) -} From 59048fdf356947023408bde05f44c11ff8a3dbd1 Mon Sep 17 00:00:00 2001 From: Garrett Delfosse Date: Thu, 30 May 2024 15:34:57 +0000 Subject: [PATCH 6/6] fix port forward tests --- cli/portforward_test.go | 17 +---------------- coderd/coderdtest/coderdtest.go | 26 -------------------------- 2 files changed, 1 insertion(+), 42 deletions(-) diff --git a/cli/portforward_test.go b/cli/portforward_test.go index edef520c23dc6..902ed1f6bb247 100644 --- a/cli/portforward_test.go +++ b/cli/portforward_test.go @@ -21,7 +21,6 @@ import ( "github.com/coder/coder/v2/coderd/coderdtest" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbfake" - "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/pty/ptytest" "github.com/coder/coder/v2/testutil" @@ -97,12 +96,7 @@ func TestPortForward(t *testing.T) { // Setup agent once to be shared between test-cases (avoid expensive // non-parallel setup). var ( - wuTick = make(chan time.Time) - wuFlush = make(chan int, 1) - client, db = coderdtest.NewWithDatabase(t, &coderdtest.Options{ - WorkspaceUsageTrackerTick: wuTick, - WorkspaceUsageTrackerFlush: wuFlush, - }) + client, db = coderdtest.NewWithDatabase(t, nil) admin = coderdtest.CreateFirstUser(t, client) member, memberUser = coderdtest.CreateAnotherUser(t, client, admin.OrganizationID) workspace = runAgent(t, client, memberUser.ID, db) @@ -155,9 +149,6 @@ func TestPortForward(t *testing.T) { err = <-errC require.ErrorIs(t, err, context.Canceled) - flushCtx := testutil.Context(t, testutil.WaitShort) - testutil.RequireSendCtx(flushCtx, t, wuTick, dbtime.Now()) - _ = testutil.RequireRecvCtx(flushCtx, t, wuFlush) updated, err := client.Workspace(context.Background(), workspace.ID) require.NoError(t, err) require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) @@ -210,9 +201,6 @@ func TestPortForward(t *testing.T) { err = <-errC require.ErrorIs(t, err, context.Canceled) - flushCtx := testutil.Context(t, testutil.WaitShort) - testutil.RequireSendCtx(flushCtx, t, wuTick, dbtime.Now()) - _ = testutil.RequireRecvCtx(flushCtx, t, wuFlush) updated, err := client.Workspace(context.Background(), workspace.ID) require.NoError(t, err) require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) @@ -278,9 +266,6 @@ func TestPortForward(t *testing.T) { err := <-errC require.ErrorIs(t, err, context.Canceled) - flushCtx := testutil.Context(t, testutil.WaitShort) - testutil.RequireSendCtx(flushCtx, t, wuTick, dbtime.Now()) - _ = testutil.RequireRecvCtx(flushCtx, t, wuFlush) updated, err := client.Workspace(context.Background(), workspace.ID) require.NoError(t, err) require.Greater(t, updated.LastUsedAt, workspace.LastUsedAt) diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index 2dbf9a6666869..7ecbbba30bdfb 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -146,8 +146,6 @@ type Options struct { AllowWorkspaceRenames bool NewTicker func(duration time.Duration) (<-chan time.Time, func()) DatabaseRolluper *dbrollup.Rolluper - WorkspaceUsageTrackerFlush chan int - WorkspaceUsageTrackerTick chan time.Time } // New constructs a codersdk client connected to an in-memory API instance. @@ -296,30 +294,6 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can hangDetector.Start() t.Cleanup(hangDetector.Close) - // Did last_used_at not update? Scratching your noggin? Here's why. - // Workspace usage tracking must be triggered manually in tests. - // The vast majority of existing tests do not depend on last_used_at - // and adding an extra time-based background goroutine to all existing - // tests may lead to future flakes and goleak complaints. - // Instead, pass in your own flush and ticker like so: - // - // tickCh = make(chan time.Time) - // flushCh = make(chan int, 1) - // client = coderdtest.New(t, &coderdtest.Options{ - // WorkspaceUsageTrackerFlush: flushCh, - // WorkspaceUsageTrackerTick: tickCh - // }) - // - // Now to trigger a tick, just write to `tickCh`. - // Reading from `flushCh` will ensure that workspaceusage.Tracker flushed. - // See TestPortForward or TestTracker_MultipleInstances for how this works in practice. - if options.WorkspaceUsageTrackerFlush == nil { - options.WorkspaceUsageTrackerFlush = make(chan int, 1) // buffering just in case - } - if options.WorkspaceUsageTrackerTick == nil { - options.WorkspaceUsageTrackerTick = make(chan time.Time, 1) // buffering just in case - } - var mutex sync.RWMutex var handler http.Handler srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {