Skip to content

chore: move stat reporting into workspacestats package #13386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions coderd/agentapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/tailnet"
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
Expand Down Expand Up @@ -59,7 +60,7 @@ type Options struct {
DerpMapFn func() *tailcfg.DERPMap
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
StatsBatcher StatsBatcher
StatsReporter *workspacestats.Reporter
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
Expand Down Expand Up @@ -114,12 +115,9 @@ func New(opts Options) *API {
api.StatsAPI = &StatsAPI{
AgentFn: api.agent,
Database: opts.Database,
Pubsub: opts.Pubsub,
Log: opts.Log,
StatsBatcher: opts.StatsBatcher,
TemplateScheduleStore: opts.TemplateScheduleStore,
StatsReporter: opts.StatsReporter,
AgentStatsRefreshInterval: opts.AgentStatsRefreshInterval,
UpdateAgentMetricsFn: opts.UpdateAgentMetricsFn,
}

api.LifecycleAPI = &LifecycleAPI{
Expand Down
93 changes: 11 additions & 82 deletions coderd/agentapi/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package agentapi

import (
"context"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/durationpb"

Expand All @@ -15,10 +13,7 @@ import (
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/coderd/workspacestats"
)

type StatsBatcher interface {
Expand All @@ -28,12 +23,9 @@ type StatsBatcher interface {
type StatsAPI struct {
AgentFn func(context.Context) (database.WorkspaceAgent, error)
Database database.Store
Pubsub pubsub.Pubsub
Log slog.Logger
StatsBatcher StatsBatcher
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
StatsReporter *workspacestats.Reporter
AgentStatsRefreshInterval time.Duration
UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric)

TimeNowFn func() time.Time // defaults to dbtime.Now()
}
Expand Down Expand Up @@ -69,80 +61,17 @@ func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsR
slog.F("payload", req),
)

now := a.now()
if req.Stats.ConnectionCount > 0 {
var nextAutostart time.Time
if workspace.AutostartSchedule.String != "" {
templateSchedule, err := (*(a.TemplateScheduleStore.Load())).Get(ctx, a.Database, workspace.TemplateID)
// If the template schedule fails to load, just default to bumping
// without the next transition and log it.
if err != nil {
a.Log.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min",
slog.F("workspace_id", workspace.ID),
slog.F("template_id", workspace.TemplateID),
slog.Error(err),
)
} else {
next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule)
if allowed {
nextAutostart = next
}
}
}
ActivityBumpWorkspace(ctx, a.Log.Named("activity_bump"), a.Database, workspace.ID, nextAutostart)
}

var errGroup errgroup.Group
errGroup.Go(func() error {
err := a.StatsBatcher.Add(now, workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req.Stats)
if err != nil {
a.Log.Error(ctx, "add agent stats to batcher", slog.Error(err))
return xerrors.Errorf("insert workspace agent stats batch: %w", err)
}
return nil
})
errGroup.Go(func() error {
// nolint:gocritic // (#13146) Will be moved soon as part of refactor.
err := a.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
})
if err != nil {
return xerrors.Errorf("update workspace LastUsedAt: %w", err)
}
return nil
})
if a.UpdateAgentMetricsFn != nil {
errGroup.Go(func() error {
user, err := a.Database.GetUserByID(ctx, workspace.OwnerID)
if err != nil {
return xerrors.Errorf("get user: %w", err)
}

a.UpdateAgentMetricsFn(ctx, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: workspaceAgent.Name,
TemplateName: getWorkspaceAgentByIDRow.TemplateName,
}, req.Stats.Metrics)
return nil
})
}
err = errGroup.Wait()
err = a.StatsReporter.ReportAgentStats(
ctx,
a.now(),
workspace,
workspaceAgent,
getWorkspaceAgentByIDRow.TemplateName,
req.Stats,
)
if err != nil {
return nil, xerrors.Errorf("update stats in database: %w", err)
return nil, xerrors.Errorf("report agent stats: %w", err)
}

// Tell the frontend about the new agent report, now that everything is updated
a.publishWorkspaceAgentStats(ctx, workspace.ID)

return res, nil
}

