Skip to content

feat: expose agent stats via Prometheus endpoint #7115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 43 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
8d4e67d
WIP
mtojek Apr 3, 2023
da729e6
Merge branch 'main' into 6724-metrics
mtojek Apr 4, 2023
9ad09b2
WIP
mtojek Apr 4, 2023
440657c
WIP
mtojek Apr 5, 2023
8764f89
Agents
mtojek Apr 5, 2023
663b5d5
fix
mtojek Apr 5, 2023
63aff5e
1min
mtojek Apr 5, 2023
3905481
fix
mtojek Apr 5, 2023
f8d6f46
WIP
mtojek Apr 5, 2023
d487a77
Test
mtojek Apr 5, 2023
7acbaf0
docs
mtojek Apr 5, 2023
7418779
fmt
mtojek Apr 5, 2023
3a8e4e6
Add timer to measure the metrics collection
mtojek Apr 6, 2023
b5d0581
Use CachedGaugeVec
mtojek Apr 6, 2023
e4d708b
Unit tests
mtojek Apr 6, 2023
199e549
WIP
mtojek Apr 7, 2023
7307bd3
Merge branch 'main' into 6724-metrics-2
mtojek Apr 12, 2023
d0b8398
WIP
mtojek Apr 13, 2023
f0c0418
db: GetWorkspaceAgentStatsAndLabels
mtojek Apr 13, 2023
970d35a
fmt
mtojek Apr 13, 2023
229f546
WIP
mtojek Apr 13, 2023
7070e0e
Merge branch 'main' into 6724-metrics-2
mtojek Apr 13, 2023
8c6f96b
gauges
mtojek Apr 13, 2023
1ed37b4
feat: collect
mtojek Apr 13, 2023
7ee1bfc
fix
mtojek Apr 13, 2023
2b8a9e4
fmt
mtojek Apr 13, 2023
322f7e8
minor fixes
mtojek Apr 14, 2023
c7af75a
Prometheus flag
mtojek Apr 14, 2023
9693fa8
fix
mtojek Apr 14, 2023
28f7a13
WIP
mtojek Apr 14, 2023
7878167
fix tests
mtojek Apr 14, 2023
d9e4903
WIP
mtojek Apr 14, 2023
0d37c85
fix json
mtojek Apr 14, 2023
f752c6f
Rx Tx bytes
mtojek Apr 14, 2023
9c7aef8
CloseFunc
mtojek Apr 14, 2023
5290571
fix
mtojek Apr 14, 2023
1cbe59b
fix
mtojek Apr 14, 2023
f8f11eb
Fixes
mtojek Apr 14, 2023
4ffae11
fix
mtojek Apr 14, 2023
7ba16b5
fix: IgnoreErrors
mtojek Apr 14, 2023
2a4c674
Fix: Windows
mtojek Apr 14, 2023
201da83
fix
mtojek Apr 14, 2023
ba52c45
reflect.DeepEquals
mtojek Apr 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: collect
  • Loading branch information
mtojek committed Apr 13, 2023
commit 1ed37b45ba89e43d5c81683c5243f432aec19031
2 changes: 1 addition & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
}
defer closeWorkspacesFunc()

closeAgentStatsFunc, err := prometheusmetrics.AgentStats(ctx, logger, options.PrometheusRegistry, options.Database, 0)
closeAgentStatsFunc, err := prometheusmetrics.AgentStats(ctx, logger, options.PrometheusRegistry, options.Database, time.Now(), 0)
if err != nil {
return xerrors.Errorf("register agent stats prometheus metric: %w", err)
}
Expand Down
71 changes: 69 additions & 2 deletions coderd/database/dbfake/databasefake.go
Original file line number Diff line number Diff line change
Expand Up @@ -3998,8 +3998,75 @@ func (q *fakeQuerier) GetWorkspaceAgentStats(_ context.Context, createdAfter tim
return stats, nil
}

