Skip to content

Commit fcbffe0

Browse files
committed
Initial implementation
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent d2a5b31 commit fcbffe0

File tree

6 files changed

+470
-65
lines changed

6 files changed

+470
-65
lines changed

cli/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func enablePrometheus(
235235
}
236236
afterCtx(ctx, closeAgentStatsFunc)
237237

238-
metricsAggregator, err := prometheusmetrics.NewMetricsAggregator(logger, options.PrometheusRegistry, 0)
238+
metricsAggregator, err := prometheusmetrics.NewMetricsAggregator(logger, options.PrometheusRegistry, 0, options.DeploymentValues.Prometheus.AggregateAgentStatsBy.Value())
239239
if err != nil {
240240
return nil, xerrors.Errorf("can't initialize metrics aggregator: %w", err)
241241
}

coderd/agentmetrics/labels.go

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package agentmetrics
2+
3+
// Names of the labels that can be attached to agent metrics.
const (
	// AgentNameLabel is the name of the label holding the agent name.
	AgentNameLabel = "agent_name"

	// TemplateNameLabel is the name of the label holding the template name.
	TemplateNameLabel = "template_name"

	// UsernameLabel is the name of the label holding the username.
	UsernameLabel = "username"

	// WorkspaceNameLabel is the name of the label holding the workspace name.
	WorkspaceNameLabel = "workspace_name"
)

coderd/prometheusmetrics/aggregator.go

+145-10
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strings"
88
"time"
99

10+
"github.com/coder/coder/v2/coderd/agentmetrics"
1011
"github.com/prometheus/client_golang/prometheus"
1112
"golang.org/x/xerrors"
1213

@@ -43,9 +44,10 @@ type MetricsAggregator struct {
4344
collectCh chan (chan []prometheus.Metric)
4445
updateCh chan updateRequest
4546

46-
storeSizeGauge prometheus.Gauge
47-
updateHistogram prometheus.Histogram
48-
cleanupHistogram prometheus.Histogram
47+
storeSizeGauge prometheus.Gauge
48+
updateHistogram prometheus.Histogram
49+
cleanupHistogram prometheus.Histogram
50+
aggregateByLabels []string
4951
}
5052

