Skip to content

Commit aacb4a2

Browse files
authored
feat: use map instead of slice in metrics aggregator (#11815)
1 parent 37e9479 commit aacb4a2

File tree

3 files changed

+156
-257
lines changed

3 files changed

+156
-257
lines changed

coderd/prometheusmetrics/aggregator.go

+73-45
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,9 @@ package prometheusmetrics
22

33
import (
44
"context"
5+
"fmt"
6+
"sort"
7+
"strings"
58
"time"
69

710
"github.com/prometheus/client_golang/prometheus"
@@ -24,20 +27,23 @@ const (
2427
loggerName = "prometheusmetrics"
2528

2629
sizeCollectCh = 10
27-
sizeUpdateCh = 1024
30+
sizeUpdateCh = 4096
2831

2932
defaultMetricsCleanupInterval = 2 * time.Minute
3033
)
3134

35+
// MetricLabelValueEncoder backslash-escapes the characters `\`, `|`, `,` and
// `=` in a string. hashKey applies it to label values before joining the
// "name=value" pairs with "," so that those separator characters occurring
// inside a value cannot be confused with the separators of the joined string.
var MetricLabelValueEncoder = strings.NewReplacer("\\", "\\\\", "|", "\\|", ",", "\\,", "=", "\\=")
36+
3237
type MetricsAggregator struct {
33-
queue []annotatedMetric
38+
store map[metricKey]annotatedMetric
3439

3540
log slog.Logger
3641
metricsCleanupInterval time.Duration
3742

3843
collectCh chan (chan []prometheus.Metric)
3944
updateCh chan updateRequest
4045

46+
storeSizeGauge prometheus.Gauge
4147
updateHistogram prometheus.Histogram
4248
cleanupHistogram prometheus.Histogram
4349
}
@@ -64,17 +70,37 @@ type annotatedMetric struct {
6470
expiryDate time.Time
6571
}
6672

67-
var _ prometheus.Collector = new(MetricsAggregator)
73+
type metricKey struct {
74+
username string
75+
workspaceName string
76+
agentName string
77+
templateName string
6878

69-
func (am *annotatedMetric) is(req updateRequest, m *agentproto.Stats_Metric) bool {
70-
return am.username == req.username &&
71-
am.workspaceName == req.workspaceName &&
72-
am.agentName == req.agentName &&
73-
am.templateName == req.templateName &&
74-
am.Name == m.Name &&
75-
agentproto.LabelsEqual(am.Labels, m.Labels)
79+
metricName string
80+
labelsStr string
7681
}
7782

83+
func hashKey(req *updateRequest, m *agentproto.Stats_Metric) metricKey {
84+
labelPairs := make(sort.StringSlice, 0, len(m.GetLabels()))
85+
for _, label := range m.GetLabels() {
86+
if label.Value == "" {
87+
continue
88+
}
89+
labelPairs = append(labelPairs, fmt.Sprintf("%s=%s", label.Name, MetricLabelValueEncoder.Replace(label.Value)))
90+
}
91+
labelPairs.Sort()
92+
return metricKey{
93+
username: req.username,
94+
workspaceName: req.workspaceName,
95+
agentName: req.agentName,
96+
templateName: req.templateName,
97+
metricName: m.Name,
98+
labelsStr: strings.Join(labelPairs, ","),
99+
}
100+
}
101+
102+
var _ prometheus.Collector = new(MetricsAggregator)
103+
78104
func (am *annotatedMetric) asPrometheus() (prometheus.Metric, error) {
79105
labels := make([]string, 0, len(agentMetricsLabels)+len(am.Labels))
80106
labelValues := make([]string, 0, len(agentMetricsLabels)+len(am.Labels))
@@ -101,14 +127,25 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer,
101127
metricsCleanupInterval = duration
102128
}
103129

130+
storeSizeGauge := prometheus.NewGauge(prometheus.GaugeOpts{
131+
Namespace: "coderd",
132+
Subsystem: "prometheusmetrics",
133+
Name: "metrics_aggregator_store_size",
134+
Help: "The number of metrics stored in the aggregator",
135+
})
136+
err := registerer.Register(storeSizeGauge)
137+
if err != nil {
138+
return nil, err
139+
}
140+
104141
updateHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
105142
Namespace: "coderd",
106143
Subsystem: "prometheusmetrics",
107144
Name: "metrics_aggregator_execution_update_seconds",
108145
Help: "Histogram for duration of metrics aggregator update in seconds.",
109146
Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.500, 1, 5, 10, 30},
110147
})
111-
err := registerer.Register(updateHistogram)
148+
err = registerer.Register(updateHistogram)
112149
if err != nil {
113150
return nil, err
114151
}
@@ -129,9 +166,12 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer,
129166
log: logger.Named(loggerName),
130167
metricsCleanupInterval: metricsCleanupInterval,
131168