func (q *fakeQuerier) GetWorkspaceAgentStatsAndLabels(_ context.Context, createdAfter time.Time) ([]database.GetWorkspaceAgentStatsAndLabelsRow, error) {
panic("not implemented yet")
func (q *fakeQuerier) GetWorkspaceAgentStatsAndLabels(ctx context.Context, createdAfter time.Time) ([]database.GetWorkspaceAgentStatsAndLabelsRow, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()

agentStatsCreatedAfter := make([]database.WorkspaceAgentStat, 0)
latestAgentStats := map[uuid.UUID]database.WorkspaceAgentStat{}

for _, agentStat := range q.workspaceAgentStats {
if agentStat.CreatedAt.After(createdAfter) {
agentStatsCreatedAfter = append(agentStatsCreatedAfter, agentStat)
latestAgentStats[agentStat.AgentID] = agentStat
}
}

statByAgent := map[uuid.UUID]database.GetWorkspaceAgentStatsAndLabelsRow{}

// Session and connection metrics
for _, agentStat := range latestAgentStats {
stat := statByAgent[agentStat.AgentID]
stat.SessionCountVSCode += agentStat.SessionCountVSCode
stat.SessionCountJetBrains += agentStat.SessionCountJetBrains
stat.SessionCountReconnectingPTY += agentStat.SessionCountReconnectingPTY
stat.SessionCountSSH += agentStat.SessionCountSSH
stat.ConnectionCount += agentStat.ConnectionCount
if agentStat.ConnectionMedianLatencyMS >= 0 && stat.ConnectionMedianLatencyMS < agentStat.ConnectionMedianLatencyMS {
stat.ConnectionMedianLatencyMS = agentStat.ConnectionMedianLatencyMS
}
statByAgent[agentStat.AgentID] = stat
}

// Tx, Rx metrics
for _, agentStat := range agentStatsCreatedAfter {
stat := statByAgent[agentStat.AgentID]
stat.WorkspaceRxBytes += agentStat.RxBytes
stat.WorkspaceTxBytes += agentStat.TxBytes
statByAgent[agentStat.AgentID] = stat
}

// Labels
for _, agentStat := range agentStatsCreatedAfter {
stat := statByAgent[agentStat.AgentID]

user, err := q.getUserByIDNoLock(agentStat.UserID)
if err != nil {
return nil, err
}

stat.Username = user.Username

workspace, err := q.GetWorkspaceByID(ctx, agentStat.WorkspaceID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can be called whilst holding mutex? Wondering since the above call is to a nolock method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is RWMutex, and we're using here only RLocks, so I guess that is's safe?

if err != nil {
return nil, err
}
stat.WorkspaceName = workspace.Name

agent, err := q.GetWorkspaceAgentByID(ctx, agentStat.AgentID)
if err != nil {
return nil, err
}
stat.AgentName = agent.Name

statByAgent[agentStat.AgentID] = stat
}

stats := make([]database.GetWorkspaceAgentStatsAndLabelsRow, 0, len(statByAgent))
for _, agent := range statByAgent {
stats = append(stats, agent)
}
return stats, nil
}

func (q *fakeQuerier) GetWorkspacesEligibleForAutoStartStop(ctx context.Context, now time.Time) ([]database.Workspace, error) {
Expand Down
2 changes: 1 addition & 1 deletion coderd/database/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion coderd/database/queries/workspaceagentstats.sql
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ WITH agent_stats AS (
coalesce(SUM(session_count_jetbrains), 0)::bigint AS session_count_jetbrains,
coalesce(SUM(session_count_reconnecting_pty), 0)::bigint AS session_count_reconnecting_pty,
coalesce(SUM(connection_count), 0)::bigint AS connection_count,
coalesce(SUM(connection_median_latency_ms), 0)::float AS connection_median_latency_ms
coalesce(MAX(connection_median_latency_ms), 0)::float AS connection_median_latency_ms
FROM (
SELECT *, ROW_NUMBER() OVER(PARTITION BY agent_id ORDER BY created_at DESC) AS rn
FROM workspace_agent_stats
Expand Down
16 changes: 11 additions & 5 deletions coderd/prometheusmetrics/prometheusmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
return cancelFunc, nil
}

func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.Registerer, db database.Store, duration time.Duration) (context.CancelFunc, error) {
func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.Registerer, db database.Store, initialCreateAfter time.Time, duration time.Duration) (context.CancelFunc, error) {
if duration == 0 {
duration = 1 * time.Minute
}
Expand Down Expand Up @@ -344,8 +344,8 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
agentStatsConnectionMedianLatencyGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "agentstats",
Name: "connection_median_latency",
Help: "The median agent connection latency",
Name: "connection_median_latency_seconds",
Help: "The median agent connection latency in seconds",
}, []string{"agent_name", "username", "workspace_name"}))
err = registerer.Register(agentStatsConnectionMedianLatencyGauge)
if err != nil {
Expand Down Expand Up @@ -396,7 +396,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
return nil, err
}

createdAfter := time.Now()
createdAfter := initialCreateAfter
ctx, cancelFunc := context.WithCancel(ctx)
ticker := time.NewTicker(duration)
go func() {
Expand All @@ -411,12 +411,18 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
logger.Debug(ctx, "Agent metrics collection is starting")
timer := prometheus.NewTimer(metricsCollectorAgentStats)

checkpoint := time.Now()
stats, err := db.GetWorkspaceAgentStatsAndLabels(ctx, createdAfter)

if err != nil {
logger.Error(ctx, "can't get agent stats", slog.Error(err))
goto done
}

if len(stats) == 0 {
goto done
}

for _, agentStat := range stats {
agentStatsRxBytesGauge.WithLabelValues(VectorOperationAdd, float64(agentStat.WorkspaceTxBytes), agentStat.AgentName, agentStat.Username, agentStat.WorkspaceName)
agentStatsTxBytesGauge.WithLabelValues(VectorOperationAdd, float64(agentStat.WorkspaceRxBytes), agentStat.AgentName, agentStat.Username, agentStat.WorkspaceName)
Expand Down Expand Up @@ -445,7 +451,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R
logger.Debug(ctx, "Agent metrics collection is done")
metricsCollectorAgentStats.Observe(timer.ObserveDuration().Seconds())

createdAfter = time.Now()
createdAfter = checkpoint
}
}()
return cancelFunc, nil
Expand Down
84 changes: 84 additions & 0 deletions coderd/prometheusmetrics/prometheusmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/coder/coder/coderd/database/dbgen"
"github.com/coder/coder/coderd/prometheusmetrics"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/codersdk/agentsdk"
"github.com/coder/coder/provisioner/echo"
"github.com/coder/coder/provisionersdk/proto"
"github.com/coder/coder/tailnet"
Expand Down Expand Up @@ -352,3 +353,86 @@ func TestAgents(t *testing.T) {
return agentsUp && agentsConnections && agentsApps && agentsExecutionInSeconds
}, testutil.WaitShort, testutil.IntervalFast)
}

