diff --git a/agent/agent.go b/agent/agent.go index 9de9c49b423c5..f64a261f40b2d 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -16,7 +16,6 @@ import ( "os" "os/user" "path/filepath" - "reflect" "sort" "strconv" "strings" @@ -1223,11 +1222,11 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) { // Convert from microseconds to milliseconds. stats.ConnectionMedianLatencyMS /= 1000 - lastStat := a.latestStat.Load() - if lastStat != nil && reflect.DeepEqual(lastStat, stats) { - a.logger.Info(ctx, "skipping stat because nothing changed") - return - } + // Collect agent metrics. + // Agent metrics are changing all the time, so there is no need to perform + // reflect.DeepEqual to see if stats should be transferred. + stats.Metrics = collectMetrics() + a.latestStat.Store(stats) select { diff --git a/agent/metrics.go b/agent/metrics.go new file mode 100644 index 0000000000000..fd195202c0086 --- /dev/null +++ b/agent/metrics.go @@ -0,0 +1,52 @@ +package agent + +import ( + "fmt" + "strings" + + "tailscale.com/util/clientmetric" + + "github.com/coder/coder/codersdk/agentsdk" +) + +func collectMetrics() []agentsdk.AgentMetric { + // Tailscale metrics + metrics := clientmetric.Metrics() + collected := make([]agentsdk.AgentMetric, 0, len(metrics)) + for _, m := range metrics { + if isIgnoredMetric(m.Name()) { + continue + } + + collected = append(collected, agentsdk.AgentMetric{ + Name: m.Name(), + Type: asMetricType(m.Type()), + Value: float64(m.Value()), + }) + } + return collected +} + +// isIgnoredMetric checks if the metric should be ignored, as Coder agent doesn't use related features. +// Expected metric families: magicsock_*, derp_*, tstun_*, netcheck_*, portmap_*, etc. +func isIgnoredMetric(metricName string) bool { + if strings.HasPrefix(metricName, "dns_") || + strings.HasPrefix(metricName, "controlclient_") || + strings.HasPrefix(metricName, "peerapi_") || + strings.HasPrefix(metricName, "profiles_") || + strings.HasPrefix(metricName, "tstun_") { + return true + } + return false +} + +func asMetricType(typ clientmetric.Type) agentsdk.AgentMetricType { + switch typ { + case clientmetric.TypeGauge: + return agentsdk.AgentMetricTypeGauge + case clientmetric.TypeCounter: + return agentsdk.AgentMetricTypeCounter + default: + panic(fmt.Sprintf("unknown metric type: %d", typ)) + } +} diff --git a/cli/server.go b/cli/server.go index 81611ca45e2a4..039eeecef8d0a 100644 --- a/cli/server.go +++ b/cli/server.go @@ -723,6 +723,20 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. return xerrors.Errorf("register agent stats prometheus metric: %w", err) } defer closeAgentStatsFunc() + + metricsAggregator, err := prometheusmetrics.NewMetricsAggregator(logger, options.PrometheusRegistry, 0) + if err != nil { + return xerrors.Errorf("can't initialize metrics aggregator: %w", err) + } + + cancelMetricsAggregator := metricsAggregator.Run(ctx) + defer cancelMetricsAggregator() + + options.UpdateAgentMetrics = metricsAggregator.Update + err = options.PrometheusRegistry.Register(metricsAggregator) + if err != nil { + return xerrors.Errorf("can't register metrics aggregator as collector: %w", err) + } } //nolint:revive diff --git a/coderd/apidoc/docs.go b/coderd/apidoc/docs.go index cceff55a217a0..91bae7945e422 100644 --- a/coderd/apidoc/docs.go +++ b/coderd/apidoc/docs.go @@ -5655,6 +5655,44 @@ const docTemplate = `{ } } }, + "agentsdk.AgentMetric": { + "type": "object", + "required": [ + "name", + "type", + "value" + ], + "properties": { + "name": { + "type": "string" + }, + "type": { + "enum": [ + "counter", + "gauge" + ], + "allOf": [ + { + "$ref": "#/definitions/agentsdk.AgentMetricType" + } + ] + }, + "value": { + "type": "number" + } + } + }, + "agentsdk.AgentMetricType": { + "type": "string", + "enum": [ + "counter", + "gauge" + ], + "x-enum-varnames": [ + "AgentMetricTypeCounter", + "AgentMetricTypeGauge" + ] + }, "agentsdk.AuthenticateResponse": { "type": "object", "properties": { @@ -5858,6 +5896,13 @@ const docTemplate = `{ "type": "integer" } }, + "metrics": { + "description": "Metrics collected by the agent", + "type": "array", + "items": { + "$ref": "#/definitions/agentsdk.AgentMetric" + } + }, "rx_bytes": { "description": "RxBytes is the number of received bytes.", "type": "integer" diff --git a/coderd/apidoc/swagger.json b/coderd/apidoc/swagger.json index 8d3e6467383e9..7e279b3643e56 100644 --- a/coderd/apidoc/swagger.json +++ b/coderd/apidoc/swagger.json @@ -4979,6 +4979,31 @@ } } }, + "agentsdk.AgentMetric": { + "type": "object", + "required": ["name", "type", "value"], + "properties": { + "name": { + "type": "string" + }, + "type": { + "enum": ["counter", "gauge"], + "allOf": [ + { + "$ref": "#/definitions/agentsdk.AgentMetricType" + } + ] + }, + "value": { + "type": "number" + } + } + }, + "agentsdk.AgentMetricType": { + "type": "string", + "enum": ["counter", "gauge"], + "x-enum-varnames": ["AgentMetricTypeCounter", "AgentMetricTypeGauge"] + }, "agentsdk.AuthenticateResponse": { "type": "object", "properties": { @@ -5177,6 +5202,13 @@ "type": "integer" } }, + "metrics": { + "description": "Metrics collected by the agent", + "type": "array", + "items": { + "$ref": "#/definitions/agentsdk.AgentMetric" + } + }, "rx_bytes": { "description": "RxBytes is the number of received bytes.", "type": "integer" diff --git a/coderd/coderd.go b/coderd/coderd.go index 4013c0cc77e8b..62fa2f5da3e35 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -38,6 +38,8 @@ import ( "cdr.dev/slog" "github.com/coder/coder/buildinfo" + "github.com/coder/coder/codersdk/agentsdk" + // Used for swagger docs. _ "github.com/coder/coder/coderd/apidoc" "github.com/coder/coder/coderd/audit" @@ -146,6 +148,8 @@ type Options struct { SSHConfig codersdk.SSHConfigResponse HTTPClient *http.Client + + UpdateAgentMetrics func(ctx context.Context, username, workspaceName, agentName string, metrics []agentsdk.AgentMetric) } // @title Coder API diff --git a/coderd/prometheusmetrics/aggregator.go b/coderd/prometheusmetrics/aggregator.go new file mode 100644 index 0000000000000..ba3d520468690 --- /dev/null +++ b/coderd/prometheusmetrics/aggregator.go @@ -0,0 +1,250 @@ +package prometheusmetrics + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/xerrors" + + "cdr.dev/slog" + + "github.com/coder/coder/codersdk/agentsdk" +) + +const ( + // MetricHelpForAgent is a help string that replaces all agent metric help + // messages. This is because a registry cannot have conflicting + // help messages for the same metric in a "gather". If our coder agents are + // on different versions, this is a possible scenario. + metricHelpForAgent = "Metrics are forwarded from workspace agents connected to this instance of coderd." +) + +const ( + sizeCollectCh = 10 + sizeUpdateCh = 1024 + + defaultMetricsCleanupInterval = 2 * time.Minute +) + +type MetricsAggregator struct { + queue []annotatedMetric + + log slog.Logger + metricsCleanupInterval time.Duration + + collectCh chan (chan []prometheus.Metric) + updateCh chan updateRequest + + updateHistogram prometheus.Histogram + cleanupHistogram prometheus.Histogram +} + +type updateRequest struct { + username string + workspaceName string + agentName string + + metrics []agentsdk.AgentMetric + + timestamp time.Time +} + +type annotatedMetric struct { + agentsdk.AgentMetric + + username string + workspaceName string + agentName string + + expiryDate time.Time +} + +var _ prometheus.Collector = new(MetricsAggregator) + +func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer, duration time.Duration) (*MetricsAggregator, error) { + metricsCleanupInterval := defaultMetricsCleanupInterval + if duration > 0 { + metricsCleanupInterval = duration + } + + updateHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "coderd", + Subsystem: "prometheusmetrics", + Name: "metrics_aggregator_execution_update_seconds", + Help: "Histogram for duration of metrics aggregator update in seconds.", + Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.500, 1, 5, 10, 30}, + }) + err := registerer.Register(updateHistogram) + if err != nil { + return nil, err + } + + cleanupHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "coderd", + Subsystem: "prometheusmetrics", + Name: "metrics_aggregator_execution_cleanup_seconds", + Help: "Histogram for duration of metrics aggregator cleanup in seconds.", + Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.500, 1, 5, 10, 30}, + }) + err = registerer.Register(cleanupHistogram) + if err != nil { + return nil, err + } + + return &MetricsAggregator{ + log: logger, + metricsCleanupInterval: metricsCleanupInterval, + + collectCh: make(chan (chan []prometheus.Metric), sizeCollectCh), + updateCh: make(chan updateRequest, sizeUpdateCh), + + updateHistogram: updateHistogram, + cleanupHistogram: cleanupHistogram, + }, nil +} + +func (ma *MetricsAggregator) Run(ctx context.Context) func() { + ctx, cancelFunc := context.WithCancel(ctx) + done := make(chan struct{}) + + cleanupTicker := time.NewTicker(ma.metricsCleanupInterval) + go func() { + defer close(done) + defer cleanupTicker.Stop() + + for { + select { + case req := <-ma.updateCh: + ma.log.Debug(ctx, "metrics aggregator: update metrics") + + timer := prometheus.NewTimer(ma.updateHistogram) + UpdateLoop: + for _, m := range req.metrics { + for i, q := range ma.queue { + if q.username == req.username && q.workspaceName == req.workspaceName && q.agentName == req.agentName && q.Name == m.Name { + ma.queue[i].AgentMetric.Value = m.Value + ma.queue[i].expiryDate = req.timestamp.Add(ma.metricsCleanupInterval) + continue UpdateLoop + } + } + + ma.queue = append(ma.queue, annotatedMetric{ + username: req.username, + workspaceName: req.workspaceName, + agentName: req.agentName, + + AgentMetric: m, + + expiryDate: req.timestamp.Add(ma.metricsCleanupInterval), + }) + } + + timer.ObserveDuration() + case outputCh := <-ma.collectCh: + ma.log.Debug(ctx, "metrics aggregator: collect metrics") + + output := make([]prometheus.Metric, 0, len(ma.queue)) + for _, m := range ma.queue { + desc := prometheus.NewDesc(m.Name, metricHelpForAgent, agentMetricsLabels, nil) + valueType, err := asPrometheusValueType(m.Type) + if err != nil { + ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("name", m.Name), slog.F("type", m.Type), slog.F("value", m.Value), slog.Error(err)) + continue + } + constMetric := prometheus.MustNewConstMetric(desc, valueType, m.Value, m.username, m.workspaceName, m.agentName) + output = append(output, constMetric) + } + outputCh <- output + close(outputCh) + case <-cleanupTicker.C: + ma.log.Debug(ctx, "metrics aggregator: clean expired metrics") + + timer := prometheus.NewTimer(ma.cleanupHistogram) + + now := time.Now() + + var hasExpiredMetrics bool + for _, m := range ma.queue { + if now.After(m.expiryDate) { + hasExpiredMetrics = true + break + } + } + + if hasExpiredMetrics { + fresh := make([]annotatedMetric, 0, len(ma.queue)) + for _, m := range ma.queue { + if m.expiryDate.After(now) { + fresh = append(fresh, m) + } + } + ma.queue = fresh + } + + timer.ObserveDuration() + cleanupTicker.Reset(ma.metricsCleanupInterval) + + case <-ctx.Done(): + ma.log.Debug(ctx, "metrics aggregator: is stopped") + return + } + } + }() + return func() { + cancelFunc() + <-done + } +} + +// Describe function does not have any knowledge about the metrics schema, +// so it does not emit anything. +func (*MetricsAggregator) Describe(_ chan<- *prometheus.Desc) { +} + +var agentMetricsLabels = []string{usernameLabel, workspaceNameLabel, agentNameLabel} + +func (ma *MetricsAggregator) Collect(ch chan<- prometheus.Metric) { + output := make(chan []prometheus.Metric, 1) + + select { + case ma.collectCh <- output: + default: + ma.log.Error(context.Background(), "metrics aggregator: collect queue is full") + return + } + + for s := range output { + for _, m := range s { + ch <- m + } + } +} + +func (ma *MetricsAggregator) Update(ctx context.Context, username, workspaceName, agentName string, metrics []agentsdk.AgentMetric) { + select { + case ma.updateCh <- updateRequest{ + username: username, + workspaceName: workspaceName, + agentName: agentName, + metrics: metrics, + + timestamp: time.Now(), + }: + case <-ctx.Done(): + ma.log.Debug(ctx, "metrics aggregator: update request is canceled") + default: + ma.log.Error(ctx, "metrics aggregator: update queue is full") + } +} + +func asPrometheusValueType(metricType agentsdk.AgentMetricType) (prometheus.ValueType, error) { + switch metricType { + case agentsdk.AgentMetricTypeGauge: + return prometheus.GaugeValue, nil + case agentsdk.AgentMetricTypeCounter: + return prometheus.CounterValue, nil + default: + return -1, xerrors.Errorf("unsupported value type: %s", metricType) + } +} diff --git a/coderd/prometheusmetrics/aggregator_test.go b/coderd/prometheusmetrics/aggregator_test.go new file mode 100644 index 0000000000000..68b5f94e464ee --- /dev/null +++ b/coderd/prometheusmetrics/aggregator_test.go @@ -0,0 +1,154 @@ +package prometheusmetrics_test + +import ( + "context" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/coderd/prometheusmetrics" + "github.com/coder/coder/codersdk/agentsdk" + "github.com/coder/coder/testutil" +) + +const ( + testWorkspaceName = "yogi-workspace" + testUsername = "yogi-bear" + testAgentName = "main-agent" +) + +func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) { + t.Parallel() + + // given + registry := prometheus.NewRegistry() + metricsAggregator, err := prometheusmetrics.NewMetricsAggregator(slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}), registry, time.Hour) // time.Hour, so metrics won't expire + require.NoError(t, err) + + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + closeFunc := metricsAggregator.Run(ctx) + t.Cleanup(closeFunc) + + given1 := []agentsdk.AgentMetric{ + {Name: "a_counter_one", Type: agentsdk.AgentMetricTypeCounter, Value: 1}, + {Name: "b_counter_two", Type: agentsdk.AgentMetricTypeCounter, Value: 2}, + {Name: "c_gauge_three", Type: agentsdk.AgentMetricTypeGauge, Value: 3}, + } + + given2 := []agentsdk.AgentMetric{ + {Name: "b_counter_two", Type: agentsdk.AgentMetricTypeCounter, Value: 4}, + {Name: "d_gauge_four", Type: agentsdk.AgentMetricTypeGauge, Value: 6}, + } + + expected := []agentsdk.AgentMetric{ + {Name: "a_counter_one", Type: agentsdk.AgentMetricTypeCounter, Value: 1}, + {Name: "b_counter_two", Type: agentsdk.AgentMetricTypeCounter, Value: 4}, + {Name: "c_gauge_three", Type: agentsdk.AgentMetricTypeGauge, Value: 3}, + {Name: "d_gauge_four", Type: agentsdk.AgentMetricTypeGauge, Value: 6}, + } + + // when + metricsAggregator.Update(ctx, testUsername, testWorkspaceName, testAgentName, given1) + metricsAggregator.Update(ctx, testUsername, testWorkspaceName, testAgentName, given2) + + // then + require.Eventually(t, func() bool { + var actual []prometheus.Metric + metricsCh := make(chan prometheus.Metric) + + done := make(chan struct{}, 1) + defer close(done) + go func() { + for m := range metricsCh { + actual = append(actual, m) + } + done <- struct{}{} + }() + metricsAggregator.Collect(metricsCh) + close(metricsCh) + <-done + return verifyCollectedMetrics(t, expected, actual) + }, testutil.WaitMedium, testutil.IntervalSlow) +} + +func verifyCollectedMetrics(t *testing.T, expected []agentsdk.AgentMetric, actual []prometheus.Metric) bool { + if len(expected) != len(actual) { + return false + } + + // Metrics are expected to arrive in order + for i, e := range expected { + desc := actual[i].Desc() + assert.Contains(t, desc.String(), e.Name) + + var d dto.Metric + err := actual[i].Write(&d) + require.NoError(t, err) + + require.Equal(t, "agent_name", *d.Label[0].Name) + require.Equal(t, testAgentName, *d.Label[0].Value) + require.Equal(t, "username", *d.Label[1].Name) + require.Equal(t, testUsername, *d.Label[1].Value) + require.Equal(t, "workspace_name", *d.Label[2].Name) + require.Equal(t, testWorkspaceName, *d.Label[2].Value) + + if e.Type == agentsdk.AgentMetricTypeCounter { + require.Equal(t, e.Value, *d.Counter.Value) + } else if e.Type == agentsdk.AgentMetricTypeGauge { + require.Equal(t, e.Value, *d.Gauge.Value) + } else { + require.Failf(t, "unsupported type: %s", string(e.Type)) + } + } + return true +} + +func TestUpdateMetrics_MetricsExpire(t *testing.T) { + t.Parallel() + + // given + registry := prometheus.NewRegistry() + metricsAggregator, err := prometheusmetrics.NewMetricsAggregator(slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}), registry, time.Millisecond) + require.NoError(t, err) + + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + closeFunc := metricsAggregator.Run(ctx) + t.Cleanup(closeFunc) + + given := []agentsdk.AgentMetric{ + {Name: "a_counter_one", Type: agentsdk.AgentMetricTypeCounter, Value: 1}, + } + + // when + metricsAggregator.Update(ctx, testUsername, testWorkspaceName, testAgentName, given) + + time.Sleep(time.Millisecond * 10) // Ensure that metric is expired + + // then + require.Eventually(t, func() bool { + var actual []prometheus.Metric + metricsCh := make(chan prometheus.Metric) + + done := make(chan struct{}, 1) + defer close(done) + go func() { + for m := range metricsCh { + actual = append(actual, m) + } + done <- struct{}{} + }() + metricsAggregator.Collect(metricsCh) + close(metricsCh) + <-done + return len(actual) == 0 + }, testutil.WaitShort, testutil.IntervalFast) +} diff --git a/coderd/prometheusmetrics/prometheusmetrics.go b/coderd/prometheusmetrics/prometheusmetrics.go index cfc64122cd3d5..6a616bcc05438 100644 --- a/coderd/prometheusmetrics/prometheusmetrics.go +++ b/coderd/prometheusmetrics/prometheusmetrics.go @@ -22,6 +22,12 @@ import ( "github.com/coder/coder/tailnet" ) +const ( + agentNameLabel = "agent_name" + usernameLabel = "username" + workspaceNameLabel = "workspace_name" +) + // ActiveUsers tracks the number of users that have authenticated within the past hour. func ActiveUsers(ctx context.Context, registerer prometheus.Registerer, db database.Store, duration time.Duration) (func(), error) { if duration == 0 { @@ -140,7 +146,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis Subsystem: "agents", Name: "up", Help: "The number of active agents per workspace.", - }, []string{"username", "workspace_name"})) + }, []string{usernameLabel, workspaceNameLabel})) err := registerer.Register(agentsGauge) if err != nil { return nil, err @@ -151,7 +157,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis Subsystem: "agents", Name: "connections", Help: "Agent connections with statuses.", - }, []string{"agent_name", "username", "workspace_name", "status", "lifecycle_state", "tailnet_node"})) + }, []string{agentNameLabel, usernameLabel, workspaceNameLabel, "status", "lifecycle_state", "tailnet_node"})) err = registerer.Register(agentsConnectionsGauge) if err != nil { return nil, err @@ -162,7 +168,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis Subsystem: "agents", Name: "connection_latencies_seconds", Help: "Agent connection latencies in seconds.", - }, []string{"agent_name", "username", "workspace_name", "derp_region", "preferred"})) + }, []string{agentNameLabel, usernameLabel, workspaceNameLabel, "derp_region", "preferred"})) err = registerer.Register(agentsConnectionLatenciesGauge) if err != nil { return nil, err @@ -173,7 +179,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis Subsystem: "agents", Name: "apps", Help: "Agent applications with statuses.", - }, []string{"agent_name", "username", "workspace_name", "app_name", "health"})) + }, []string{agentNameLabel, usernameLabel, workspaceNameLabel, "app_name", "health"})) err = registerer.Register(agentsAppsGauge) if err != nil { return nil, err @@ -333,7 +339,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R Subsystem: "agentstats", Name: "tx_bytes", Help: "Agent Tx bytes", - }, []string{"agent_name", "username", "workspace_name"})) + }, []string{agentNameLabel, usernameLabel, workspaceNameLabel})) err = registerer.Register(agentStatsTxBytesGauge) if err != nil { return nil, err @@ -344,7 +350,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R Subsystem: "agentstats", Name: "rx_bytes", Help: "Agent Rx bytes", - }, []string{"agent_name", "username", "workspace_name"})) + }, []string{agentNameLabel, usernameLabel, workspaceNameLabel})) err = registerer.Register(agentStatsRxBytesGauge) if err != nil { return nil, err @@ -355,7 +361,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R Subsystem: "agentstats", Name: "connection_count", Help: "The number of established connections by agent", - }, []string{"agent_name", "username", "workspace_name"})) + }, []string{agentNameLabel, usernameLabel, workspaceNameLabel})) err = registerer.Register(agentStatsConnectionCountGauge) if err != nil { return nil, err @@ -366,7 +372,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R Subsystem: "agentstats", Name: "connection_median_latency_seconds", Help: "The median agent connection latency in seconds", - }, []string{"agent_name", "username", "workspace_name"})) + }, []string{agentNameLabel, usernameLabel, workspaceNameLabel})) err = registerer.Register(agentStatsConnectionMedianLatencyGauge) if err != nil { return nil, err @@ -377,7 +383,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R Subsystem: "agentstats", Name: "session_count_jetbrains", Help: "The number of session established by JetBrains", - }, []string{"agent_name", "username", "workspace_name"})) + }, []string{agentNameLabel, usernameLabel, workspaceNameLabel})) err = registerer.Register(agentStatsSessionCountJetBrainsGauge) if err != nil { return nil, err @@ -388,7 +394,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R Subsystem: "agentstats", Name: "session_count_reconnecting_pty", Help: "The number of session established by reconnecting PTY", - }, []string{"agent_name", "username", "workspace_name"})) + }, []string{agentNameLabel, usernameLabel, workspaceNameLabel})) err = registerer.Register(agentStatsSessionCountReconnectingPTYGauge) if err != nil { return nil, err @@ -399,7 +405,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R Subsystem: "agentstats", Name: "session_count_ssh", Help: "The number of session established by SSH", - }, []string{"agent_name", "username", "workspace_name"})) + }, []string{agentNameLabel, usernameLabel, workspaceNameLabel})) err = registerer.Register(agentStatsSessionCountSSHGauge) if err != nil { return nil, err @@ -410,7 +416,7 @@ func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.R Subsystem: "agentstats", Name: "session_count_vscode", Help: "The number of session established by VSCode", - }, []string{"agent_name", "username", "workspace_name"})) + }, []string{agentNameLabel, usernameLabel, workspaceNameLabel})) err = registerer.Register(agentStatsSessionCountVSCodeGauge) if err != nil { return nil, err diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index c295b605c9725..1b58c9f2c3c0c 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -24,6 +24,7 @@ import ( "github.com/google/uuid" "golang.org/x/exp/slices" "golang.org/x/mod/semver" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "nhooyr.io/websocket" "tailscale.com/tailcfg" @@ -258,19 +259,19 @@ func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.R output := make([]string, 0) level := make([]database.LogLevel, 0) outputLength := 0 - for _, log := range req.Logs { - createdAt = append(createdAt, log.CreatedAt) - output = append(output, log.Output) - outputLength += len(log.Output) - if log.Level == "" { + for _, logEntry := range req.Logs { + createdAt = append(createdAt, logEntry.CreatedAt) + output = append(output, logEntry.Output) + outputLength += len(logEntry.Output) + if logEntry.Level == "" { // Default to "info" to support older agents that didn't have the level field. - log.Level = codersdk.LogLevelInfo + logEntry.Level = codersdk.LogLevelInfo } - parsedLevel := database.LogLevel(log.Level) + parsedLevel := database.LogLevel(logEntry.Level) if !parsedLevel.Valid() { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Invalid log level provided.", - Detail: fmt.Sprintf("invalid log level: %q", log.Level), + Detail: fmt.Sprintf("invalid log level: %q", logEntry.Level), }) return } @@ -1213,39 +1214,58 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques } now := database.Now() - _, err = api.Database.InsertWorkspaceAgentStat(ctx, database.InsertWorkspaceAgentStatParams{ - ID: uuid.New(), - CreatedAt: now, - AgentID: workspaceAgent.ID, - WorkspaceID: workspace.ID, - UserID: workspace.OwnerID, - TemplateID: workspace.TemplateID, - ConnectionsByProto: payload, - ConnectionCount: req.ConnectionCount, - RxPackets: req.RxPackets, - RxBytes: req.RxBytes, - TxPackets: req.TxPackets, - TxBytes: req.TxBytes, - SessionCountVSCode: req.SessionCountVSCode, - SessionCountJetBrains: req.SessionCountJetBrains, - SessionCountReconnectingPTY: req.SessionCountReconnectingPTY, - SessionCountSSH: req.SessionCountSSH, - ConnectionMedianLatencyMS: req.ConnectionMedianLatencyMS, - }) - if err != nil { - httpapi.InternalServerError(rw, err) - return - } - if req.ConnectionCount > 0 { - err = api.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ + var errGroup errgroup.Group + errGroup.Go(func() error { + _, err = api.Database.InsertWorkspaceAgentStat(ctx, database.InsertWorkspaceAgentStatParams{ + ID: uuid.New(), + CreatedAt: now, + AgentID: workspaceAgent.ID, + WorkspaceID: workspace.ID, + UserID: workspace.OwnerID, + TemplateID: workspace.TemplateID, + ConnectionsByProto: payload, + ConnectionCount: req.ConnectionCount, + RxPackets: req.RxPackets, + RxBytes: req.RxBytes, + TxPackets: req.TxPackets, + TxBytes: req.TxBytes, + SessionCountVSCode: req.SessionCountVSCode, + SessionCountJetBrains: req.SessionCountJetBrains, + SessionCountReconnectingPTY: req.SessionCountReconnectingPTY, + SessionCountSSH: req.SessionCountSSH, + ConnectionMedianLatencyMS: req.ConnectionMedianLatencyMS, + }) + if err != nil { + return xerrors.Errorf("can't insert workspace agent stat: %w", err) + } + return nil + }) + errGroup.Go(func() error { + err := api.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{ ID: workspace.ID, LastUsedAt: now, }) if err != nil { - httpapi.InternalServerError(rw, err) - return + return xerrors.Errorf("can't update workspace LastUsedAt: %w", err) } + return nil + }) + if api.Options.UpdateAgentMetrics != nil { + errGroup.Go(func() error { + user, err := api.Database.GetUserByID(ctx, workspace.OwnerID) + if err != nil { + return xerrors.Errorf("can't get user: %w", err) + } + + api.Options.UpdateAgentMetrics(ctx, user.Username, workspace.Name, workspaceAgent.Name, req.Metrics) + return nil + }) + } + err = errGroup.Wait() + if err != nil { + httpapi.InternalServerError(rw, err) + return } httpapi.Write(ctx, rw, http.StatusOK, agentsdk.StatsResponse{ @@ -1973,17 +1993,17 @@ func websocketNetConn(ctx context.Context, conn *websocket.Conn, msgType websock func convertWorkspaceAgentStartupLogs(logs []database.WorkspaceAgentStartupLog) []codersdk.WorkspaceAgentStartupLog { sdk := make([]codersdk.WorkspaceAgentStartupLog, 0, len(logs)) - for _, log := range logs { - sdk = append(sdk, convertWorkspaceAgentStartupLog(log)) + for _, logEntry := range logs { + sdk = append(sdk, convertWorkspaceAgentStartupLog(logEntry)) } return sdk } -func convertWorkspaceAgentStartupLog(log database.WorkspaceAgentStartupLog) codersdk.WorkspaceAgentStartupLog { +func convertWorkspaceAgentStartupLog(logEntry database.WorkspaceAgentStartupLog) codersdk.WorkspaceAgentStartupLog { return codersdk.WorkspaceAgentStartupLog{ - ID: log.ID, - CreatedAt: log.CreatedAt, - Output: log.Output, - Level: codersdk.LogLevel(log.Level), + ID: logEntry.ID, + CreatedAt: logEntry.CreatedAt, + Output: logEntry.Output, + Level: codersdk.LogLevel(logEntry.Level), } } diff --git a/codersdk/agentsdk/agentsdk.go b/codersdk/agentsdk/agentsdk.go index df98961f5c488..12d651e3f0412 100644 --- a/codersdk/agentsdk/agentsdk.go +++ b/codersdk/agentsdk/agentsdk.go @@ -483,6 +483,22 @@ type Stats struct { // SessionCountSSH is the number of connections received by an agent // that are normal, non-tagged SSH sessions. SessionCountSSH int64 `json:"session_count_ssh"` + + // Metrics collected by the agent + Metrics []AgentMetric `json:"metrics"` +} + +type AgentMetricType string + +const ( + AgentMetricTypeCounter AgentMetricType = "counter" + AgentMetricTypeGauge AgentMetricType = "gauge" +) + +type AgentMetric struct { + Name string `json:"name" validate:"required"` + Type AgentMetricType `json:"type" validate:"required" enums:"counter,gauge"` + Value float64 `json:"value" validate:"required"` } type StatsResponse struct { diff --git a/docs/api/schemas.md b/docs/api/schemas.md index 658a3f241f93a..ee8e52e07a4a4 100644 --- a/docs/api/schemas.md +++ b/docs/api/schemas.md @@ -16,6 +16,46 @@ | `document` | string | true | | | | `signature` | string | true | | | +## agentsdk.AgentMetric + +```json +{ + "name": "string", + "type": "counter", + "value": 0 +} +``` + +### Properties + +| Name | Type | Required | Restrictions | Description | +| ------- | ---------------------------------------------------- | -------- | ------------ | ----------- | +| `name` | string | true | | | +| `type` | [agentsdk.AgentMetricType](#agentsdkagentmetrictype) | true | | | +| `value` | number | true | | | + +#### Enumerated Values + +| Property | Value | +| -------- | --------- | +| `type` | `counter` | +| `type` | `gauge` | + +## agentsdk.AgentMetricType + +```json +"counter" +``` + +### Properties + +#### Enumerated Values + +| Value | +| --------- | +| `counter` | +| `gauge` | + ## agentsdk.AuthenticateResponse ```json @@ -326,6 +366,13 @@ "property1": 0, "property2": 0 }, + "metrics": [ + { + "name": "string", + "type": "counter", + "value": 0 + } + ], "rx_bytes": 0, "rx_packets": 0, "session_count_jetbrains": 0, @@ -339,20 +386,21 @@ ### Properties -| Name | Type | Required | Restrictions | Description | -| -------------------------------- | ------- | -------- | ------------ | ----------------------------------------------------------------------------------------------------------------------------- | -| `connection_count` | integer | false | | Connection count is the number of connections received by an agent. | -| `connection_median_latency_ms` | number | false | | Connection median latency ms is the median latency of all connections in milliseconds. | -| `connections_by_proto` | object | false | | Connections by proto is a count of connections by protocol. | -| » `[any property]` | integer | false | | | -| `rx_bytes` | integer | false | | Rx bytes is the number of received bytes. | -| `rx_packets` | integer | false | | Rx packets is the number of received packets. | -| `session_count_jetbrains` | integer | false | | Session count jetbrains is the number of connections received by an agent that are from our JetBrains extension. | -| `session_count_reconnecting_pty` | integer | false | | Session count reconnecting pty is the number of connections received by an agent that are from the reconnecting web terminal. | -| `session_count_ssh` | integer | false | | Session count ssh is the number of connections received by an agent that are normal, non-tagged SSH sessions. | -| `session_count_vscode` | integer | false | | Session count vscode is the number of connections received by an agent that are from our VS Code extension. | -| `tx_bytes` | integer | false | | Tx bytes is the number of transmitted bytes. | -| `tx_packets` | integer | false | | Tx packets is the number of transmitted bytes. | +| Name | Type | Required | Restrictions | Description | +| -------------------------------- | ----------------------------------------------------- | -------- | ------------ | ----------------------------------------------------------------------------------------------------------------------------- | +| `connection_count` | integer | false | | Connection count is the number of connections received by an agent. | +| `connection_median_latency_ms` | number | false | | Connection median latency ms is the median latency of all connections in milliseconds. | +| `connections_by_proto` | object | false | | Connections by proto is a count of connections by protocol. | +| » `[any property]` | integer | false | | | +| `metrics` | array of [agentsdk.AgentMetric](#agentsdkagentmetric) | false | | Metrics collected by the agent | +| `rx_bytes` | integer | false | | Rx bytes is the number of received bytes. | +| `rx_packets` | integer | false | | Rx packets is the number of received packets. | +| `session_count_jetbrains` | integer | false | | Session count jetbrains is the number of connections received by an agent that are from our JetBrains extension. | +| `session_count_reconnecting_pty` | integer | false | | Session count reconnecting pty is the number of connections received by an agent that are from the reconnecting web terminal. | +| `session_count_ssh` | integer | false | | Session count ssh is the number of connections received by an agent that are normal, non-tagged SSH sessions. | +| `session_count_vscode` | integer | false | | Session count vscode is the number of connections received by an agent that are from our VS Code extension. | +| `tx_bytes` | integer | false | | Tx bytes is the number of transmitted bytes. | +| `tx_packets` | integer | false | | Tx packets is the number of transmitted bytes. | ## agentsdk.StatsResponse