diff --git a/cli/server.go b/cli/server.go
index 170b7c5eb9f00..15cefb364ce3e 100644
--- a/cli/server.go
+++ b/cli/server.go
@@ -63,6 +63,7 @@ import (
 	"github.com/coder/coder/cli/config"
 	"github.com/coder/coder/coderd"
 	"github.com/coder/coder/coderd/autobuild"
+	"github.com/coder/coder/coderd/batchstats"
 	"github.com/coder/coder/coderd/database"
 	"github.com/coder/coder/coderd/database/dbfake"
 	"github.com/coder/coder/coderd/database/dbmetrics"
@@ -813,6 +814,16 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
 				options.SwaggerEndpoint = cfg.Swagger.Enable.Value()
 			}
 
+			batcher, closeBatcher, err := batchstats.New(ctx,
+				batchstats.WithLogger(options.Logger.Named("batchstats")),
+				batchstats.WithStore(options.Database),
+			)
+			if err != nil {
+				return xerrors.Errorf("failed to create agent stats batcher: %w", err)
+			}
+			options.StatsBatcher = batcher
+			defer closeBatcher()
+
 			closeCheckInactiveUsersFunc := dormancy.CheckInactiveUsers(ctx, logger, options.Database)
 			defer closeCheckInactiveUsersFunc()
 
diff --git a/coderd/batchstats/batcher.go b/coderd/batchstats/batcher.go
new file mode 100644
index 0000000000000..fc177fd143d6a
--- /dev/null
+++ b/coderd/batchstats/batcher.go
@@ -0,0 +1,289 @@
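+// Package batchstats batches inserts of workspace agent stats to reduce
+// load on the database. Intended usage mirrors cli/server.go above:
+//
+//	batcher, closeBatcher, err := batchstats.New(ctx,
+//		batchstats.WithLogger(logger.Named("batchstats")),
+//		batchstats.WithStore(store),
+//	)
+//	if err != nil {
+//		return err
+//	}
+//	options.StatsBatcher = batcher
+//	defer closeBatcher()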
+package batchstats
+
+import (
+	"context"
+	"encoding/json"
+	"os"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/google/uuid"
+	"golang.org/x/xerrors"
+
+	"cdr.dev/slog"
+	"cdr.dev/slog/sloggers/sloghuman"
+
+	"github.com/coder/coder/coderd/database"
+	"github.com/coder/coder/coderd/database/dbauthz"
+	"github.com/coder/coder/codersdk/agentsdk"
+)
+
+const (
+	defaultBufferSize    = 1024
+	defaultFlushInterval = time.Second
+)
+
+// Batcher holds a buffer of agent stats and periodically flushes them to
+// its configured store.
+type Batcher struct {
+	store database.Store
+	log   slog.Logger
+
+	mu sync.Mutex
+	// TODO: make this a buffered chan instead?
+	buf *database.InsertWorkspaceAgentStatsParams
+	// NOTE: we batch this separately as it's a jsonb field and
+	// pq.Array + unnest doesn't play nicely with this.
+	connectionsByProto []map[string]int64
+	batchSize          int
+
+	// tickCh is used to periodically flush the buffer.
+	tickCh   <-chan time.Time
+	ticker   *time.Ticker
+	interval time.Duration
+	// flushLever is used to signal the flusher to flush the buffer immediately.
+	flushLever  chan struct{}
+	flushForced atomic.Bool
+	// flushed is used during testing to signal that a flush has completed.
+	flushed chan<- int
+}
+
+// Option is a functional option for configuring a Batcher.
+type Option func(b *Batcher)
+
+// WithStore sets the store to use for storing stats.
+func WithStore(store database.Store) Option {
+	return func(b *Batcher) {
+		b.store = store
+	}
+}
+
+// WithBatchSize sets the number of stats to store in a batch.
+func WithBatchSize(size int) Option {
+	return func(b *Batcher) {
+		b.batchSize = size
+	}
+}
+
+// WithInterval sets the interval for flushes.
+func WithInterval(d time.Duration) Option {
+	return func(b *Batcher) {
+		b.interval = d
+	}
+}
+
+// WithLogger sets the logger to use for logging.
+func WithLogger(log slog.Logger) Option {
+	return func(b *Batcher) {
+		b.log = log
+	}
+}
+
+// New creates a new Batcher and starts it.
+func New(ctx context.Context, opts ...Option) (*Batcher, func(), error) {
+	b := &Batcher{}
+	b.log = slog.Make(sloghuman.Sink(os.Stderr))
+	b.flushLever = make(chan struct{}, 1) // Buffered so that it doesn't block.
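+	// A capacity of 1 lets concurrent Adds that cross the high-water mark
+	// collapse into a single pending flush signal; the flushForced flag
+	// keeps Add from re-signaling until that flush completes.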
+	for _, opt := range opts {
+		opt(b)
+	}
+
+	if b.store == nil {
+		return nil, nil, xerrors.Errorf("no store configured for batcher")
+	}
+
+	if b.interval == 0 {
+		b.interval = defaultFlushInterval
+	}
+
+	if b.batchSize == 0 {
+		b.batchSize = defaultBufferSize
+	}
+
+	if b.tickCh == nil {
+		b.ticker = time.NewTicker(b.interval)
+		b.tickCh = b.ticker.C
+	}
+
+	cancelCtx, cancelFunc := context.WithCancel(ctx)
+	done := make(chan struct{})
+	go func() {
+		b.run(cancelCtx)
+		close(done)
+	}()
+
+	closer := func() {
+		cancelFunc()
+		if b.ticker != nil {
+			b.ticker.Stop()
+		}
+		<-done
+	}
+
+	return b, closer, nil
+}
+
+// Add adds a stat to the batcher for the given workspace and agent.
+func (b *Batcher) Add(
+	agentID uuid.UUID,
+	templateID uuid.UUID,
+	userID uuid.UUID,
+	workspaceID uuid.UUID,
+	st agentsdk.Stats,
+) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	now := database.Now()
+
+	b.buf.ID = append(b.buf.ID, uuid.New())
+	b.buf.CreatedAt = append(b.buf.CreatedAt, now)
+	b.buf.AgentID = append(b.buf.AgentID, agentID)
+	b.buf.UserID = append(b.buf.UserID, userID)
+	b.buf.TemplateID = append(b.buf.TemplateID, templateID)
+	b.buf.WorkspaceID = append(b.buf.WorkspaceID, workspaceID)
+
+	// Store the connections by proto separately as it's a jsonb field. We marshal on flush.
+	// b.buf.ConnectionsByProto = append(b.buf.ConnectionsByProto, st.ConnectionsByProto)
+	b.connectionsByProto = append(b.connectionsByProto, st.ConnectionsByProto)
+
+	b.buf.ConnectionCount = append(b.buf.ConnectionCount, st.ConnectionCount)
+	b.buf.RxPackets = append(b.buf.RxPackets, st.RxPackets)
+	b.buf.RxBytes = append(b.buf.RxBytes, st.RxBytes)
+	b.buf.TxPackets = append(b.buf.TxPackets, st.TxPackets)
+	b.buf.TxBytes = append(b.buf.TxBytes, st.TxBytes)
+	b.buf.SessionCountVSCode = append(b.buf.SessionCountVSCode, st.SessionCountVSCode)
+	b.buf.SessionCountJetBrains = append(b.buf.SessionCountJetBrains, st.SessionCountJetBrains)
+	b.buf.SessionCountReconnectingPTY = append(b.buf.SessionCountReconnectingPTY, st.SessionCountReconnectingPTY)
+	b.buf.SessionCountSSH = append(b.buf.SessionCountSSH, st.SessionCountSSH)
+	b.buf.ConnectionMedianLatencyMS = append(b.buf.ConnectionMedianLatencyMS, st.ConnectionMedianLatencyMS)
+
+	// If the buffer is over 80% full, signal the flusher to flush immediately.
+	// We want to trigger flushes early to reduce the likelihood of
+	// accidentally growing the buffer over batchSize.
+	filled := float64(len(b.buf.ID)) / float64(b.batchSize)
+	if filled >= 0.8 && !b.flushForced.Load() {
+		b.flushLever <- struct{}{}
+		b.flushForced.Store(true)
+	}
+	return nil
+}
+
+// run periodically flushes the batcher's buffer until ctx is done.
+func (b *Batcher) run(ctx context.Context) {
+	b.initBuf(b.batchSize)
+	// nolint:gocritic // This is only ever used for one thing - inserting agent stats.
+	authCtx := dbauthz.AsSystemRestricted(ctx)
+	for {
+		select {
+		case <-b.tickCh:
+			b.flush(authCtx, false, "scheduled")
+		case <-b.flushLever:
+			// If the flush lever is depressed, flush the buffer immediately.
+			b.flush(authCtx, true, "reaching capacity")
+		case <-ctx.Done():
+			b.log.Warn(ctx, "context done, flushing before exit")
+			b.flush(authCtx, true, "exit")
+			return
+		}
+	}
+}
+
+// flush flushes the batcher's buffer.
+func (b *Batcher) flush(ctx context.Context, forced bool, reason string) {
+	b.mu.Lock()
+	b.flushForced.Store(true)
+	start := time.Now()
+	count := len(b.buf.ID)
+	defer func() {
+		b.flushForced.Store(false)
+		b.mu.Unlock()
+		// Notify that a flush has completed. This only happens in tests.
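+		// If the batcher is shutting down, close b.flushed instead so a
+		// test blocked on receiving from it unblocks rather than hanging.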
+		if b.flushed != nil {
+			select {
+			case <-ctx.Done():
+				close(b.flushed)
+			default:
+				b.flushed <- count
+			}
+		}
+		if count > 0 {
+			elapsed := time.Since(start)
+			b.log.Debug(ctx, "flush complete",
+				slog.F("count", count),
+				slog.F("elapsed", elapsed),
+				slog.F("forced", forced),
+				slog.F("reason", reason),
+			)
+		}
+	}()
+
+	if len(b.buf.ID) == 0 {
+		return
+	}
+
+	// marshal connections by proto
+	payload, err := json.Marshal(b.connectionsByProto)
+	if err != nil {
+		b.log.Error(ctx, "unable to marshal agent connections by proto, dropping data", slog.Error(err))
+		b.buf.ConnectionsByProto = json.RawMessage(`[]`)
+	} else {
+		b.buf.ConnectionsByProto = payload
+	}
+
+	err = b.store.InsertWorkspaceAgentStats(ctx, *b.buf)
+	elapsed := time.Since(start)
+	if err != nil {
+		b.log.Error(ctx, "error inserting workspace agent stats", slog.Error(err), slog.F("elapsed", elapsed))
+		return
+	}
+
+	b.resetBuf()
+}
+
+// initBuf initializes the buffer. b MUST be locked.
+func (b *Batcher) initBuf(size int) {
+	b.buf = &database.InsertWorkspaceAgentStatsParams{
+		ID:                          make([]uuid.UUID, 0, b.batchSize),
+		CreatedAt:                   make([]time.Time, 0, b.batchSize),
+		UserID:                      make([]uuid.UUID, 0, b.batchSize),
+		WorkspaceID:                 make([]uuid.UUID, 0, b.batchSize),
+		TemplateID:                  make([]uuid.UUID, 0, b.batchSize),
+		AgentID:                     make([]uuid.UUID, 0, b.batchSize),
+		ConnectionsByProto:          json.RawMessage("[]"),
+		ConnectionCount:             make([]int64, 0, b.batchSize),
+		RxPackets:                   make([]int64, 0, b.batchSize),
+		RxBytes:                     make([]int64, 0, b.batchSize),
+		TxPackets:                   make([]int64, 0, b.batchSize),
+		TxBytes:                     make([]int64, 0, b.batchSize),
+		SessionCountVSCode:          make([]int64, 0, b.batchSize),
+		SessionCountJetBrains:       make([]int64, 0, b.batchSize),
+		SessionCountReconnectingPTY: make([]int64, 0, b.batchSize),
+		SessionCountSSH:             make([]int64, 0, b.batchSize),
+		ConnectionMedianLatencyMS:   make([]float64, 0, b.batchSize),
+	}
+
+	b.connectionsByProto = make([]map[string]int64, 0, size)
+}
+
+// resetBuf resets the buffer for reuse without releasing its capacity.
+// b MUST be locked.
+func (b *Batcher) resetBuf() {
+	b.buf.ID = b.buf.ID[:0]
+	b.buf.CreatedAt = b.buf.CreatedAt[:0]
+	b.buf.UserID = b.buf.UserID[:0]
+	b.buf.WorkspaceID = b.buf.WorkspaceID[:0]
+	b.buf.TemplateID = b.buf.TemplateID[:0]
+	b.buf.AgentID = b.buf.AgentID[:0]
+	b.buf.ConnectionsByProto = json.RawMessage(`[]`)
+	b.buf.ConnectionCount = b.buf.ConnectionCount[:0]
+	b.buf.RxPackets = b.buf.RxPackets[:0]
+	b.buf.RxBytes = b.buf.RxBytes[:0]
+	b.buf.TxPackets = b.buf.TxPackets[:0]
+	b.buf.TxBytes = b.buf.TxBytes[:0]
+	b.buf.SessionCountVSCode = b.buf.SessionCountVSCode[:0]
+	b.buf.SessionCountJetBrains = b.buf.SessionCountJetBrains[:0]
+	b.buf.SessionCountReconnectingPTY = b.buf.SessionCountReconnectingPTY[:0]
+	b.buf.SessionCountSSH = b.buf.SessionCountSSH[:0]
+	b.buf.ConnectionMedianLatencyMS = b.buf.ConnectionMedianLatencyMS[:0]
+	b.connectionsByProto = b.connectionsByProto[:0]
+}
diff --git a/coderd/batchstats/batcher_internal_test.go b/coderd/batchstats/batcher_internal_test.go
new file mode 100644
index 0000000000000..a6e28f1a9f389
--- /dev/null
+++ b/coderd/batchstats/batcher_internal_test.go
@@ -0,0 +1,226 @@
+package batchstats
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"cdr.dev/slog"
+	"cdr.dev/slog/sloggers/slogtest"
+
+	"github.com/coder/coder/coderd/database"
+	"github.com/coder/coder/coderd/database/dbgen"
+	"github.com/coder/coder/coderd/database/dbtestutil"
+	"github.com/coder/coder/coderd/rbac"
+	"github.com/coder/coder/codersdk/agentsdk"
+	"github.com/coder/coder/cryptorand"
+)
+
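+// TestBatchStats walks the batcher through five flushes: an empty tick, a
+// single stat, a force-flush near capacity, a tick draining the remainder,
+// and a final tick that must flush nothing.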
+func TestBatchStats(t *testing.T) {
+	t.Parallel()
+
+	// Given: a fresh batcher with no data
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+	log := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
+	store, _ := dbtestutil.NewDB(t)
+
+	// Set up some test dependencies.
+	deps1 := setupDeps(t, store)
+	deps2 := setupDeps(t, store)
+	tick := make(chan time.Time)
+	flushed := make(chan int)
+
+	b, closer, err := New(ctx,
+		WithStore(store),
+		WithLogger(log),
+		func(b *Batcher) {
+			b.tickCh = tick
+			b.flushed = flushed
+		},
+	)
+	require.NoError(t, err)
+	t.Cleanup(closer)
+
+	// Given: no data points are added for workspace
+	// When: it becomes time to report stats
+	t1 := time.Now()
+	// Signal a tick and wait for a flush to complete.
+	tick <- t1
+	f := <-flushed
+	require.Equal(t, 0, f, "expected no data to be flushed")
+	t.Logf("flush 1 completed")
+
+	// Then: it should report no stats.
+	stats, err := store.GetWorkspaceAgentStats(ctx, t1)
+	require.NoError(t, err, "should not error getting stats")
+	require.Empty(t, stats, "should have no stats for workspace")
+
+	// Given: a single data point is added for workspace
+	t2 := time.Now()
+	t.Logf("inserting 1 stat")
+	require.NoError(t, b.Add(deps1.Agent.ID, deps1.Template.ID, deps1.User.ID, deps1.Workspace.ID, randAgentSDKStats(t)))
+
+	// When: it becomes time to report stats
+	// Signal a tick and wait for a flush to complete.
+	tick <- t2
+	f = <-flushed // Wait for a flush to complete.
+	require.Equal(t, 1, f, "expected one stat to be flushed")
+	t.Logf("flush 2 completed")
+
+	// Then: it should report a single stat.
+	stats, err = store.GetWorkspaceAgentStats(ctx, t2)
+	require.NoError(t, err, "should not error getting stats")
+	require.Len(t, stats, 1, "should have stats for workspace")
+
+	// Given: a lot of data points are added for both workspaces
+	// (equal to batch size)
+	t3 := time.Now()
+	done := make(chan struct{})
+
+	go func() {
+		defer close(done)
+		t.Logf("inserting %d stats", defaultBufferSize)
+		for i := 0; i < defaultBufferSize; i++ {
+			if i%2 == 0 {
+				require.NoError(t, b.Add(deps1.Agent.ID, deps1.Template.ID, deps1.User.ID, deps1.Workspace.ID, randAgentSDKStats(t)))
+			} else {
+				require.NoError(t, b.Add(deps2.Agent.ID, deps2.Template.ID, deps2.User.ID, deps2.Workspace.ID, randAgentSDKStats(t)))
+			}
+		}
+	}()
+
+	// When: the buffer comes close to capacity
+	// Then: the buffer will force-flush once.
+	f = <-flushed
+	t.Logf("flush 3 completed")
+	require.Greater(t, f, 819, "expected at least 820 stats to be flushed (>=80% of buffer)")
+	// And we should finish inserting the stats.
+	<-done
+
+	stats, err = store.GetWorkspaceAgentStats(ctx, t3)
+	require.NoError(t, err, "should not error getting stats")
+	require.Len(t, stats, 2, "should have stats for both workspaces")
+
+	// Ensure that a subsequent flush pushes all the remaining data.
+	t4 := time.Now()
+	tick <- t4
+	f2 := <-flushed
+	t.Logf("flush 4 completed")
+	expectedCount := defaultBufferSize - f
+	require.Equal(t, expectedCount, f2, "did not flush expected remaining rows")
+
+	// Ensure that a subsequent flush does not push stale data.
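+	// Flush 4 should have reset both buf and connectionsByProto, so this
+	// tick must report zero rows rather than re-inserting old ones.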
+	t5 := time.Now()
+	tick <- t5
+	f = <-flushed
+	require.Zero(t, f, "expected zero stats to have been flushed")
+	t.Logf("flush 5 completed")
+
+	stats, err = store.GetWorkspaceAgentStats(ctx, t5)
+	require.NoError(t, err, "should not error getting stats")
+	require.Len(t, stats, 0, "should have no stats for workspace")
+
+	// Ensure that buf never grew beyond what we expect.
+	require.Equal(t, defaultBufferSize, cap(b.buf.ID), "buffer grew beyond expected capacity")
+}
+
+// randAgentSDKStats returns a random agentsdk.Stats.
+func randAgentSDKStats(t *testing.T, opts ...func(*agentsdk.Stats)) agentsdk.Stats {
+	t.Helper()
+	s := agentsdk.Stats{
+		ConnectionsByProto: map[string]int64{
+			"ssh":              mustRandInt64n(t, 9) + 1,
+			"vscode":           mustRandInt64n(t, 9) + 1,
+			"jetbrains":        mustRandInt64n(t, 9) + 1,
+			"reconnecting_pty": mustRandInt64n(t, 9) + 1,
+		},
+		ConnectionCount:             mustRandInt64n(t, 99) + 1,
+		ConnectionMedianLatencyMS:   float64(mustRandInt64n(t, 99) + 1),
+		RxPackets:                   mustRandInt64n(t, 99) + 1,
+		RxBytes:                     mustRandInt64n(t, 99) + 1,
+		TxPackets:                   mustRandInt64n(t, 99) + 1,
+		TxBytes:                     mustRandInt64n(t, 99) + 1,
+		SessionCountVSCode:          mustRandInt64n(t, 9) + 1,
+		SessionCountJetBrains:       mustRandInt64n(t, 9) + 1,
+		SessionCountReconnectingPTY: mustRandInt64n(t, 9) + 1,
+		SessionCountSSH:             mustRandInt64n(t, 9) + 1,
+		Metrics:                     []agentsdk.AgentMetric{},
+	}
+	for _, opt := range opts {
+		opt(&s)
+	}
+	return s
+}
+
+// deps is a set of test dependencies.
+type deps struct {
+	Agent     database.WorkspaceAgent
+	Template  database.Template
+	User      database.User
+	Workspace database.Workspace
+}
+
+// setupDeps sets up a set of test dependencies.
+// It creates an organization, user, template, workspace, and agent
+// along with all the other miscellaneous plumbing required to link
+// them together.
+func setupDeps(t *testing.T, store database.Store) deps {
+	t.Helper()
+
+	org := dbgen.Organization(t, store, database.Organization{})
+	user := dbgen.User(t, store, database.User{})
+	_, err := store.InsertOrganizationMember(context.Background(), database.InsertOrganizationMemberParams{
+		OrganizationID: org.ID,
+		UserID:         user.ID,
+		Roles:          []string{rbac.RoleOrgMember(org.ID)},
+	})
+	require.NoError(t, err)
+	tv := dbgen.TemplateVersion(t, store, database.TemplateVersion{
+		OrganizationID: org.ID,
+		CreatedBy:      user.ID,
+	})
+	tpl := dbgen.Template(t, store, database.Template{
+		CreatedBy:       user.ID,
+		OrganizationID:  org.ID,
+		ActiveVersionID: tv.ID,
+	})
+	ws := dbgen.Workspace(t, store, database.Workspace{
+		TemplateID:     tpl.ID,
+		OwnerID:        user.ID,
+		OrganizationID: org.ID,
+		LastUsedAt:     time.Now().Add(-time.Hour),
+	})
+	pj := dbgen.ProvisionerJob(t, store, database.ProvisionerJob{
+		InitiatorID:    user.ID,
+		OrganizationID: org.ID,
+	})
+	_ = dbgen.WorkspaceBuild(t, store, database.WorkspaceBuild{
+		TemplateVersionID: tv.ID,
+		WorkspaceID:       ws.ID,
+		JobID:             pj.ID,
+	})
+	res := dbgen.WorkspaceResource(t, store, database.WorkspaceResource{
+		Transition: database.WorkspaceTransitionStart,
+		JobID:      pj.ID,
+	})
+	agt := dbgen.WorkspaceAgent(t, store, database.WorkspaceAgent{
+		ResourceID: res.ID,
+	})
+	return deps{
+		Agent:     agt,
+		Template:  tpl,
+		User:      user,
+		Workspace: ws,
+	}
+}
+
+// mustRandInt64n returns a random int64 in the range [0, n).
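+// It fails the test if the underlying entropy source returns an error.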
+func mustRandInt64n(t *testing.T, n int64) int64 {
+	t.Helper()
+	i, err := cryptorand.Intn(int(n))
+	require.NoError(t, err)
+	return int64(i)
+}
diff --git a/coderd/coderd.go b/coderd/coderd.go
index d7b80ff273097..58b6c902c7dbc 100644
--- a/coderd/coderd.go
+++ b/coderd/coderd.go
@@ -43,6 +43,7 @@ import (
 	"github.com/coder/coder/buildinfo"
 	"github.com/coder/coder/coderd/audit"
 	"github.com/coder/coder/coderd/awsidentity"
+	"github.com/coder/coder/coderd/batchstats"
 	"github.com/coder/coder/coderd/database"
 	"github.com/coder/coder/coderd/database/dbauthz"
 	"github.com/coder/coder/coderd/database/pubsub"
@@ -160,6 +161,7 @@ type Options struct {
 	HTTPClient *http.Client
 
 	UpdateAgentMetrics func(ctx context.Context, username, workspaceName, agentName string, metrics []agentsdk.AgentMetric)
+	StatsBatcher       *batchstats.Batcher
 }
 
 // @title Coder API
@@ -180,6 +182,8 @@ type Options struct {
 // @in header
 // @name Coder-Session-Token
 // New constructs a Coder API handler.
+//
+//nolint:gocyclo
 func New(options *Options) *API {
 	if options == nil {
 		options = &Options{}
@@ -288,6 +292,10 @@ func New(options *Options) *API {
 		options.UserQuietHoursScheduleStore.Store(&v)
 	}
 
+	if options.StatsBatcher == nil {
+		panic("developer error: options.StatsBatcher is nil")
+	}
+
 	siteCacheDir := options.CacheDir
 	if siteCacheDir != "" {
 		siteCacheDir = filepath.Join(siteCacheDir, "site")
@@ -462,6 +470,8 @@ func New(options *Options) *API {
 	cors := httpmw.Cors(options.DeploymentValues.Dangerous.AllowAllCors.Value())
 	prometheusMW := httpmw.Prometheus(options.PrometheusRegistry)
 
+	api.statsBatcher = options.StatsBatcher
+
 	r.Use(
 		httpmw.Recover(api.Logger),
 		tracing.StatusWriterMiddleware,
@@ -994,6 +1004,8 @@ type API struct {
 	healthCheckGroup *singleflight.Group[string, *healthcheck.Report]
 	healthCheckCache atomic.Pointer[healthcheck.Report]
+
+	statsBatcher *batchstats.Batcher
 }
 
 // Close waits for all WebSocket connections to drain before returning.
diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go
index 351a6d0d9a075..71e3336ab2e87 100644
--- a/coderd/coderdtest/coderdtest.go
+++ b/coderd/coderdtest/coderdtest.go
@@ -51,11 +51,13 @@ import (
 	"tailscale.com/types/nettype"
 
 	"cdr.dev/slog"
+	"cdr.dev/slog/sloggers/sloghuman"
 	"cdr.dev/slog/sloggers/slogtest"
 
 	"github.com/coder/coder/coderd"
 	"github.com/coder/coder/coderd/audit"
 	"github.com/coder/coder/coderd/autobuild"
 	"github.com/coder/coder/coderd/awsidentity"
+	"github.com/coder/coder/coderd/batchstats"
 	"github.com/coder/coder/coderd/database"
 	"github.com/coder/coder/coderd/database/dbauthz"
 	"github.com/coder/coder/coderd/database/dbtestutil"
@@ -140,7 +142,8 @@ type Options struct {
 	SwaggerEndpoint bool
 	// Logger should only be overridden if you expect errors
 	// as part of your test.
-	Logger *slog.Logger
+	Logger       *slog.Logger
+	StatsBatcher *batchstats.Batcher
 }
 
 // New constructs a codersdk client connected to an in-memory API instance.
@@ -241,6 +244,18 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
 	if options.FilesRateLimit == 0 {
 		options.FilesRateLimit = -1
 	}
+	if options.StatsBatcher == nil {
+		ctx, cancel := context.WithCancel(context.Background())
+		t.Cleanup(cancel)
+		batcher, closeBatcher, err := batchstats.New(ctx,
+			batchstats.WithStore(options.Database),
+			// Avoid cluttering up test output.
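+			// Tests that need the batcher's output can inject their own
+			// StatsBatcher via Options instead.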
+			batchstats.WithLogger(slog.Make(sloghuman.Sink(io.Discard))),
+		)
+		require.NoError(t, err, "create stats batcher")
+		options.StatsBatcher = batcher
+		t.Cleanup(closeBatcher)
+	}
 
 	var templateScheduleStore atomic.Pointer[schedule.TemplateScheduleStore]
 	if options.TemplateScheduleStore == nil {
@@ -409,6 +424,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
 			HealthcheckFunc:    options.HealthcheckFunc,
 			HealthcheckTimeout: options.HealthcheckTimeout,
 			HealthcheckRefresh: options.HealthcheckRefresh,
+			StatsBatcher:       options.StatsBatcher,
 		}
 	}
 
diff --git a/coderd/database/dbauthz/dbauthz.go b/coderd/database/dbauthz/dbauthz.go
index 5e82320832442..488842dcaf351 100644
--- a/coderd/database/dbauthz/dbauthz.go
+++ b/coderd/database/dbauthz/dbauthz.go
@@ -2016,6 +2016,14 @@ func (q *querier) InsertWorkspaceAgentStat(ctx context.Context, arg database.Ins
 	return q.db.InsertWorkspaceAgentStat(ctx, arg)
 }
 
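+// InsertWorkspaceAgentStats requires create permission on the system
+// resource; the batcher satisfies this by flushing under a context wrapped
+// in dbauthz.AsSystemRestricted.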
+func (q *querier) InsertWorkspaceAgentStats(ctx context.Context, arg database.InsertWorkspaceAgentStatsParams) error {
+	if err := q.authorizeContext(ctx, rbac.ActionCreate, rbac.ResourceSystem); err != nil {
+		return err
+	}
+
+	return q.db.InsertWorkspaceAgentStats(ctx, arg)
+}
+
 func (q *querier) InsertWorkspaceApp(ctx context.Context, arg database.InsertWorkspaceAppParams) (database.WorkspaceApp, error) {
 	if err := q.authorizeContext(ctx, rbac.ActionCreate, rbac.ResourceSystem); err != nil {
 		return database.WorkspaceApp{}, err
diff --git a/coderd/database/dbfake/dbfake.go b/coderd/database/dbfake/dbfake.go
index 03ae01182dbd0..cc0d05aaee9c3 100644
--- a/coderd/database/dbfake/dbfake.go
+++ b/coderd/database/dbfake/dbfake.go
@@ -2810,8 +2810,12 @@ func (q *FakeQuerier) GetWorkspaceAgentStats(_ context.Context, createdAfter tim
 	}
 
 	statByAgent := map[uuid.UUID]database.GetWorkspaceAgentStatsRow{}
-	for _, agentStat := range latestAgentStats {
-		stat := statByAgent[agentStat.AgentID]
+	for agentID, agentStat := range latestAgentStats {
+		stat := statByAgent[agentID]
+		stat.AgentID = agentStat.AgentID
+		stat.TemplateID = agentStat.TemplateID
+		stat.UserID = agentStat.UserID
+		stat.WorkspaceID = agentStat.WorkspaceID
 		stat.SessionCountVSCode += agentStat.SessionCountVSCode
 		stat.SessionCountJetBrains += agentStat.SessionCountJetBrains
 		stat.SessionCountReconnectingPTY += agentStat.SessionCountReconnectingPTY
@@ -4177,6 +4181,49 @@ func (q *FakeQuerier) InsertWorkspaceAgentStat(_ context.Context, p database.Ins
 	return stat, nil
 }
 
+func (q *FakeQuerier) InsertWorkspaceAgentStats(_ context.Context, arg database.InsertWorkspaceAgentStatsParams) error {
+	err := validateDatabaseType(arg)
+	if err != nil {
+		return err
+	}
+
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+
+	var connectionsByProto []map[string]int64
+	if err := json.Unmarshal(arg.ConnectionsByProto, &connectionsByProto); err != nil {
+		return err
+	}
+	for i := 0; i < len(arg.ID); i++ {
+		cbp, err := json.Marshal(connectionsByProto[i])
+		if err != nil {
+			return xerrors.Errorf("failed to marshal connections_by_proto: %w", err)
+		}
+		stat := database.WorkspaceAgentStat{
+			ID:                          arg.ID[i],
+			CreatedAt:                   arg.CreatedAt[i],
+			WorkspaceID:                 arg.WorkspaceID[i],
+			AgentID:                     arg.AgentID[i],
+			UserID:                      arg.UserID[i],
+			ConnectionsByProto:          cbp,
+			ConnectionCount:             arg.ConnectionCount[i],
+			RxPackets:                   arg.RxPackets[i],
+			RxBytes:                     arg.RxBytes[i],
+			TxPackets:                   arg.TxPackets[i],
+			TxBytes:                     arg.TxBytes[i],
+			TemplateID:                  arg.TemplateID[i],
+			SessionCountVSCode:          arg.SessionCountVSCode[i],
+			SessionCountJetBrains:       arg.SessionCountJetBrains[i],
+			SessionCountReconnectingPTY: arg.SessionCountReconnectingPTY[i],
+			SessionCountSSH:             arg.SessionCountSSH[i],
+			ConnectionMedianLatencyMS:   arg.ConnectionMedianLatencyMS[i],
+		}
+		q.workspaceAgentStats = append(q.workspaceAgentStats, stat)
+	}
+
+	return nil
+}
+
 func (q *FakeQuerier) InsertWorkspaceApp(_ context.Context, arg database.InsertWorkspaceAppParams) (database.WorkspaceApp, error) {
 	if err := validateDatabaseType(arg); err != nil {
 		return database.WorkspaceApp{}, err
diff --git a/coderd/database/dbmetrics/dbmetrics.go b/coderd/database/dbmetrics/dbmetrics.go
index ee7f8ae53b433..85ee8c26a0d51 100644
--- a/coderd/database/dbmetrics/dbmetrics.go
+++ b/coderd/database/dbmetrics/dbmetrics.go
@@ -1236,6 +1236,13 @@ func (m metricsStore) InsertWorkspaceAgentStat(ctx context.Context, arg database
 	return stat, err
 }
 
+func (m metricsStore) InsertWorkspaceAgentStats(ctx context.Context, arg database.InsertWorkspaceAgentStatsParams) error {
+	start := time.Now()
+	r0 := m.s.InsertWorkspaceAgentStats(ctx, arg)
+	m.queryLatencies.WithLabelValues("InsertWorkspaceAgentStats").Observe(time.Since(start).Seconds())
+	return r0
+}
+
 func (m metricsStore) InsertWorkspaceApp(ctx context.Context, arg database.InsertWorkspaceAppParams) (database.WorkspaceApp, error) {
 	start := time.Now()
 	app, err := m.s.InsertWorkspaceApp(ctx, arg)
diff --git a/coderd/database/dbmock/dbmock.go b/coderd/database/dbmock/dbmock.go
index 6a0edec4f015d..cb7278369884b 100644
--- a/coderd/database/dbmock/dbmock.go
+++ b/coderd/database/dbmock/dbmock.go
@@ -2598,6 +2598,20 @@ func (mr *MockStoreMockRecorder) InsertWorkspaceAgentStat(arg0, arg1 interface{}
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertWorkspaceAgentStat", reflect.TypeOf((*MockStore)(nil).InsertWorkspaceAgentStat), arg0, arg1)
 }
 
+// InsertWorkspaceAgentStats mocks base method.
+func (m *MockStore) InsertWorkspaceAgentStats(arg0 context.Context, arg1 database.InsertWorkspaceAgentStatsParams) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "InsertWorkspaceAgentStats", arg0, arg1)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// InsertWorkspaceAgentStats indicates an expected call of InsertWorkspaceAgentStats.
+func (mr *MockStoreMockRecorder) InsertWorkspaceAgentStats(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertWorkspaceAgentStats", reflect.TypeOf((*MockStore)(nil).InsertWorkspaceAgentStats), arg0, arg1)
+}
+
 // InsertWorkspaceApp mocks base method.
 func (m *MockStore) InsertWorkspaceApp(arg0 context.Context, arg1 database.InsertWorkspaceAppParams) (database.WorkspaceApp, error) {
 	m.ctrl.T.Helper()
diff --git a/coderd/database/querier.go b/coderd/database/querier.go
index 783524375f822..b308589ffc350 100644
--- a/coderd/database/querier.go
+++ b/coderd/database/querier.go
@@ -225,6 +225,7 @@ type sqlcQuerier interface {
 	InsertWorkspaceAgentLogs(ctx context.Context, arg InsertWorkspaceAgentLogsParams) ([]WorkspaceAgentLog, error)
 	InsertWorkspaceAgentMetadata(ctx context.Context, arg InsertWorkspaceAgentMetadataParams) error
 	InsertWorkspaceAgentStat(ctx context.Context, arg InsertWorkspaceAgentStatParams) (WorkspaceAgentStat, error)
+	InsertWorkspaceAgentStats(ctx context.Context, arg InsertWorkspaceAgentStatsParams) error
 	InsertWorkspaceApp(ctx context.Context, arg InsertWorkspaceAppParams) (WorkspaceApp, error)
 	InsertWorkspaceBuild(ctx context.Context, arg InsertWorkspaceBuildParams) error
 	InsertWorkspaceBuildParameters(ctx context.Context, arg InsertWorkspaceBuildParametersParams) error
diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go
index ba0f3cfb54188..3e07fc22f1aa3 100644
--- a/coderd/database/queries.sql.go
+++ b/coderd/database/queries.sql.go
@@ -7418,6 +7418,90 @@ func (q *sqlQuerier) InsertWorkspaceAgentStat(ctx context.Context, arg InsertWor
 	return i, err
 }
 
+const insertWorkspaceAgentStats = `-- name: InsertWorkspaceAgentStats :exec
+INSERT INTO
+	workspace_agent_stats (
+		id,
+		created_at,
+		user_id,
+		workspace_id,
+		template_id,
+		agent_id,
+		connections_by_proto,
+		connection_count,
+		rx_packets,
+		rx_bytes,
+		tx_packets,
+		tx_bytes,
+		session_count_vscode,
+		session_count_jetbrains,
+		session_count_reconnecting_pty,
+		session_count_ssh,
+		connection_median_latency_ms
+	)
+SELECT
+	unnest($1 :: uuid[]) AS id,
+	unnest($2 :: timestamptz[]) AS created_at,
+	unnest($3 :: uuid[]) AS user_id,
+	unnest($4 :: uuid[]) AS workspace_id,
+	unnest($5 :: uuid[]) AS template_id,
+	unnest($6 :: uuid[]) AS agent_id,
+	jsonb_array_elements($7 :: jsonb) AS connections_by_proto,
+	unnest($8 :: bigint[]) AS connection_count,
+	unnest($9 :: bigint[]) AS rx_packets,
+	unnest($10 :: bigint[]) AS rx_bytes,
+	unnest($11 :: bigint[]) AS tx_packets,
+	unnest($12 :: bigint[]) AS tx_bytes,
+	unnest($13 :: bigint[]) AS session_count_vscode,
+	unnest($14 :: bigint[]) AS session_count_jetbrains,
+	unnest($15 :: bigint[]) AS session_count_reconnecting_pty,
+	unnest($16 :: bigint[]) AS session_count_ssh,
+	unnest($17 :: double precision[]) AS connection_median_latency_ms
+`
+
+type InsertWorkspaceAgentStatsParams struct {
+	ID                          []uuid.UUID     `db:"id" json:"id"`
+	CreatedAt                   []time.Time     `db:"created_at" json:"created_at"`
+	UserID                      []uuid.UUID     `db:"user_id" json:"user_id"`
+	WorkspaceID                 []uuid.UUID     `db:"workspace_id" json:"workspace_id"`
+	TemplateID                  []uuid.UUID     `db:"template_id" json:"template_id"`
+	AgentID                     []uuid.UUID     `db:"agent_id" json:"agent_id"`
+	ConnectionsByProto          json.RawMessage `db:"connections_by_proto" json:"connections_by_proto"`
+	ConnectionCount             []int64         `db:"connection_count" json:"connection_count"`
+	RxPackets                   []int64         `db:"rx_packets" json:"rx_packets"`
+	RxBytes                     []int64         `db:"rx_bytes" json:"rx_bytes"`
+	TxPackets                   []int64         `db:"tx_packets" json:"tx_packets"`
+	TxBytes                     []int64         `db:"tx_bytes" json:"tx_bytes"`
+	SessionCountVSCode          []int64         `db:"session_count_vscode" json:"session_count_vscode"`
+	SessionCountJetBrains       []int64         `db:"session_count_jetbrains" json:"session_count_jetbrains"`
+	SessionCountReconnectingPTY []int64         `db:"session_count_reconnecting_pty" json:"session_count_reconnecting_pty"`
+	SessionCountSSH             []int64         `db:"session_count_ssh" json:"session_count_ssh"`
+	ConnectionMedianLatencyMS   []float64       `db:"connection_median_latency_ms" json:"connection_median_latency_ms"`
+}
+
+func (q *sqlQuerier) InsertWorkspaceAgentStats(ctx context.Context, arg InsertWorkspaceAgentStatsParams) error {
+	_, err := q.db.ExecContext(ctx, insertWorkspaceAgentStats,
+		pq.Array(arg.ID),
+		pq.Array(arg.CreatedAt),
+		pq.Array(arg.UserID),
+		pq.Array(arg.WorkspaceID),
+		pq.Array(arg.TemplateID),
+		pq.Array(arg.AgentID),
+		arg.ConnectionsByProto,
+		pq.Array(arg.ConnectionCount),
+		pq.Array(arg.RxPackets),
+		pq.Array(arg.RxBytes),
+		pq.Array(arg.TxPackets),
+		pq.Array(arg.TxBytes),
+		pq.Array(arg.SessionCountVSCode),
+		pq.Array(arg.SessionCountJetBrains),
+		pq.Array(arg.SessionCountReconnectingPTY),
+		pq.Array(arg.SessionCountSSH),
+		pq.Array(arg.ConnectionMedianLatencyMS),
+	)
+	return err
+}
+
 const getWorkspaceAppByAgentIDAndSlug = `-- name: GetWorkspaceAppByAgentIDAndSlug :one
 SELECT id, created_at, agent_id, display_name, icon, command, url, healthcheck_url, healthcheck_interval, healthcheck_threshold, health, subdomain, sharing_level, slug, external FROM workspace_apps WHERE agent_id = $1 AND slug = $2
 `
diff --git a/coderd/database/queries/workspaceagentstats.sql b/coderd/database/queries/workspaceagentstats.sql
index 1a598bd6a6263..daba093a3d9e1 100644
--- a/coderd/database/queries/workspaceagentstats.sql
+++ b/coderd/database/queries/workspaceagentstats.sql
@@ -22,6 +22,46 @@ INSERT INTO
 VALUES
 	($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) RETURNING *;
 
+-- name: InsertWorkspaceAgentStats :exec
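+-- Rows arrive as parallel arrays and are zipped back into rows via unnest.
+-- connections_by_proto is a single jsonb array (one element per row) since
+-- pq.Array and unnest do not play nicely with jsonb; set-returning
+-- functions in the same SELECT list advance in lockstep (PostgreSQL 10+),
+-- so element i stays aligned with row i of the unnested columns.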
+INSERT INTO
+	workspace_agent_stats (
+		id,
+		created_at,
+		user_id,
+		workspace_id,
+		template_id,
+		agent_id,
+		connections_by_proto,
+		connection_count,
+		rx_packets,
+		rx_bytes,
+		tx_packets,
+		tx_bytes,
+		session_count_vscode,
+		session_count_jetbrains,
+		session_count_reconnecting_pty,
+		session_count_ssh,
+		connection_median_latency_ms
+	)
+SELECT
+	unnest(@id :: uuid[]) AS id,
+	unnest(@created_at :: timestamptz[]) AS created_at,
+	unnest(@user_id :: uuid[]) AS user_id,
+	unnest(@workspace_id :: uuid[]) AS workspace_id,
+	unnest(@template_id :: uuid[]) AS template_id,
+	unnest(@agent_id :: uuid[]) AS agent_id,
+	jsonb_array_elements(@connections_by_proto :: jsonb) AS connections_by_proto,
+	unnest(@connection_count :: bigint[]) AS connection_count,
+	unnest(@rx_packets :: bigint[]) AS rx_packets,
+	unnest(@rx_bytes :: bigint[]) AS rx_bytes,
+	unnest(@tx_packets :: bigint[]) AS tx_packets,
+	unnest(@tx_bytes :: bigint[]) AS tx_bytes,
+	unnest(@session_count_vscode :: bigint[]) AS session_count_vscode,
+	unnest(@session_count_jetbrains :: bigint[]) AS session_count_jetbrains,
+	unnest(@session_count_reconnecting_pty :: bigint[]) AS session_count_reconnecting_pty,
+	unnest(@session_count_ssh :: bigint[]) AS session_count_ssh,
+	unnest(@connection_median_latency_ms :: double precision[]) AS connection_median_latency_ms;
+
 -- name: GetTemplateDAUs :many
 SELECT
 	(created_at at TIME ZONE cast(@tz_offset::integer as text))::date as date,
diff --git a/coderd/prometheusmetrics/prometheusmetrics_test.go b/coderd/prometheusmetrics/prometheusmetrics_test.go
index 3ea774df1186d..ad39ec840c526 100644
--- a/coderd/prometheusmetrics/prometheusmetrics_test.go
+++ b/coderd/prometheusmetrics/prometheusmetrics_test.go
@@ -11,6 +11,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/coder/coder/coderd/batchstats"
+	"github.com/coder/coder/coderd/database/dbtestutil"
+
 	"github.com/google/uuid"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
@@ -372,9 +375,29 @@ func TestAgents(t *testing.T) {
 
 func TestAgentStats(t *testing.T) {
 	t.Parallel()
 
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	t.Cleanup(cancelFunc)
+
+	db, pubsub := dbtestutil.NewDB(t)
+	log := slogtest.Make(t, nil)
+
+	batcher, closeBatcher, err := batchstats.New(ctx,
+		batchstats.WithStore(db),
+		// We want our stats, and we want them NOW.
+		batchstats.WithBatchSize(1),
+		batchstats.WithInterval(time.Hour),
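+		// With a batch size of 1, every Add crosses the 80% high-water mark
+		// and force-flushes immediately; the hour-long interval keeps the
+		// ticker from firing during the test.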
"github.com/coder/coder/coderd/batchstats" + "github.com/coder/coder/coderd/database/dbtestutil" + "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -372,9 +375,29 @@ func TestAgents(t *testing.T) { func TestAgentStats(t *testing.T) { t.Parallel() + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + db, pubsub := dbtestutil.NewDB(t) + log := slogtest.Make(t, nil) + + batcher, closeBatcher, err := batchstats.New(ctx, + batchstats.WithStore(db), + // We want our stats, and we want them NOW. + batchstats.WithBatchSize(1), + batchstats.WithInterval(time.Hour), + batchstats.WithLogger(log), + ) + require.NoError(t, err, "create stats batcher failed") + t.Cleanup(closeBatcher) + // Build sample workspaces with test agents and fake agent client - client, _, api := coderdtest.NewWithAPI(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) - db := api.Database + client, _, _ := coderdtest.NewWithAPI(t, &coderdtest.Options{ + Database: db, + IncludeProvisionerDaemon: true, + Pubsub: pubsub, + StatsBatcher: batcher, + }) user := coderdtest.CreateFirstUser(t, client) @@ -384,11 +407,7 @@ func TestAgentStats(t *testing.T) { registry := prometheus.NewRegistry() - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - // given - var err error var i int64 for i = 0; i < 3; i++ { _, err = agent1.PostStats(ctx, &agentsdk.Stats{ diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 8567ff1d895b3..0f5607db73436 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -1410,36 +1410,12 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques activityBumpWorkspace(ctx, api.Logger.Named("activity_bump"), api.Database, workspace.ID) } - payload, err := json.Marshal(req.ConnectionsByProto) - if err != nil { - api.Logger.Error(ctx, "marshal agent connections by proto", slog.F("workspace_agent_id", workspaceAgent.ID), slog.Error(err)) - payload = json.RawMessage("{}") - } - now := database.Now() var errGroup errgroup.Group errGroup.Go(func() error { - _, err = api.Database.InsertWorkspaceAgentStat(ctx, database.InsertWorkspaceAgentStatParams{ - ID: uuid.New(), - CreatedAt: now, - AgentID: workspaceAgent.ID, - WorkspaceID: workspace.ID, - UserID: workspace.OwnerID, - TemplateID: workspace.TemplateID, - ConnectionsByProto: payload, - ConnectionCount: req.ConnectionCount, - RxPackets: req.RxPackets, - RxBytes: req.RxBytes, - TxPackets: req.TxPackets, - TxBytes: req.TxBytes, - SessionCountVSCode: req.SessionCountVSCode, - SessionCountJetBrains: req.SessionCountJetBrains, - SessionCountReconnectingPTY: req.SessionCountReconnectingPTY, - SessionCountSSH: req.SessionCountSSH, - ConnectionMedianLatencyMS: req.ConnectionMedianLatencyMS, - }) - if err != nil { + if err := api.statsBatcher.Add(workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req); err != nil { + api.Logger.Error(ctx, "failed to add stats to batcher", slog.Error(err)) return xerrors.Errorf("can't insert workspace agent stat: %w", err) } return nil