func TestAgentStats(t *testing.T) {
t.Parallel()

// Build a sample workspace with test agent and fake agent client
client, _, api := coderdtest.NewWithAPI(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
db := api.Database

user := coderdtest.CreateFirstUser(t, client)
authToken := uuid.NewString()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionPlan: echo.ProvisionComplete,
ProvisionApply: echo.ProvisionApplyWithAgent(authToken),
})
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)

agentClient := agentsdk.New(client.URL)
agentClient.SetSessionToken(authToken)

registry := prometheus.NewRegistry()

// given
cancel, err := prometheusmetrics.AgentStats(context.Background(), slogtest.Make(t, nil), registry, db, time.Now(), time.Second)
require.NoError(t, err)
t.Cleanup(cancel)

// when
_, err = agentClient.PostStats(context.Background(), &agentsdk.Stats{
ConnectionsByProto: map[string]int64{"TCP": 1},
ConnectionCount: 2,
RxPackets: 3,
RxBytes: 4,
TxPackets: 5,
TxBytes: 6,
SessionCountVSCode: 7,
SessionCountJetBrains: 8,
SessionCountReconnectingPTY: 9,
SessionCountSSH: 10,
ConnectionMedianLatencyMS: 10000,
})
require.NoError(t, err)

// then
require.NoError(t, err)

collectedMetrics := map[string]struct{}{}
require.Eventually(t, func() bool {
metrics, err := registry.Gather()
assert.NoError(t, err)

if len(metrics) < 1 {
return false
}

for _, metric := range metrics {
switch metric.GetName() {
case "coderd_prometheusmetrics_agentstats_execution_seconds":
collectedMetrics[metric.GetName()] = struct{}{}
case "coderd_agentstats_connection_count",
"coderd_agentstats_connection_median_latency_seconds",
"coderd_agentstats_rx_bytes",
"coderd_agentstats_tx_bytes",
"coderd_agentstats_session_count_jetbrains",
"coderd_agentstats_session_count_reconnecting_pty",
"coderd_agentstats_session_count_ssh",
"coderd_agentstats_session_count_vscode":
collectedMetrics[metric.GetName()] = struct{}{}
assert.Equal(t, "example", metric.Metric[0].Label[0].GetValue()) // Agent name
assert.Equal(t, "testuser", metric.Metric[0].Label[1].GetValue()) // Username
assert.Equal(t, workspace.Name, metric.Metric[0].Label[2].GetValue()) // Workspace name
assert.NotZero(t, int(metric.Metric[0].Gauge.GetValue()), metric.GetName()) // Metric value
default:
require.FailNowf(t, "unexpected metric collected", "metric: %s", metric.GetName())
}
}

return len(collectedMetrics) == 9
}, testutil.WaitShort, testutil.IntervalFast, "collected metrics: %v", collectedMetrics)
}
Loading