@@ -2,6 +2,9 @@ package prometheusmetrics
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
6
+ "sort"
7
+ "strings"
5
8
"time"
6
9
7
10
"github.com/prometheus/client_golang/prometheus"
@@ -24,20 +27,23 @@ const (
24
27
loggerName = "prometheusmetrics"
25
28
26
29
sizeCollectCh = 10
27
- sizeUpdateCh = 1024
30
+ sizeUpdateCh = 4096
28
31
29
32
defaultMetricsCleanupInterval = 2 * time .Minute
30
33
)
31
34
35
// MetricLabelValueEncoder escapes the characters that act as delimiters when
// label values are serialized into a single string: the backslash itself, "|",
// "," and "=". Each of them is prefixed with a backslash, and the backslash is
// listed first so that an already-present backslash is doubled rather than
// being mistaken for an escape introduced by this encoder.
var MetricLabelValueEncoder = strings.NewReplacer("\\", "\\\\", "|", "\\|", ",", "\\,", "=", "\\=")
36
+
32
37
// MetricsAggregator keeps the most recent value of every agent metric it has
// been sent and exposes them as Prometheus metrics. Its Run loop handles
// update requests, collect requests, and periodic cleanup of expired entries.
type MetricsAggregator struct {
	// store holds the latest annotated metric for each metricKey.
	store map[metricKey]annotatedMetric

	// log is the aggregator's logger, named with loggerName.
	log slog.Logger
	// metricsCleanupInterval is added to an update's timestamp to compute the
	// entry's expiry date, and is the period the cleanup ticker is reset to.
	metricsCleanupInterval time.Duration

	// collectCh delivers the channels on which Run sends back the collected
	// Prometheus metrics; it is buffered with sizeCollectCh.
	collectCh chan (chan []prometheus.Metric)
	// updateCh delivers metric update requests to Run; it is buffered with
	// sizeUpdateCh.
	updateCh chan updateRequest

	// storeSizeGauge is set to len(store) after each update and each cleanup.
	storeSizeGauge prometheus.Gauge
	// updateHistogram observes how long processing one update request takes.
	updateHistogram prometheus.Histogram
	// cleanupHistogram observes how long one cleanup pass takes.
	cleanupHistogram prometheus.Histogram
}
@@ -64,17 +70,37 @@ type annotatedMetric struct {
64
70
expiryDate time.Time
65
71
}
66
72
67
- var _ prometheus.Collector = new (MetricsAggregator )
73
// metricKey identifies one stored metric. It contains only strings, so it is
// comparable and usable as a map key. Values are produced by hashKey.
type metricKey struct {
	// username is copied from the update request.
	username string
	// workspaceName is copied from the update request.
	workspaceName string
	// agentName is copied from the update request.
	agentName string
	// templateName is copied from the update request.
	templateName string

	// metricName is the name of the reported metric.
	metricName string
	// labelsStr is the sorted, comma-joined list of "name=value" pairs for the
	// metric's labels with non-empty values; values are escaped with
	// MetricLabelValueEncoder.
	labelsStr string
}
77
82
83
+ func hashKey (req * updateRequest , m * agentproto.Stats_Metric ) metricKey {
84
+ labelPairs := make (sort.StringSlice , 0 , len (m .GetLabels ()))
85
+ for _ , label := range m .GetLabels () {
86
+ if label .Value == "" {
87
+ continue
88
+ }
89
+ labelPairs = append (labelPairs , fmt .Sprintf ("%s=%s" , label .Name , MetricLabelValueEncoder .Replace (label .Value )))
90
+ }
91
+ labelPairs .Sort ()
92
+ return metricKey {
93
+ username : req .username ,
94
+ workspaceName : req .workspaceName ,
95
+ agentName : req .agentName ,
96
+ templateName : req .templateName ,
97
+ metricName : m .Name ,
98
+ labelsStr : strings .Join (labelPairs , "," ),
99
+ }
100
+ }
101
+
102
// Compile-time assertion that *MetricsAggregator implements prometheus.Collector.
var _ prometheus.Collector = new(MetricsAggregator)
103
+
78
104
func (am * annotatedMetric ) asPrometheus () (prometheus.Metric , error ) {
79
105
labels := make ([]string , 0 , len (agentMetricsLabels )+ len (am .Labels ))
80
106
labelValues := make ([]string , 0 , len (agentMetricsLabels )+ len (am .Labels ))
@@ -101,14 +127,25 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer,
101
127
metricsCleanupInterval = duration
102
128
}
103
129
130
+ storeSizeGauge := prometheus .NewGauge (prometheus.GaugeOpts {
131
+ Namespace : "coderd" ,
132
+ Subsystem : "prometheusmetrics" ,
133
+ Name : "metrics_aggregator_store_size" ,
134
+ Help : "The number of metrics stored in the aggregator" ,
135
+ })
136
+ err := registerer .Register (storeSizeGauge )
137
+ if err != nil {
138
+ return nil , err
139
+ }
140
+
104
141
updateHistogram := prometheus .NewHistogram (prometheus.HistogramOpts {
105
142
Namespace : "coderd" ,
106
143
Subsystem : "prometheusmetrics" ,
107
144
Name : "metrics_aggregator_execution_update_seconds" ,
108
145
Help : "Histogram for duration of metrics aggregator update in seconds." ,
109
146
Buckets : []float64 {0.001 , 0.005 , 0.010 , 0.025 , 0.050 , 0.100 , 0.500 , 1 , 5 , 10 , 30 },
110
147
})
111
- err : = registerer .Register (updateHistogram )
148
+ err = registerer .Register (updateHistogram )
112
149
if err != nil {
113
150
return nil , err
114
151
}
@@ -129,9 +166,12 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer,
129
166
log : logger .Named (loggerName ),
130
167
metricsCleanupInterval : metricsCleanupInterval ,
131
168
169
+ store : map [metricKey ]annotatedMetric {},
170
+
132
171
collectCh : make (chan (chan []prometheus.Metric ), sizeCollectCh ),
133
172
updateCh : make (chan updateRequest , sizeUpdateCh ),
134
173
174
+ storeSizeGauge : storeSizeGauge ,
135
175
updateHistogram : updateHistogram ,
136
176
cleanupHistogram : cleanupHistogram ,
137
177
}, nil
@@ -152,32 +192,32 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
152
192
ma .log .Debug (ctx , "update metrics" )
153
193
154
194
timer := prometheus .NewTimer (ma .updateHistogram )
155
- UpdateLoop:
156
195
for _ , m := range req .metrics {
157
- for i , q := range ma .queue {
158
- if q .is (req , m ) {
159
- ma .queue [i ].Stats_Metric .Value = m .Value
160
- ma .queue [i ].expiryDate = req .timestamp .Add (ma .metricsCleanupInterval )
161
- continue UpdateLoop
196
+ key := hashKey (& req , m )
197
+
198
+ if val , ok := ma .store [key ]; ok {
199
+ val .Stats_Metric .Value = m .Value
200
+ val .expiryDate = req .timestamp .Add (ma .metricsCleanupInterval )
201
+ ma .store [key ] = val
202
+ } else {
203
+ ma .store [key ] = annotatedMetric {
204
+ Stats_Metric : m ,
205
+ username : req .username ,
206
+ workspaceName : req .workspaceName ,
207
+ agentName : req .agentName ,
208
+ templateName : req .templateName ,
209
+ expiryDate : req .timestamp .Add (ma .metricsCleanupInterval ),
162
210
}
163
211
}
164
-
165
- ma .queue = append (ma .queue , annotatedMetric {
166
- Stats_Metric : m ,
167
- username : req .username ,
168
- workspaceName : req .workspaceName ,
169
- agentName : req .agentName ,
170
- templateName : req .templateName ,
171
- expiryDate : req .timestamp .Add (ma .metricsCleanupInterval ),
172
- })
173
212
}
174
-
175
213
timer .ObserveDuration ()
214
+
215
+ ma .storeSizeGauge .Set (float64 (len (ma .store )))
176
216
case outputCh := <- ma .collectCh :
177
217
ma .log .Debug (ctx , "collect metrics" )
178
218
179
- output := make ([]prometheus.Metric , 0 , len (ma .queue ))
180
- for _ , m := range ma .queue {
219
+ output := make ([]prometheus.Metric , 0 , len (ma .store ))
220
+ for _ , m := range ma .store {
181
221
promMetric , err := m .asPrometheus ()
182
222
if err != nil {
183
223
ma .log .Error (ctx , "can't convert Prometheus value type" , slog .F ("name" , m .Name ), slog .F ("type" , m .Type ), slog .F ("value" , m .Value ), slog .Error (err ))
@@ -191,29 +231,17 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
191
231
ma .log .Debug (ctx , "clean expired metrics" )
192
232
193
233
timer := prometheus .NewTimer (ma .cleanupHistogram )
194
-
195
234
now := time .Now ()
196
235
197
- var hasExpiredMetrics bool
198
- for _ , m := range ma .queue {
199
- if now .After (m .expiryDate ) {
200
- hasExpiredMetrics = true
201
- break
202
- }
203
- }
204
-
205
- if hasExpiredMetrics {
206
- fresh := make ([]annotatedMetric , 0 , len (ma .queue ))
207
- for _ , m := range ma .queue {
208
- if m .expiryDate .After (now ) {
209
- fresh = append (fresh , m )
210
- }
236
+ for key , val := range ma .store {
237
+ if now .After (val .expiryDate ) {
238
+ delete (ma .store , key )
211
239
}
212
- ma .queue = fresh
213
240
}
214
241
215
242
timer .ObserveDuration ()
216
243
cleanupTicker .Reset (ma .metricsCleanupInterval )
244
+ ma .storeSizeGauge .Set (float64 (len (ma .store )))
217
245
218
246
case <- ctx .Done ():
219
247
ma .log .Debug (ctx , "metrics aggregator is stopped" )
0 commit comments