Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions coderd/agentapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/tailnet"
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
Expand Down Expand Up @@ -59,7 +60,7 @@ type Options struct {
DerpMapFn func() *tailcfg.DERPMap
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
StatsBatcher StatsBatcher
StatsReporter *workspacestats.Reporter
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
Expand Down Expand Up @@ -114,12 +115,9 @@ func New(opts Options) *API {
api.StatsAPI = &StatsAPI{
AgentFn: api.agent,
Database: opts.Database,
Pubsub: opts.Pubsub,
Log: opts.Log,
StatsBatcher: opts.StatsBatcher,
TemplateScheduleStore: opts.TemplateScheduleStore,
StatsReporter: opts.StatsReporter,
AgentStatsRefreshInterval: opts.AgentStatsRefreshInterval,
UpdateAgentMetricsFn: opts.UpdateAgentMetricsFn,
}

api.LifecycleAPI = &LifecycleAPI{
Expand Down
93 changes: 11 additions & 82 deletions coderd/agentapi/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package agentapi

import (
"context"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/durationpb"

Expand All @@ -15,10 +13,7 @@ import (
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/coderd/workspacestats"
)

type StatsBatcher interface {
Expand All @@ -28,12 +23,9 @@ type StatsBatcher interface {
type StatsAPI struct {
AgentFn func(context.Context) (database.WorkspaceAgent, error)
Database database.Store
Pubsub pubsub.Pubsub
Log slog.Logger
StatsBatcher StatsBatcher
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
StatsReporter *workspacestats.Reporter
AgentStatsRefreshInterval time.Duration
UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric)

TimeNowFn func() time.Time // defaults to dbtime.Now()
}
Expand Down Expand Up @@ -69,80 +61,17 @@ func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsR
slog.F("payload", req),
)

now := a.now()
if req.Stats.ConnectionCount > 0 {
var nextAutostart time.Time
if workspace.AutostartSchedule.String != "" {
templateSchedule, err := (*(a.TemplateScheduleStore.Load())).Get(ctx, a.Database, workspace.TemplateID)
// If the template schedule fails to load, just default to bumping
// without the next transition and log it.
if err != nil {
a.Log.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min",
slog.F("workspace_id", workspace.ID),
slog.F("template_id", workspace.TemplateID),
slog.Error(err),
)
} else {
next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule)
if allowed {
nextAutostart = next
}
}
}
ActivityBumpWorkspace(ctx, a.Log.Named("activity_bump"), a.Database, workspace.ID, nextAutostart)
}

var errGroup errgroup.Group
errGroup.Go(func() error {
err := a.StatsBatcher.Add(now, workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req.Stats)
if err != nil {
a.Log.Error(ctx, "add agent stats to batcher", slog.Error(err))
return xerrors.Errorf("insert workspace agent stats batch: %w", err)
}
return nil
})
errGroup.Go(func() error {
// nolint:gocritic // (#13146) Will be moved soon as part of refactor.
err := a.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
})
if err != nil {
return xerrors.Errorf("update workspace LastUsedAt: %w", err)
}
return nil
})
if a.UpdateAgentMetricsFn != nil {
errGroup.Go(func() error {
user, err := a.Database.GetUserByID(ctx, workspace.OwnerID)
if err != nil {
return xerrors.Errorf("get user: %w", err)
}

a.UpdateAgentMetricsFn(ctx, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: workspaceAgent.Name,
TemplateName: getWorkspaceAgentByIDRow.TemplateName,
}, req.Stats.Metrics)
return nil
})
}
err = errGroup.Wait()
err = a.StatsReporter.ReportAgentStats(
ctx,
a.now(),
workspace,
workspaceAgent,
getWorkspaceAgentByIDRow.TemplateName,
req.Stats,
)
if err != nil {
return nil, xerrors.Errorf("update stats in database: %w", err)
return nil, xerrors.Errorf("report agent stats: %w", err)
}

// Tell the frontend about the new agent report, now that everything is updated
a.publishWorkspaceAgentStats(ctx, workspace.ID)

return res, nil
}

