7
7
"strings"
8
8
"time"
9
9
10
+ "github.com/coder/coder/v2/coderd/agentmetrics"
10
11
"github.com/prometheus/client_golang/prometheus"
11
12
"golang.org/x/xerrors"
12
13
@@ -43,9 +44,10 @@ type MetricsAggregator struct {
43
44
collectCh chan (chan []prometheus.Metric )
44
45
updateCh chan updateRequest
45
46
46
- storeSizeGauge prometheus.Gauge
47
- updateHistogram prometheus.Histogram
48
- cleanupHistogram prometheus.Histogram
47
+ storeSizeGauge prometheus.Gauge
48
+ updateHistogram prometheus.Histogram
49
+ cleanupHistogram prometheus.Histogram
50
+ aggregateByLabels []string
49
51
}
50
52
51
53
type updateRequest struct {
@@ -68,6 +70,8 @@ type annotatedMetric struct {
68
70
templateName string
69
71
70
72
expiryDate time.Time
73
+
74
+ aggregateByLabels []string
71
75
}
72
76
73
77
type metricKey struct {
@@ -102,26 +106,80 @@ func hashKey(req *updateRequest, m *agentproto.Stats_Metric) metricKey {
102
106
var _ prometheus.Collector = new (MetricsAggregator )
103
107
104
108
func (am * annotatedMetric ) asPrometheus () (prometheus.Metric , error ) {
105
- labels := make ([]string , 0 , len (agentMetricsLabels )+ len (am .Labels ))
106
- labelValues := make ([]string , 0 , len (agentMetricsLabels )+ len (am .Labels ))
109
+ var (
110
+ baseLabelNames []string = am .aggregateByLabels
111
+ baseLabelValues []string
112
+ )
113
+
114
+ for _ , label := range am .aggregateByLabels {
115
+ val , err := am .getFieldByLabel (label )
116
+ if err != nil {
117
+ return nil , err
118
+ }
119
+
120
+ baseLabelValues = append (baseLabelValues , val )
121
+ }
122
+
123
+ labels := make ([]string , 0 , len (baseLabelNames )+ len (am .Labels ))
124
+ labelValues := make ([]string , 0 , len (baseLabelNames )+ len (am .Labels ))
107
125
108
- labels = append (labels , agentMetricsLabels ... )
109
- labelValues = append (labelValues , am . username , am . workspaceName , am . agentName , am . templateName )
126
+ labels = append (labels , baseLabelNames ... )
127
+ labelValues = append (labelValues , baseLabelValues ... )
110
128
111
129
for _ , l := range am .Labels {
112
130
labels = append (labels , l .Name )
113
131
labelValues = append (labelValues , l .Value )
114
132
}
115
133
134
+ //fmt.Printf(">>>>[%s] [%s] %s [%q] [%q]: %v\n", time.Now().Format(time.RFC3339Nano), am.Type, am.Name, labels, labelValues, am.Value)
135
+
116
136
desc := prometheus .NewDesc (am .Name , metricHelpForAgent , labels , nil )
117
137
valueType , err := asPrometheusValueType (am .Type )
118
138
if err != nil {
119
139
return nil , err
120
140
}
141
+
121
142
return prometheus .MustNewConstMetric (desc , valueType , am .Value , labelValues ... ), nil
122
143
}
123
144
124
- func NewMetricsAggregator (logger slog.Logger , registerer prometheus.Registerer , duration time.Duration ) (* MetricsAggregator , error ) {
145
+ // getFieldByLabel returns the related field value for a given label
146
+ func (am * annotatedMetric ) getFieldByLabel (label string ) (string , error ) {
147
+ var labelVal string
148
+ switch label {
149
+ case agentmetrics .WorkspaceNameLabel :
150
+ labelVal = am .workspaceName
151
+ case agentmetrics .TemplateNameLabel :
152
+ labelVal = am .templateName
153
+ case agentmetrics .AgentNameLabel :
154
+ labelVal = am .agentName
155
+ case agentmetrics .UsernameLabel :
156
+ labelVal = am .username
157
+ default :
158
+ return "" , xerrors .Errorf ("unexpected label: %q" , label )
159
+ }
160
+
161
+ return labelVal , nil
162
+ }
163
+
164
+ func (am * annotatedMetric ) clone () annotatedMetric {
165
+ stats := & agentproto.Stats_Metric {
166
+ Name : am .Name ,
167
+ Type : am .Type ,
168
+ Value : am .Value ,
169
+ Labels : am .Labels ,
170
+ }
171
+
172
+ return annotatedMetric {
173
+ Stats_Metric : stats ,
174
+ username : am .username ,
175
+ workspaceName : am .workspaceName ,
176
+ agentName : am .agentName ,
177
+ templateName : am .templateName ,
178
+ expiryDate : am .expiryDate ,
179
+ }
180
+ }
181
+
182
+ func NewMetricsAggregator (logger slog.Logger , registerer prometheus.Registerer , duration time.Duration , aggregateByLabels []string ) (* MetricsAggregator , error ) {
125
183
metricsCleanupInterval := defaultMetricsCleanupInterval
126
184
if duration > 0 {
127
185
metricsCleanupInterval = duration
@@ -174,9 +232,61 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer,
174
232
storeSizeGauge : storeSizeGauge ,
175
233
updateHistogram : updateHistogram ,
176
234
cleanupHistogram : cleanupHistogram ,
235
+
236
+ aggregateByLabels : aggregateByLabels ,
177
237
}, nil
178
238
}
179
239
240
+ type MetricAggregator struct {
241
+ aggregations map [string ]float64
242
+ metrics map [string ]annotatedMetric
243
+ }
244
+
245
+ func NewMetricAggregator (size int ) * MetricAggregator {
246
+ return & MetricAggregator {
247
+ aggregations : make (map [string ]float64 , size ),
248
+ metrics : make (map [string ]annotatedMetric , size ),
249
+ }
250
+ }
251
+
252
+ func (a * MetricAggregator ) Aggregate (am annotatedMetric , labels []string ) error {
253
+ // if we already have an entry for this key, don't clone this am afresh - rather use the existing one
254
+ // this will be a bit more memory efficient
255
+ // ...do this after unit-test is written
256
+
257
+ clone := am .clone ()
258
+
259
+ fields := make (map [string ]string , len (labels ))
260
+ labelValues := make ([]string , 0 , len (labels ))
261
+
262
+ for _ , label := range labels {
263
+ val , err := clone .getFieldByLabel (label )
264
+ if err != nil {
265
+ return err
266
+ }
267
+
268
+ fields [label ] = val
269
+ labelValues = append (labelValues , val )
270
+ }
271
+
272
+ key := fmt .Sprintf ("%s:%v" , clone .Stats_Metric .Name , fields )
273
+
274
+ clone .aggregateByLabels = labels
275
+ a .aggregations [key ] += clone .Value
276
+
277
+ clone .Value = a .aggregations [key ]
278
+ a .metrics [key ] = clone
279
+
280
+ return nil
281
+ }
282
+
283
+ func (a * MetricAggregator ) asMetrics () (out []annotatedMetric ) {
284
+ for _ , am := range a .metrics {
285
+ out = append (out , am )
286
+ }
287
+ return
288
+ }
289
+
180
290
func (ma * MetricsAggregator ) Run (ctx context.Context ) func () {
181
291
ctx , cancelFunc := context .WithCancel (ctx )
182
292
done := make (chan struct {})
@@ -216,15 +326,40 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
216
326
case outputCh := <- ma .collectCh :
217
327
ma .log .Debug (ctx , "collect metrics" )
218
328
329
+ var input []annotatedMetric
219
330
output := make ([]prometheus.Metric , 0 , len (ma .store ))
220
- for _ , m := range ma .store {
331
+
332
+ // If custom aggregation labels have not been chosen, generate Prometheus metrics without any pre-aggregation.
333
+ // This results in higher cardinality, but may be desirable in larger deployments.
334
+ if len (ma .aggregateByLabels ) == 0 {
335
+ for _ , m := range ma .store {
336
+ // Aggregate by high cardinality labels.
337
+ m .aggregateByLabels = agentMetricsLabels
338
+ input = append (input , m )
339
+ }
340
+ } else {
341
+ // However, if custom aggregations have been chosen, we need to aggregate the values from the annotated
342
+ // metrics because we cannot register multiple metric series with the same labels.
343
+ aggregator := NewMetricAggregator (len (ma .store ) * len (ma .aggregateByLabels ))
344
+
345
+ for _ , m := range ma .store {
346
+ if err := aggregator .Aggregate (m , ma .aggregateByLabels ); err != nil {
347
+ ma .log .Error (ctx , "can't aggregate labels" , slog .F ("labels" , strings .Join (ma .aggregateByLabels , "," )), slog .Error (err ))
348
+ }
349
+ }
350
+
351
+ input = aggregator .asMetrics ()
352
+ }
353
+
354
+ for _ , m := range input {
221
355
promMetric , err := m .asPrometheus ()
222
356
if err != nil {
223
357
ma .log .Error (ctx , "can't convert Prometheus value type" , slog .F ("name" , m .Name ), slog .F ("type" , m .Type ), slog .F ("value" , m .Value ), slog .Error (err ))
224
358
continue
225
359
}
226
360
output = append (output , promMetric )
227
361
}
362
+
228
363
outputCh <- output
229
364
close (outputCh )
230
365
case <- cleanupTicker .C :
@@ -260,7 +395,7 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
260
395
func (* MetricsAggregator ) Describe (_ chan <- * prometheus.Desc ) {
261
396
}
262
397
263
- var agentMetricsLabels = []string {usernameLabel , workspaceNameLabel , agentNameLabel , templateNameLabel }
398
+ var agentMetricsLabels = []string {agentmetrics . UsernameLabel , agentmetrics . WorkspaceNameLabel , agentmetrics . AgentNameLabel , agentmetrics . TemplateNameLabel }
264
399
265
400
// AgentMetricLabels are the labels used to decorate an agent's metrics.
266
401
// This list should match the list of labels in agentMetricsLabels.
0 commit comments