169+
store: map[metricKey]annotatedMetric{},
170+
132171
collectCh: make(chan (chan []prometheus.Metric), sizeCollectCh),
133172
updateCh: make(chan updateRequest, sizeUpdateCh),
134173

174+
storeSizeGauge: storeSizeGauge,
135175
updateHistogram: updateHistogram,
136176
cleanupHistogram: cleanupHistogram,
137177
}, nil
@@ -152,32 +192,32 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
152192
ma.log.Debug(ctx, "update metrics")
153193

154194
timer := prometheus.NewTimer(ma.updateHistogram)
155-
UpdateLoop:
156195
for _, m := range req.metrics {
157-
for i, q := range ma.queue {
158-
if q.is(req, m) {
159-
ma.queue[i].Stats_Metric.Value = m.Value
160-
ma.queue[i].expiryDate = req.timestamp.Add(ma.metricsCleanupInterval)
161-
continue UpdateLoop
196+
key := hashKey(&req, m)
197+
198+
if val, ok := ma.store[key]; ok {
199+
val.Stats_Metric.Value = m.Value
200+
val.expiryDate = req.timestamp.Add(ma.metricsCleanupInterval)
201+
ma.store[key] = val
202+
} else {
203+
ma.store[key] = annotatedMetric{
204+
Stats_Metric: m,
205+
username: req.username,
206+
workspaceName: req.workspaceName,
207+
agentName: req.agentName,
208+
templateName: req.templateName,
209+
expiryDate: req.timestamp.Add(ma.metricsCleanupInterval),
162210
}
163211
}
164-
165-
ma.queue = append(ma.queue, annotatedMetric{
166-
Stats_Metric: m,
167-
username: req.username,
168-
workspaceName: req.workspaceName,
169-
agentName: req.agentName,
170-
templateName: req.templateName,
171-
expiryDate: req.timestamp.Add(ma.metricsCleanupInterval),
172-
})
173212
}
174-
175213
timer.ObserveDuration()
214+
215+
ma.storeSizeGauge.Set(float64(len(ma.store)))
176216
case outputCh := <-ma.collectCh:
177217
ma.log.Debug(ctx, "collect metrics")
178218

179-
output := make([]prometheus.Metric, 0, len(ma.queue))
180-
for _, m := range ma.queue {
219+
output := make([]prometheus.Metric, 0, len(ma.store))
220+
for _, m := range ma.store {
181221
promMetric, err := m.asPrometheus()
182222
if err != nil {
183223
ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("name", m.Name), slog.F("type", m.Type), slog.F("value", m.Value), slog.Error(err))
@@ -191,29 +231,17 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
191231
ma.log.Debug(ctx, "clean expired metrics")
192232

193233
timer := prometheus.NewTimer(ma.cleanupHistogram)
194-
195234
now := time.Now()
196235

197-
var hasExpiredMetrics bool
198-
for _, m := range ma.queue {
199-
if now.After(m.expiryDate) {
200-
hasExpiredMetrics = true
201-
break
202-
}
203-
}
204-
205-
if hasExpiredMetrics {
206-
fresh := make([]annotatedMetric, 0, len(ma.queue))
207-
for _, m := range ma.queue {
208-
if m.expiryDate.After(now) {
209-
fresh = append(fresh, m)
210-
}
236+
for key, val := range ma.store {
237+
if now.After(val.expiryDate) {
238+
delete(ma.store, key)
211239
}
212-
ma.queue = fresh
213240
}
214241

215242
timer.ObserveDuration()
216243
cleanupTicker.Reset(ma.metricsCleanupInterval)
244+
ma.storeSizeGauge.Set(float64(len(ma.store)))
217245

218246
case <-ctx.Done():
219247
ma.log.Debug(ctx, "metrics aggregator is stopped")

0 commit comments

Comments
 (0)