// publishWorkspaceAgentStats publishes an empty payload on the notify channel
// derived from workspaceID. A publish failure is logged as a warning and is
// not returned to the caller.
func (a *StatsAPI) publishWorkspaceAgentStats(ctx context.Context, workspaceID uuid.UUID) {
	channel := codersdk.WorkspaceNotifyChannel(workspaceID)
	pubErr := a.Pubsub.Publish(channel, []byte{})
	if pubErr == nil {
		return
	}

	// Best effort: record the failure and carry on.
	a.Log.Warn(
		ctx,
		"failed to publish workspace agent stats",
		slog.F("workspace_id", workspaceID),
		slog.Error(pubErr),
	)
}
91 changes: 52 additions & 39 deletions coderd/agentapi/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/testutil"
)
Expand Down Expand Up @@ -129,21 +130,24 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
}),
AgentStatsRefreshInterval: 10 * time.Second,
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
TimeNowFn: func() time.Time {
return now
},
Expand Down Expand Up @@ -232,13 +236,16 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
// Ignored when nil.
UpdateAgentMetricsFn: nil,
}),
AgentStatsRefreshInterval: 10 * time.Second,
// Ignored when nil.
UpdateAgentMetricsFn: nil,
TimeNowFn: func() time.Time {
return now
},
Expand Down Expand Up @@ -274,12 +281,15 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: nil, // should not be called
TemplateScheduleStore: nil, // should not be called
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: nil, // should not be called
TemplateScheduleStore: nil, // should not be called
UpdateAgentMetricsFn: nil, // should not be called
}),
AgentStatsRefreshInterval: 10 * time.Second,
UpdateAgentMetricsFn: nil, // should not be called
TimeNowFn: func() time.Time {
panic("should not be called")
},
Expand Down Expand Up @@ -343,21 +353,24 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
}),
AgentStatsRefreshInterval: 15 * time.Second,
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
TimeNowFn: func() time.Time {
return now
},
Expand Down
2 changes: 1 addition & 1 deletion coderd/apikey/apikey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestGenerate(t *testing.T) {

// Assert that the hashed secret is correct.
hashed := sha256.Sum256([]byte(keytokens[1]))
assert.ElementsMatch(t, hashed, key.HashedSecret[:])
assert.ElementsMatch(t, hashed, key.HashedSecret)

assert.Equal(t, tc.params.UserID, key.UserID)
assert.WithinDuration(t, dbtime.Now(), key.CreatedAt, time.Second*5)
Expand Down
16 changes: 12 additions & 4 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
"github.com/coder/coder/v2/coderd/updatecheck"
"github.com/coder/coder/v2/coderd/util/slice"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/coderd/workspaceusage"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpc"
Expand Down Expand Up @@ -547,13 +548,22 @@ func New(options *Options) *API {
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))
}

api.statsReporter = workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: options.Database,
Logger: options.Logger.Named("workspacestats"),
Pubsub: options.Pubsub,
TemplateScheduleStore: options.TemplateScheduleStore,
StatsBatcher: options.StatsBatcher,
UpdateAgentMetricsFn: options.UpdateAgentMetrics,
AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize,
})
workspaceAppsLogger := options.Logger.Named("workspaceapps")
if options.WorkspaceAppsStatsCollectorOptions.Logger == nil {
named := workspaceAppsLogger.Named("stats_collector")
options.WorkspaceAppsStatsCollectorOptions.Logger = &named
}
if options.WorkspaceAppsStatsCollectorOptions.Reporter == nil {
options.WorkspaceAppsStatsCollectorOptions.Reporter = workspaceapps.NewStatsDBReporter(options.Database, workspaceapps.DefaultStatsDBReporterBatchSize)
options.WorkspaceAppsStatsCollectorOptions.Reporter = api.statsReporter
}

api.workspaceAppServer = &workspaceapps.Server{
Expand Down Expand Up @@ -623,8 +633,6 @@ func New(options *Options) *API {
cors := httpmw.Cors(options.DeploymentValues.Dangerous.AllowAllCors.Value())
prometheusMW := httpmw.Prometheus(options.PrometheusRegistry)

api.statsBatcher = options.StatsBatcher

r.Use(
httpmw.Recover(api.Logger),
tracing.StatusWriterMiddleware,
Expand Down Expand Up @@ -1277,7 +1285,7 @@ type API struct {
healthCheckGroup *singleflight.Group[string, *healthsdk.HealthcheckReport]
healthCheckCache atomic.Pointer[healthsdk.HealthcheckReport]

statsBatcher *batchstats.Batcher
statsReporter *workspacestats.Reporter

Acquirer *provisionerdserver.Acquirer
// dbRolluper rolls up template usage stats from raw agent and app
Expand Down
2 changes: 1 addition & 1 deletion coderd/database/dbauthz/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (s *MethodTestSuite) NotAuthorizedErrorTest(ctx context.Context, az *coderd
// any case where the error is nil and the response is an empty slice.
if err != nil || !hasEmptySliceResponse(resp) {
s.Errorf(err, "method should an error with cancellation")
s.ErrorIsf(err, context.Canceled, "error should match context.Cancelled")
s.ErrorIsf(err, context.Canceled, "error should match context.Canceled")
}
})
}
Expand Down
15 changes: 11 additions & 4 deletions coderd/insights_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
Expand Down Expand Up @@ -736,9 +737,12 @@ func TestTemplateInsights_Golden(t *testing.T) {
})
}
}
reporter := workspaceapps.NewStatsDBReporter(db, workspaceapps.DefaultStatsDBReporterBatchSize)
reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: db,
AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize,
})
//nolint:gocritic // This is a test.
err = reporter.Report(dbauthz.AsSystemRestricted(ctx), stats)
err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats)
require.NoError(t, err, "want no error inserting app stats")

return client, events
Expand Down Expand Up @@ -1632,9 +1636,12 @@ func TestUserActivityInsights_Golden(t *testing.T) {
})
}
}
reporter := workspaceapps.NewStatsDBReporter(db, workspaceapps.DefaultStatsDBReporterBatchSize)
reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: db,
AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize,
})
//nolint:gocritic // This is a test.
err = reporter.Report(dbauthz.AsSystemRestricted(ctx), stats)
err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats)
require.NoError(t, err, "want no error inserting app stats")

return client, events
Expand Down
Loading