Skip to content

Commit 8df9eea

Browse files
committed
Metrics expiry
1 parent 10e6d8d commit 8df9eea

File tree

2 files changed

+57
-6
lines changed

2 files changed

+57
-6
lines changed

cli/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
724724
}
725725
defer closeAgentStatsFunc()
726726

727-
metricsAggregator := prometheusmetrics.NewMetricsAggregator(logger)
727+
metricsAggregator := prometheusmetrics.NewMetricsAggregator(logger, 0)
728728
cancelMetricsAggregator := metricsAggregator.Run(ctx)
729729
defer cancelMetricsAggregator()
730730

coderd/prometheusmetrics/aggregator.go

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package prometheusmetrics
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/prometheus/client_golang/prometheus"
78
"golang.org/x/xerrors"
@@ -22,12 +23,15 @@ const (
2223
const (
2324
sizeCollectCh = 10
2425
sizeUpdateCh = 1024
26+
27+
defaultMetricsCleanupInterval = 2 * time.Minute
2528
)
2629

2730
type MetricsAggregator struct {
2831
queue []annotatedMetric
2932

30-
log slog.Logger
33+
log slog.Logger
34+
metricsCleanupInterval time.Duration
3135

3236
collectCh chan (chan<- prometheus.Metric)
3337
updateCh chan updateRequest
@@ -39,6 +43,8 @@ type updateRequest struct {
3943
agentName string
4044

4145
metrics []agentsdk.AgentMetric
46+
47+
timestamp time.Time
4248
}
4349

4450
type annotatedMetric struct {
@@ -47,13 +53,20 @@ type annotatedMetric struct {
4753
username string
4854
workspaceName string
4955
agentName string
56+
57+
expiryDate time.Time
5058
}
5159

5260
var _ prometheus.Collector = new(MetricsAggregator)
5361

54-
func NewMetricsAggregator(logger slog.Logger) *MetricsAggregator {
62+
func NewMetricsAggregator(logger slog.Logger, duration time.Duration) *MetricsAggregator {
63+
metricsCleanupInterval := defaultMetricsCleanupInterval
64+
if duration > 0 {
65+
metricsCleanupInterval = duration
66+
}
5567
return &MetricsAggregator{
56-
log: logger,
68+
log: logger,
69+
metricsCleanupInterval: metricsCleanupInterval,
5770

5871
collectCh: make(chan (chan<- prometheus.Metric), sizeCollectCh),
5972
updateCh: make(chan updateRequest, sizeUpdateCh),
@@ -64,17 +77,22 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
6477
ctx, cancelFunc := context.WithCancel(ctx)
6578
done := make(chan struct{})
6679

80+
cleanupTicker := time.NewTicker(ma.metricsCleanupInterval)
6781
go func() {
6882
defer close(done)
83+
defer cleanupTicker.Stop()
6984

7085
for {
7186
select {
7287
case req := <-ma.updateCh:
88+
ma.log.Debug(ctx, "metrics aggregator: update metrics")
89+
7390
UpdateLoop:
7491
for _, m := range req.metrics {
7592
for i, q := range ma.queue {
7693
if q.username == req.username && q.workspaceName == req.workspaceName && q.agentName == req.agentName && q.Name == m.Name {
7794
ma.queue[i].AgentMetric.Value = m.Value
95+
ma.queue[i].expiryDate = req.timestamp.Add(ma.metricsCleanupInterval)
7896
continue UpdateLoop
7997
}
8098
}
@@ -85,20 +103,51 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
85103
agentName: req.agentName,
86104

87105
AgentMetric: m,
106+
107+
expiryDate: req.timestamp.Add(ma.metricsCleanupInterval),
88108
})
89109
}
90110
case inputCh := <-ma.collectCh:
111+
ma.log.Debug(ctx, "metrics aggregator: collect metrics")
112+
91113
for _, m := range ma.queue {
92114
desc := prometheus.NewDesc(m.Name, metricHelpForAgent, agentMetricsLabels, nil)
93115
valueType, err := asPrometheusValueType(m.Type)
94116
if err != nil {
95-
ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("value_type", m.Type), slog.Error(err))
117+
ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("name", m.Name), slog.F("type", m.Type), slog.F("value", m.Value), slog.Error(err))
96118
continue
97119
}
98120
constMetric := prometheus.MustNewConstMetric(desc, valueType, m.Value, m.username, m.workspaceName, m.agentName)
99121
inputCh <- constMetric
100122
}
101123
close(inputCh)
124+
case <-cleanupTicker.C:
125+
ma.log.Debug(ctx, "metrics aggregator: clean expired metrics")
126+
127+
now := time.Now()
128+
129+
var hasExpiredMetrics bool
130+
for _, m := range ma.queue {
131+
if m.expiryDate.After(now) {
132+
hasExpiredMetrics = true
133+
break
134+
}
135+
}
136+
137+
if !hasExpiredMetrics {
138+
continue
139+
}
140+
141+
var j int
142+
fresh := make([]annotatedMetric, len(ma.queue))
143+
for _, m := range ma.queue {
144+
if m.expiryDate.After(now) {
145+
fresh[j] = m
146+
j++
147+
}
148+
}
149+
fresh = fresh[:j]
150+
ma.queue = fresh
102151
case <-ctx.Done():
103152
ma.log.Debug(ctx, "metrics aggregator: is stopped")
104153
return
@@ -140,9 +189,11 @@ func (ma *MetricsAggregator) Update(ctx context.Context, username, workspaceName
140189
workspaceName: workspaceName,
141190
agentName: agentName,
142191
metrics: metrics,
192+
193+
timestamp: time.Now(),
143194
}:
144195
case <-ctx.Done():
145-
ma.log.Debug(ctx, "metrics aggregator: update is canceled")
196+
ma.log.Debug(ctx, "metrics aggregator: update request is canceled")
146197
default:
147198
ma.log.Error(ctx, "metrics aggregator: update queue is full")
148199
}

0 commit comments

Comments
 (0)