@@ -2,6 +2,7 @@ package prometheusmetrics
2
2
3
3
import (
4
4
"context"
5
+ "time"
5
6
6
7
"github.com/prometheus/client_golang/prometheus"
7
8
"golang.org/x/xerrors"
@@ -22,12 +23,15 @@ const (
22
23
const (
23
24
sizeCollectCh = 10
24
25
sizeUpdateCh = 1024
26
+
27
+ defaultMetricsCleanupInterval = 2 * time .Minute
25
28
)
26
29
27
30
type MetricsAggregator struct {
28
31
queue []annotatedMetric
29
32
30
- log slog.Logger
33
+ log slog.Logger
34
+ metricsCleanupInterval time.Duration
31
35
32
36
collectCh chan (chan <- prometheus.Metric )
33
37
updateCh chan updateRequest
@@ -39,6 +43,8 @@ type updateRequest struct {
39
43
agentName string
40
44
41
45
metrics []agentsdk.AgentMetric
46
+
47
+ timestamp time.Time
42
48
}
43
49
44
50
type annotatedMetric struct {
@@ -47,13 +53,20 @@ type annotatedMetric struct {
47
53
username string
48
54
workspaceName string
49
55
agentName string
56
+
57
+ expiryDate time.Time
50
58
}
51
59
52
60
var _ prometheus.Collector = new (MetricsAggregator )
53
61
54
- func NewMetricsAggregator (logger slog.Logger ) * MetricsAggregator {
62
+ func NewMetricsAggregator (logger slog.Logger , duration time.Duration ) * MetricsAggregator {
63
+ metricsCleanupInterval := defaultMetricsCleanupInterval
64
+ if duration > 0 {
65
+ metricsCleanupInterval = duration
66
+ }
55
67
return & MetricsAggregator {
56
- log : logger ,
68
+ log : logger ,
69
+ metricsCleanupInterval : metricsCleanupInterval ,
57
70
58
71
collectCh : make (chan (chan <- prometheus.Metric ), sizeCollectCh ),
59
72
updateCh : make (chan updateRequest , sizeUpdateCh ),
@@ -64,17 +77,22 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
64
77
ctx , cancelFunc := context .WithCancel (ctx )
65
78
done := make (chan struct {})
66
79
80
+ cleanupTicker := time .NewTicker (ma .metricsCleanupInterval )
67
81
go func () {
68
82
defer close (done )
83
+ defer cleanupTicker .Stop ()
69
84
70
85
for {
71
86
select {
72
87
case req := <- ma .updateCh :
88
+ ma .log .Debug (ctx , "metrics aggregator: update metrics" )
89
+
73
90
UpdateLoop:
74
91
for _ , m := range req .metrics {
75
92
for i , q := range ma .queue {
76
93
if q .username == req .username && q .workspaceName == req .workspaceName && q .agentName == req .agentName && q .Name == m .Name {
77
94
ma .queue [i ].AgentMetric .Value = m .Value
95
+ ma .queue [i ].expiryDate = req .timestamp .Add (ma .metricsCleanupInterval )
78
96
continue UpdateLoop
79
97
}
80
98
}
@@ -85,20 +103,51 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
85
103
agentName : req .agentName ,
86
104
87
105
AgentMetric : m ,
106
+
107
+ expiryDate : req .timestamp .Add (ma .metricsCleanupInterval ),
88
108
})
89
109
}
90
110
case inputCh := <- ma .collectCh :
111
+ ma .log .Debug (ctx , "metrics aggregator: collect metrics" )
112
+
91
113
for _ , m := range ma .queue {
92
114
desc := prometheus .NewDesc (m .Name , metricHelpForAgent , agentMetricsLabels , nil )
93
115
valueType , err := asPrometheusValueType (m .Type )
94
116
if err != nil {
95
- ma .log .Error (ctx , "can't convert Prometheus value type" , slog .F ("value_type " , m .Type ), slog .Error (err ))
117
+ ma .log .Error (ctx , "can't convert Prometheus value type" , slog .F ("name " , m .Name ), slog . F ( "type" , m . Type ), slog . F ( "value" , m . Value ), slog .Error (err ))
96
118
continue
97
119
}
98
120
constMetric := prometheus .MustNewConstMetric (desc , valueType , m .Value , m .username , m .workspaceName , m .agentName )
99
121
inputCh <- constMetric
100
122
}
101
123
close (inputCh )
124
+ case <- cleanupTicker .C :
125
+ ma .log .Debug (ctx , "metrics aggregator: clean expired metrics" )
126
+
127
+ now := time .Now ()
128
+
129
+ var hasExpiredMetrics bool
130
+ for _ , m := range ma .queue {
131
+ if m .expiryDate .After (now ) {
132
+ hasExpiredMetrics = true
133
+ break
134
+ }
135
+ }
136
+
137
+ if ! hasExpiredMetrics {
138
+ continue
139
+ }
140
+
141
+ var j int
142
+ fresh := make ([]annotatedMetric , len (ma .queue ))
143
+ for _ , m := range ma .queue {
144
+ if m .expiryDate .After (now ) {
145
+ fresh [j ] = m
146
+ j ++
147
+ }
148
+ }
149
+ fresh = fresh [:j ]
150
+ ma .queue = fresh
102
151
case <- ctx .Done ():
103
152
ma .log .Debug (ctx , "metrics aggregator: is stopped" )
104
153
return
@@ -140,9 +189,11 @@ func (ma *MetricsAggregator) Update(ctx context.Context, username, workspaceName
140
189
workspaceName : workspaceName ,
141
190
agentName : agentName ,
142
191
metrics : metrics ,
192
+
193
+ timestamp : time .Now (),
143
194
}:
144
195
case <- ctx .Done ():
145
- ma .log .Debug (ctx , "metrics aggregator: update is canceled" )
196
+ ma .log .Debug (ctx , "metrics aggregator: update request is canceled" )
146
197
default :
147
198
ma .log .Error (ctx , "metrics aggregator: update queue is full" )
148
199
}
0 commit comments