@@ -2,6 +2,8 @@ package prometheusmetrics
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
6
+ "strings"
5
7
"time"
6
8
7
9
"github.com/prometheus/client_golang/prometheus"
@@ -30,7 +32,7 @@ const (
30
32
)
31
33
32
34
type MetricsAggregator struct {
33
- queue [ ]annotatedMetric
35
+ store map [ string ]annotatedMetric
34
36
35
37
log slog.Logger
36
38
metricsCleanupInterval time.Duration
@@ -64,6 +66,20 @@ type annotatedMetric struct {
64
66
expiryDate time.Time
65
67
}
66
68
69
+ func hashKey (req * updateRequest , m * agentproto.Stats_Metric ) string {
70
+ var sbLabels strings.Builder
71
+ for i , label := range m .GetLabels () {
72
+ _ , _ = sbLabels .WriteString (label .Name )
73
+ _ = sbLabels .WriteByte ('=' )
74
+ _ , _ = sbLabels .WriteString (label .Value )
75
+
76
+ if i - 1 != len (m .GetLabels ()) {
77
+ _ = sbLabels .WriteByte (',' )
78
+ }
79
+ }
80
+ return fmt .Sprintf ("%s|%s|%s|%s|%s|%s" , req .username , req .workspaceName , req .agentName , req .templateName , m .GetName (), sbLabels .String ())
81
+ }
82
+
67
83
var _ prometheus.Collector = new (MetricsAggregator )
68
84
69
85
func (am * annotatedMetric ) is (req updateRequest , m * agentproto.Stats_Metric ) bool {
@@ -129,6 +145,8 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer,
129
145
log : logger .Named (loggerName ),
130
146
metricsCleanupInterval : metricsCleanupInterval ,
131
147
148
+ store : map [string ]annotatedMetric {},
149
+
132
150
collectCh : make (chan (chan []prometheus.Metric ), sizeCollectCh ),
133
151
updateCh : make (chan updateRequest , sizeUpdateCh ),
134
152
@@ -152,32 +170,30 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
152
170
ma .log .Debug (ctx , "update metrics" )
153
171
154
172
timer := prometheus .NewTimer (ma .updateHistogram )
155
- UpdateLoop:
156
173
for _ , m := range req .metrics {
157
- for i , q := range ma .queue {
158
- if q .is (req , m ) {
159
- ma .queue [i ].Stats_Metric .Value = m .Value
160
- ma .queue [i ].expiryDate = req .timestamp .Add (ma .metricsCleanupInterval )
161
- continue UpdateLoop
174
+ key := hashKey (& req , m )
175
+ if val , ok := ma .store [key ]; ok {
176
+ val .Stats_Metric .Value = m .Value
177
+ val .expiryDate = req .timestamp .Add (ma .metricsCleanupInterval )
178
+ ma .store [key ] = val
179
+ } else {
180
+ ma .store [key ] = annotatedMetric {
181
+ Stats_Metric : m ,
182
+ username : req .username ,
183
+ workspaceName : req .workspaceName ,
184
+ agentName : req .agentName ,
185
+ templateName : req .templateName ,
186
+ expiryDate : req .timestamp .Add (ma .metricsCleanupInterval ),
162
187
}
163
188
}
164
-
165
- ma .queue = append (ma .queue , annotatedMetric {
166
- Stats_Metric : m ,
167
- username : req .username ,
168
- workspaceName : req .workspaceName ,
169
- agentName : req .agentName ,
170
- templateName : req .templateName ,
171
- expiryDate : req .timestamp .Add (ma .metricsCleanupInterval ),
172
- })
173
189
}
174
190
175
191
timer .ObserveDuration ()
176
192
case outputCh := <- ma .collectCh :
177
193
ma .log .Debug (ctx , "collect metrics" )
178
194
179
- output := make ([]prometheus.Metric , 0 , len (ma .queue ))
180
- for _ , m := range ma .queue {
195
+ output := make ([]prometheus.Metric , 0 , len (ma .store ))
196
+ for _ , m := range ma .store {
181
197
promMetric , err := m .asPrometheus ()
182
198
if err != nil {
183
199
ma .log .Error (ctx , "can't convert Prometheus value type" , slog .F ("name" , m .Name ), slog .F ("type" , m .Type ), slog .F ("value" , m .Value ), slog .Error (err ))
@@ -191,25 +207,12 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
191
207
ma .log .Debug (ctx , "clean expired metrics" )
192
208
193
209
timer := prometheus .NewTimer (ma .cleanupHistogram )
194
-
195
210
now := time .Now ()
196
211
197
- var hasExpiredMetrics bool
198
- for _ , m := range ma .queue {
199
- if now .After (m .expiryDate ) {
200
- hasExpiredMetrics = true
201
- break
202
- }
203
- }
204
-
205
- if hasExpiredMetrics {
206
- fresh := make ([]annotatedMetric , 0 , len (ma .queue ))
207
- for _ , m := range ma .queue {
208
- if m .expiryDate .After (now ) {
209
- fresh = append (fresh , m )
210
- }
212
+ for key , val := range ma .store {
213
+ if now .After (val .expiryDate ) {
214
+ delete (ma .store , key )
211
215
}
212
- ma .queue = fresh
213
216
}
214
217
215
218
timer .ObserveDuration ()
0 commit comments