// publishWorkspaceAgentStats publishes an empty message through a.Pubsub on
// the channel returned by codersdk.WorkspaceNotifyChannel for workspaceID.
// It returns nothing: a publish failure is logged as a warning (with the
// workspace ID and the error) and is not propagated to the caller.
func (a *StatsAPI) publishWorkspaceAgentStats(ctx context.Context, workspaceID uuid.UUID) {
// The message body is an empty byte slice.
err := a.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspaceID), []byte{})
if err != nil {
// Log only; the error is not returned.
a.Log.Warn(ctx, "failed to publish workspace agent stats",
slog.F("workspace_id", workspaceID), slog.Error(err))
}
}
91 changes: 52 additions & 39 deletions coderd/agentapi/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/testutil"
)
Expand Down Expand Up @@ -129,21 +130,24 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
}),
AgentStatsRefreshInterval: 10 * time.Second,
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
TimeNowFn: func() time.Time {
return now
},
Expand Down Expand Up @@ -232,13 +236,16 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
// Ignored when nil.
UpdateAgentMetricsFn: nil,
}),
AgentStatsRefreshInterval: 10 * time.Second,
// Ignored when nil.
UpdateAgentMetricsFn: nil,
TimeNowFn: func() time.Time {
return now
},
Expand Down Expand Up @@ -274,12 +281,15 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: nil, // should not be called
TemplateScheduleStore: nil, // should not be called
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: nil, // should not be called
TemplateScheduleStore: nil, // should not be called
UpdateAgentMetricsFn: nil, // should not be called
}),
AgentStatsRefreshInterval: 10 * time.Second,
UpdateAgentMetricsFn: nil, // should not be called
TimeNowFn: func() time.Time {
panic("should not be called")
},
Expand Down Expand Up @@ -343,21 +353,24 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
}),
AgentStatsRefreshInterval: 15 * time.Second,
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
TimeNowFn: func() time.Time {
return now
},
Expand Down
16 changes: 12 additions & 4 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
"github.com/coder/coder/v2/coderd/updatecheck"
"github.com/coder/coder/v2/coderd/util/slice"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/coderd/workspaceusage"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpc"
Expand Down Expand Up @@ -547,13 +548,22 @@ func New(options *Options) *API {
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))
}

api.statsReporter = workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: options.Database,
Logger: options.Logger.Named("workspacestats"),
Pubsub: options.Pubsub,
TemplateScheduleStore: options.TemplateScheduleStore,
StatsBatcher: options.StatsBatcher,
UpdateAgentMetricsFn: options.UpdateAgentMetrics,
AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize,
})
workspaceAppsLogger := options.Logger.Named("workspaceapps")
if options.WorkspaceAppsStatsCollectorOptions.Logger == nil {
named := workspaceAppsLogger.Named("stats_collector")
options.WorkspaceAppsStatsCollectorOptions.Logger = &named
}
if options.WorkspaceAppsStatsCollectorOptions.Reporter == nil {
options.WorkspaceAppsStatsCollectorOptions.Reporter = workspaceapps.NewStatsDBReporter(options.Database, workspaceapps.DefaultStatsDBReporterBatchSize)
options.WorkspaceAppsStatsCollectorOptions.Reporter = api.statsReporter
}

api.workspaceAppServer = &workspaceapps.Server{
Expand Down Expand Up @@ -623,8 +633,6 @@ func New(options *Options) *API {
cors := httpmw.Cors(options.DeploymentValues.Dangerous.AllowAllCors.Value())
prometheusMW := httpmw.Prometheus(options.PrometheusRegistry)

api.statsBatcher = options.StatsBatcher

r.Use(
httpmw.Recover(api.Logger),
tracing.StatusWriterMiddleware,
Expand Down Expand Up @@ -1277,7 +1285,7 @@ type API struct {
healthCheckGroup *singleflight.Group[string, *healthsdk.HealthcheckReport]
healthCheckCache atomic.Pointer[healthsdk.HealthcheckReport]

statsBatcher *batchstats.Batcher
statsReporter *workspacestats.Reporter

Acquirer *provisionerdserver.Acquirer
// dbRolluper rolls up template usage stats from raw agent and app
Expand Down
2 changes: 1 addition & 1 deletion coderd/database/dbauthz/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (s *MethodTestSuite) NotAuthorizedErrorTest(ctx context.Context, az *coderd
// any case where the error is nil and the response is an empty slice.
if err != nil || !hasEmptySliceResponse(resp) {
s.Errorf(err, "method should an error with cancellation")
s.ErrorIsf(err, context.Canceled, "error should match context.Cancelled")
s.ErrorIsf(err, context.Canceled, "error should match context.Canceled")
}
})
}
Expand Down
15 changes: 11 additions & 4 deletions coderd/insights_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
Expand Down Expand Up @@ -736,9 +737,12 @@ func TestTemplateInsights_Golden(t *testing.T) {
})
}
}
reporter := workspaceapps.NewStatsDBReporter(db, workspaceapps.DefaultStatsDBReporterBatchSize)
reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: db,
AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize,
})
//nolint:gocritic // This is a test.
err = reporter.Report(dbauthz.AsSystemRestricted(ctx), stats)
err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats)
require.NoError(t, err, "want no error inserting app stats")

return client, events
Expand Down Expand Up @@ -1632,9 +1636,12 @@ func TestUserActivityInsights_Golden(t *testing.T) {
})
}
}
reporter := workspaceapps.NewStatsDBReporter(db, workspaceapps.DefaultStatsDBReporterBatchSize)
reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: db,
AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize,
})
//nolint:gocritic // This is a test.
err = reporter.Report(dbauthz.AsSystemRestricted(ctx), stats)
err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats)
require.NoError(t, err, "want no error inserting app stats")

return client, events
Expand Down
Loading
Loading