5153
type updateRequest struct {
@@ -68,6 +70,8 @@ type annotatedMetric struct {
6870
templateName string
6971

7072
expiryDate time.Time
73+
74+
aggregateByLabels []string
7175
}
7276

7377
type metricKey struct {
@@ -102,26 +106,80 @@ func hashKey(req *updateRequest, m *agentproto.Stats_Metric) metricKey {
102106
// Compile-time check that *MetricsAggregator satisfies prometheus.Collector.
var _ prometheus.Collector = (*MetricsAggregator)(nil)
103107

104108
func (am *annotatedMetric) asPrometheus() (prometheus.Metric, error) {
105-
labels := make([]string, 0, len(agentMetricsLabels)+len(am.Labels))
106-
labelValues := make([]string, 0, len(agentMetricsLabels)+len(am.Labels))
109+
var (
110+
baseLabelNames []string = am.aggregateByLabels
111+
baseLabelValues []string
112+
)
113+
114+
for _, label := range am.aggregateByLabels {
115+
val, err := am.getFieldByLabel(label)
116+
if err != nil {
117+
return nil, err
118+
}
119+
120+
baseLabelValues = append(baseLabelValues, val)
121+
}
122+
123+
labels := make([]string, 0, len(baseLabelNames)+len(am.Labels))
124+
labelValues := make([]string, 0, len(baseLabelNames)+len(am.Labels))
107125

108-
labels = append(labels, agentMetricsLabels...)
109-
labelValues = append(labelValues, am.username, am.workspaceName, am.agentName, am.templateName)
126+
labels = append(labels, baseLabelNames...)
127+
labelValues = append(labelValues, baseLabelValues...)
110128

111129
for _, l := range am.Labels {
112130
labels = append(labels, l.Name)
113131
labelValues = append(labelValues, l.Value)
114132
}
115133

134+
//fmt.Printf(">>>>[%s] [%s] %s [%q] [%q]: %v\n", time.Now().Format(time.RFC3339Nano), am.Type, am.Name, labels, labelValues, am.Value)
135+
116136
desc := prometheus.NewDesc(am.Name, metricHelpForAgent, labels, nil)
117137
valueType, err := asPrometheusValueType(am.Type)
118138
if err != nil {
119139
return nil, err
120140
}
141+
121142
return prometheus.MustNewConstMetric(desc, valueType, am.Value, labelValues...), nil
122143
}
123144

124-
func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer, duration time.Duration) (*MetricsAggregator, error) {
145+
// getFieldByLabel returns the related field value for a given label
146+
func (am *annotatedMetric) getFieldByLabel(label string) (string, error) {
147+
var labelVal string
148+
switch label {
149+
case agentmetrics.WorkspaceNameLabel:
150+
labelVal = am.workspaceName
151+
case agentmetrics.TemplateNameLabel:
152+
labelVal = am.templateName
153+
case agentmetrics.AgentNameLabel:
154+
labelVal = am.agentName
155+
case agentmetrics.UsernameLabel:
156+
labelVal = am.username
157+
default:
158+
return "", xerrors.Errorf("unexpected label: %q", label)
159+
}
160+
161+
return labelVal, nil
162+
}
163+
164+
func (am *annotatedMetric) clone() annotatedMetric {
165+
stats := &agentproto.Stats_Metric{
166+
Name: am.Name,
167+
Type: am.Type,
168+
Value: am.Value,
169+
Labels: am.Labels,
170+
}
171+
172+
return annotatedMetric{
173+
Stats_Metric: stats,
174+
username: am.username,
175+
workspaceName: am.workspaceName,
176+
agentName: am.agentName,
177+
templateName: am.templateName,
178+
expiryDate: am.expiryDate,
179+
}
180+
}
181+
182+
func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer, duration time.Duration, aggregateByLabels []string) (*MetricsAggregator, error) {
125183
metricsCleanupInterval := defaultMetricsCleanupInterval
126184
if duration > 0 {
127185
metricsCleanupInterval = duration
@@ -174,9 +232,61 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer,
174232
storeSizeGauge: storeSizeGauge,
175233
updateHistogram: updateHistogram,
176234
cleanupHistogram: cleanupHistogram,
235+
236+
aggregateByLabels: aggregateByLabels,
177237
}, nil
178238
}
179239

240+
type MetricAggregator struct {
241+
aggregations map[string]float64
242+
metrics map[string]annotatedMetric
243+
}
244+
245+
func NewMetricAggregator(size int) *MetricAggregator {
246+
return &MetricAggregator{
247+
aggregations: make(map[string]float64, size),
248+
metrics: make(map[string]annotatedMetric, size),
249+
}
250+
}
251+
252+
func (a *MetricAggregator) Aggregate(am annotatedMetric, labels []string) error {
253+
// if we already have an entry for this key, don't clone this am afresh - rather use the existing one
254+
// this will be a bit more memory efficient
255+
// ...do this after unit-test is written
256+
257+
clone := am.clone()
258+
259+
fields := make(map[string]string, len(labels))
260+
labelValues := make([]string, 0, len(labels))
261+
262+
for _, label := range labels {
263+
val, err := clone.getFieldByLabel(label)
264+
if err != nil {
265+
return err
266+
}
267+
268+
fields[label] = val
269+
labelValues = append(labelValues, val)
270+
}
271+
272+
key := fmt.Sprintf("%s:%v", clone.Stats_Metric.Name, fields)
273+
274+
clone.aggregateByLabels = labels
275+
a.aggregations[key] += clone.Value
276+
277+
clone.Value = a.aggregations[key]
278+
a.metrics[key] = clone
279+
280+
return nil
281+
}
282+
283+
func (a *MetricAggregator) asMetrics() (out []annotatedMetric) {
284+
for _, am := range a.metrics {
285+
out = append(out, am)
286+
}
287+
return
288+
}
289+
180290
func (ma *MetricsAggregator) Run(ctx context.Context) func() {
181291
ctx, cancelFunc := context.WithCancel(ctx)
182292
done := make(chan struct{})
@@ -216,15 +326,40 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
216326
case outputCh := <-ma.collectCh:
217327
ma.log.Debug(ctx, "collect metrics")
218328

329+
var input []annotatedMetric
219330
output := make([]prometheus.Metric, 0, len(ma.store))
220-
for _, m := range ma.store {
331+
332+
// If custom aggregation labels have not been chosen, generate Prometheus metrics without any pre-aggregation.
333+
// This results in higher cardinality, but may be desirable in larger deployments.
334+
if len(ma.aggregateByLabels) == 0 {
335+
for _, m := range ma.store {
336+
// Aggregate by high cardinality labels.
337+
m.aggregateByLabels = agentMetricsLabels
338+
input = append(input, m)
339+
}
340+
} else {
341+
// However, if custom aggregations have been chosen, we need to aggregate the values from the annotated
342+
// metrics because we cannot register multiple metric series with the same labels.
343+
aggregator := NewMetricAggregator(len(ma.store) * len(ma.aggregateByLabels))
344+
345+
for _, m := range ma.store {
346+
if err := aggregator.Aggregate(m, ma.aggregateByLabels); err != nil {
347+
ma.log.Error(ctx, "can't aggregate labels", slog.F("labels", strings.Join(ma.aggregateByLabels, ",")), slog.Error(err))
348+
}
349+
}
350+
351+
input = aggregator.asMetrics()
352+
}
353+
354+
for _, m := range input {
221355
promMetric, err := m.asPrometheus()
222356
if err != nil {
223357
ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("name", m.Name), slog.F("type", m.Type), slog.F("value", m.Value), slog.Error(err))
224358
continue
225359
}
226360
output = append(output, promMetric)
227361
}
362+
228363
outputCh <- output
229364
close(outputCh)
230365
case <-cleanupTicker.C:
@@ -260,7 +395,7 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
260395
// Describe is part of the prometheus.Collector interface. This implementation
// sends no descriptors on the channel.
func (*MetricsAggregator) Describe(chan<- *prometheus.Desc) {}
262397

263-
var agentMetricsLabels = []string{usernameLabel, workspaceNameLabel, agentNameLabel, templateNameLabel}
398+
// agentMetricsLabels lists the agent metric label names in the order:
// username, workspace name, agent name, template name.
var agentMetricsLabels = []string{
	agentmetrics.UsernameLabel,
	agentmetrics.WorkspaceNameLabel,
	agentmetrics.AgentNameLabel,
	agentmetrics.TemplateNameLabel,
}
264399

265400
// AgentMetricLabels are the labels used to decorate an agent's metrics.
266401
// This list should match the list of labels in agentMetricsLabels.

0 commit comments

Comments
 (0)