fix: stop incrementing activity on empty agent stats #15204

Merged 10 commits on Oct 25, 2024
Changes from 8 commits
22 changes: 7 additions & 15 deletions coderd/agentapi/stats_test.go
@@ -108,6 +108,7 @@ func TestUpdateStates(t *testing.T) {
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
UsageTracker: workspacestats.NewTracker(dbM),
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
@@ -132,21 +133,15 @@ func TestUpdateStates(t *testing.T) {
TemplateName: template.Name,
}, nil)

// User gets fetched to hit the UpdateAgentMetricsFn.
dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil)

// We expect an activity bump because ConnectionCount > 0.
dbM.EXPECT().ActivityBumpWorkspace(gomock.Any(), database.ActivityBumpWorkspaceParams{
WorkspaceID: workspace.ID,
NextAutostart: time.Time{}.UTC(),
}).Return(nil)

// Workspace last used at gets bumped.
dbM.EXPECT().UpdateWorkspaceLastUsedAt(gomock.Any(), database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
}).Return(nil)

// User gets fetched to hit the UpdateAgentMetricsFn.
dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil)

// Ensure that pubsub notifications are sent.
notifyDescription := make(chan []byte)
ps.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, description []byte) {
@@ -213,6 +208,7 @@ func TestUpdateStates(t *testing.T) {
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
UsageTracker: workspacestats.NewTracker(dbM),
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
// Ignored when nil.
@@ -230,12 +226,6 @@ func TestUpdateStates(t *testing.T) {
TemplateName: template.Name,
}, nil)

// Workspace last used at gets bumped.
dbM.EXPECT().UpdateWorkspaceLastUsedAt(gomock.Any(), database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
}).Return(nil)

_, err := api.UpdateStats(context.Background(), req)
require.NoError(t, err)
})
@@ -330,6 +320,7 @@ func TestUpdateStates(t *testing.T) {
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
UsageTracker: workspacestats.NewTracker(dbM),
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
@@ -439,6 +430,7 @@ func TestUpdateStates(t *testing.T) {
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
UsageTracker: workspacestats.NewTracker(dbM),
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
23 changes: 20 additions & 3 deletions coderd/httpapi/websocket.go
@@ -2,8 +2,10 @@ package httpapi

import (
"context"
"errors"
"time"

"golang.org/x/xerrors"
"nhooyr.io/websocket"

"cdr.dev/slog"
@@ -31,7 +33,8 @@ func Heartbeat(ctx context.Context, conn *websocket.Conn) {
// Heartbeat loops to ping a WebSocket to keep it alive. It calls `exit` on ping
// failure.
func HeartbeatClose(ctx context.Context, logger slog.Logger, exit func(), conn *websocket.Conn) {
ticker := time.NewTicker(15 * time.Second)
interval := 15 * time.Second
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
@@ -40,12 +43,26 @@ func HeartbeatClose(ctx context.Context, logger slog.Logger, exit func(), conn *
return
case <-ticker.C:
}
err := conn.Ping(ctx)
err := pingWithTimeout(ctx, conn, interval)
if err != nil {
// context.DeadlineExceeded is expected when the client disconnects without sending a close frame
if !errors.Is(err, context.DeadlineExceeded) {
logger.Error(ctx, "failed to heartbeat ping", slog.Error(err))
}
_ = conn.Close(websocket.StatusGoingAway, "Ping failed")
logger.Info(ctx, "failed to heartbeat ping", slog.Error(err))
exit()
return
}
}
}

func pingWithTimeout(ctx context.Context, conn *websocket.Conn, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
err := conn.Ping(ctx)
if err != nil {
return xerrors.Errorf("failed to ping: %w", err)
}

return nil
}
6 changes: 2 additions & 4 deletions coderd/insights_test.go
@@ -700,14 +700,13 @@ func TestTemplateInsights_Golden(t *testing.T) {
connectionCount = 0
}
for createdAt.Before(stat.endedAt) {
err = batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
ConnectionCount: connectionCount,
SessionCountVscode: stat.sessionCountVSCode,
SessionCountJetbrains: stat.sessionCountJetBrains,
SessionCountReconnectingPty: stat.sessionCountReconnectingPTY,
SessionCountSsh: stat.sessionCountSSH,
}, false)
require.NoError(t, err, "want no error inserting agent stats")
createdAt = createdAt.Add(30 * time.Second)
}
}
@@ -1599,14 +1598,13 @@ func TestUserActivityInsights_Golden(t *testing.T) {
connectionCount = 0
}
for createdAt.Before(stat.endedAt) {
err = batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
ConnectionCount: connectionCount,
SessionCountVscode: stat.sessionCountVSCode,
SessionCountJetbrains: stat.sessionCountJetBrains,
SessionCountReconnectingPty: stat.sessionCountReconnectingPTY,
SessionCountSsh: stat.sessionCountSSH,
}, false)
require.NoError(t, err, "want no error inserting agent stats")
createdAt = createdAt.Add(30 * time.Second)
}
}
7 changes: 3 additions & 4 deletions coderd/workspaceapps/proxy.go
@@ -593,7 +593,6 @@ func (s *Server) proxyWorkspaceApp(rw http.ResponseWriter, r *http.Request, appT
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))

report := newStatsReportFromSignedToken(appToken)
s.collectStats(report)
defer func() {
// We must use defer here because ServeHTTP may panic.
report.SessionEndedAt = dbtime.Now()
@@ -614,7 +613,8 @@
// @Success 101
// @Router /workspaceagents/{workspaceagent}/pty [get]
func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, cancel := context.WithCancel(r.Context())
defer cancel()

s.websocketWaitMutex.Lock()
s.websocketWaitGroup.Add(1)
@@ -670,12 +670,11 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
})
return
}
go httpapi.HeartbeatClose(ctx, s.Logger, cancel, conn)

ctx, wsNetConn := WebsocketNetConn(ctx, conn, websocket.MessageBinary)
defer wsNetConn.Close() // Also closes conn.

go httpapi.Heartbeat(ctx, conn)

agentConn, release, err := s.AgentProvider.AgentConn(ctx, appToken.AgentID)
if err != nil {
log.Debug(ctx, "dial workspace agent", slog.Error(err))
5 changes: 2 additions & 3 deletions coderd/workspacestats/batcher.go
@@ -25,7 +25,7 @@
)

type Batcher interface {
Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats, usage bool) error
Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats, usage bool)
Review comment (Member): praise: This does make the whole interface more convenient to use 👍
(A call-site sketch follows this file's diff.)

}

// DBBatcher holds a buffer of agent stats and periodically flushes them to
@@ -139,7 +139,7 @@ func (b *DBBatcher) Add(
workspaceID uuid.UUID,
st *agentproto.Stats,
usage bool,
) error {
) {
b.mu.Lock()
defer b.mu.Unlock()

@@ -176,7 +176,6 @@
b.flushLever <- struct{}{}
b.flushForced.Store(true)
}
return nil
}

// Run runs the batcher.
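For illustration, a minimal call-site sketch of the void-returning Add. The package name, import paths, and helper function are assumptions made for the sketch, not part of this PR; only the Batcher interface and its Add signature come from the diff above.

package example

import (
	"time"

	"github.com/google/uuid"

	agentproto "github.com/coder/coder/v2/agent/proto"
	"github.com/coder/coder/v2/coderd/workspacestats"
)

// recordAgentStats hands one stats sample to the batcher. There is no error to
// propagate anymore: Add only buffers the sample (and nudges a flush when the
// buffer fills), while the batcher's Run loop performs the actual database
// writes.
func recordAgentStats(b workspacestats.Batcher, agentID, templateID, userID, workspaceID uuid.UUID, st *agentproto.Stats) {
	// The final false is the usage flag from the interface signature above.
	b.Add(time.Now(), agentID, templateID, userID, workspaceID, st, false)
}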
6 changes: 3 additions & 3 deletions coderd/workspacestats/batcher_internal_test.go
@@ -63,7 +63,7 @@ func TestBatchStats(t *testing.T) {
// Given: a single data point is added for workspace
t2 := t1.Add(time.Second)
t.Logf("inserting 1 stat")
require.NoError(t, b.Add(t2.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false))
b.Add(t2.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false)

// When: it becomes time to report stats
// Signal a tick and wait for a flush to complete.
@@ -87,9 +87,9 @@
t.Logf("inserting %d stats", defaultBufferSize)
for i := 0; i < defaultBufferSize; i++ {
if i%2 == 0 {
require.NoError(t, b.Add(t3.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false))
b.Add(t3.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false)
} else {
require.NoError(t, b.Add(t3.Add(time.Millisecond), deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randStats(t), false))
b.Add(t3.Add(time.Millisecond), deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randStats(t), false)
}
}
}()
99 changes: 43 additions & 56 deletions coderd/workspacestats/reporter.go
@@ -6,7 +6,6 @@ import (
"time"

"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

"cdr.dev/slog"
@@ -119,69 +118,57 @@ func (r *Reporter) ReportAppStats(ctx context.Context, stats []workspaceapps.Sta
}

func (r *Reporter) ReportAgentStats(ctx context.Context, now time.Time, workspace database.Workspace, workspaceAgent database.WorkspaceAgent, templateName string, stats *agentproto.Stats, usage bool) error {
if stats.ConnectionCount > 0 {
var nextAutostart time.Time
if workspace.AutostartSchedule.String != "" {
templateSchedule, err := (*(r.opts.TemplateScheduleStore.Load())).Get(ctx, r.opts.Database, workspace.TemplateID)
// If the template schedule fails to load, just default to bumping
// without the next transition and log it.
if err != nil {
r.opts.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min",
slog.F("workspace_id", workspace.ID),
slog.F("template_id", workspace.TemplateID),
slog.Error(err),
)
} else {
next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule)
if allowed {
nextAutostart = next
}
}
}
ActivityBumpWorkspace(ctx, r.opts.Logger.Named("activity_bump"), r.opts.Database, workspace.ID, nextAutostart)
}
// update agent stats
r.opts.StatsBatcher.Add(now, workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, stats, usage)

var errGroup errgroup.Group
errGroup.Go(func() error {
err := r.opts.StatsBatcher.Add(now, workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, stats, usage)
// update prometheus metrics
if r.opts.UpdateAgentMetricsFn != nil {
user, err := r.opts.Database.GetUserByID(ctx, workspace.OwnerID)
Review comment (Member): thought (non-blocking): It's unfortunate to have to do this just to get the username related to the workspace.

Reply (Contributor, PR author): Yeah, we have generally avoided database types that embed other types as a default strategy, which leads to cleaner types but unfortunately sometimes more fetching. I think it's OK for now and can always be optimized if it's found to be a bottleneck later. It's a good goal to get this func as low-IO as possible.
(A hypothetical sketch of one such optimization follows this file's diff.)

if err != nil {
r.opts.Logger.Error(ctx, "add agent stats to batcher", slog.Error(err))
return xerrors.Errorf("insert workspace agent stats batch: %w", err)
return xerrors.Errorf("get user: %w", err)
}

r.opts.UpdateAgentMetricsFn(ctx, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: workspaceAgent.Name,
TemplateName: templateName,
}, stats.Metrics)
}

// if no active connections we do not bump activity
if stats.ConnectionCount == 0 {
return nil
})
errGroup.Go(func() error {
err := r.opts.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
})
}

// check next autostart
var nextAutostart time.Time
if workspace.AutostartSchedule.String != "" {
templateSchedule, err := (*(r.opts.TemplateScheduleStore.Load())).Get(ctx, r.opts.Database, workspace.TemplateID)
// If the template schedule fails to load, just default to bumping
// without the next transition and log it.
if err != nil {
return xerrors.Errorf("update workspace LastUsedAt: %w", err)
}
return nil
})
if r.opts.UpdateAgentMetricsFn != nil {
errGroup.Go(func() error {
user, err := r.opts.Database.GetUserByID(ctx, workspace.OwnerID)
if err != nil {
return xerrors.Errorf("get user: %w", err)
r.opts.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min",
slog.F("workspace_id", workspace.ID),
slog.F("template_id", workspace.TemplateID),
slog.Error(err),
)
} else {
next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule)
if allowed {
nextAutostart = next
}

r.opts.UpdateAgentMetricsFn(ctx, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: workspaceAgent.Name,
TemplateName: templateName,
}, stats.Metrics)
return nil
})
}
err := errGroup.Wait()
if err != nil {
return xerrors.Errorf("update stats in database: %w", err)
}
}

err = r.opts.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspace.ID), []byte{})
// bump workspace activity
ActivityBumpWorkspace(ctx, r.opts.Logger.Named("activity_bump"), r.opts.Database, workspace.ID, nextAutostart)

// bump workspace last_used_at
r.opts.UsageTracker.Add(workspace.ID)

// notify workspace update
err := r.opts.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspace.ID), []byte{})
if err != nil {
r.opts.Logger.Warn(ctx, "failed to publish workspace agent stats",
slog.F("workspace_id", workspace.ID), slog.Error(err))
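Purely as a hypothetical sketch of the optimization alluded to in the review thread above: nothing in this PR implements it, and the cache type, its fields, and the fetch callback are illustrative assumptions. The per-report GetUserByID lookup could be memoized by owner ID:

package example

import (
	"context"
	"sync"

	"github.com/google/uuid"
)

// usernameCache memoizes ownerID -> username lookups so repeated stat reports
// for the same workspace owner only hit the database once.
type usernameCache struct {
	mu    sync.Mutex
	names map[uuid.UUID]string
	// fetch loads a username from the source of truth, e.g. a thin wrapper
	// around the database's GetUserByID.
	fetch func(ctx context.Context, id uuid.UUID) (string, error)
}

func newUsernameCache(fetch func(ctx context.Context, id uuid.UUID) (string, error)) *usernameCache {
	return &usernameCache{names: make(map[uuid.UUID]string), fetch: fetch}
}

func (c *usernameCache) username(ctx context.Context, id uuid.UUID) (string, error) {
	c.mu.Lock()
	name, ok := c.names[id]
	c.mu.Unlock()
	if ok {
		return name, nil
	}

	// Fetch outside the lock; a duplicate fetch on a rare race is harmless.
	name, err := c.fetch(ctx, id)
	if err != nil {
		return "", err
	}

	c.mu.Lock()
	c.names[id] = name
	c.mu.Unlock()
	return name, nil
}

A real version would also need invalidation when a username changes, which is part of why the simple lookup is a reasonable default until it is measured to be a bottleneck.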
1 change: 0 additions & 1 deletion coderd/workspacestats/tracker.go
@@ -130,7 +130,6 @@ func (tr *UsageTracker) flush(now time.Time) {
authCtx := dbauthz.AsSystemRestricted(ctx)
tr.flushLock.Lock()
defer tr.flushLock.Unlock()
// nolint:gocritic // (#13146) Will be moved soon as part of refactor.
if err := tr.s.BatchUpdateWorkspaceLastUsedAt(authCtx, database.BatchUpdateWorkspaceLastUsedAtParams{
LastUsedAt: now,
IDs: ids,
3 changes: 1 addition & 2 deletions coderd/workspacestats/workspacestatstest/batcher.go
@@ -25,7 +25,7 @@ type StatsBatcher struct {

var _ workspacestats.Batcher = &StatsBatcher{}

func (b *StatsBatcher) Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats, usage bool) error {
func (b *StatsBatcher) Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats, usage bool) {
b.Mu.Lock()
defer b.Mu.Unlock()
b.Called++
@@ -36,5 +36,4 @@ func (b *StatsBatcher) Add(now time.Time, agentID uuid.UUID, templateID uuid.UUI
b.LastWorkspaceID = workspaceID
b.LastStats = st
b.LastUsage = usage
return nil
}