Skip to content

Commit b936232

Browse files
committed
feat: use map instead slice in metrics aggregator
1 parent 979a920 commit b936232

File tree

2 files changed

+73
-36
lines changed

2 files changed

+73
-36
lines changed

coderd/prometheusmetrics/aggregator.go

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package prometheusmetrics
22

33
import (
44
"context"
5+
"fmt"
6+
"strings"
57
"time"
68

79
"github.com/prometheus/client_golang/prometheus"
@@ -30,7 +32,7 @@ const (
3032
)
3133

3234
type MetricsAggregator struct {
33-
queue []annotatedMetric
35+
store map[string]annotatedMetric
3436

3537
log slog.Logger
3638
metricsCleanupInterval time.Duration
@@ -64,6 +66,20 @@ type annotatedMetric struct {
6466
expiryDate time.Time
6567
}
6668

69+
func hashKey(req *updateRequest, m *agentproto.Stats_Metric) string {
70+
var sbLabels strings.Builder
71+
for i, label := range m.GetLabels() {
72+
_, _ = sbLabels.WriteString(label.Name)
73+
_ = sbLabels.WriteByte('=')
74+
_, _ = sbLabels.WriteString(label.Value)
75+
76+
if i-1 != len(m.GetLabels()) {
77+
_ = sbLabels.WriteByte(',')
78+
}
79+
}
80+
return fmt.Sprintf("%s|%s|%s|%s|%s|%s", req.username, req.workspaceName, req.agentName, req.templateName, m.GetName(), sbLabels.String())
81+
}
82+
6783
var _ prometheus.Collector = new(MetricsAggregator)
6884

6985
func (am *annotatedMetric) is(req updateRequest, m *agentproto.Stats_Metric) bool {
@@ -129,6 +145,8 @@ func NewMetricsAggregator(logger slog.Logger, registerer prometheus.Registerer,
129145
log: logger.Named(loggerName),
130146
metricsCleanupInterval: metricsCleanupInterval,
131147

148+
store: map[string]annotatedMetric{},
149+
132150
collectCh: make(chan (chan []prometheus.Metric), sizeCollectCh),
133151
updateCh: make(chan updateRequest, sizeUpdateCh),
134152

@@ -152,32 +170,30 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
152170
ma.log.Debug(ctx, "update metrics")
153171

154172
timer := prometheus.NewTimer(ma.updateHistogram)
155-
UpdateLoop:
156173
for _, m := range req.metrics {
157-
for i, q := range ma.queue {
158-
if q.is(req, m) {
159-
ma.queue[i].Stats_Metric.Value = m.Value
160-
ma.queue[i].expiryDate = req.timestamp.Add(ma.metricsCleanupInterval)
161-
continue UpdateLoop
174+
key := hashKey(&req, m)
175+
if val, ok := ma.store[key]; ok {
176+
val.Stats_Metric.Value = m.Value
177+
val.expiryDate = req.timestamp.Add(ma.metricsCleanupInterval)
178+
ma.store[key] = val
179+
} else {
180+
ma.store[key] = annotatedMetric{
181+
Stats_Metric: m,
182+
username: req.username,
183+
workspaceName: req.workspaceName,
184+
agentName: req.agentName,
185+
templateName: req.templateName,
186+
expiryDate: req.timestamp.Add(ma.metricsCleanupInterval),
162187
}
163188
}
164-
165-
ma.queue = append(ma.queue, annotatedMetric{
166-
Stats_Metric: m,
167-
username: req.username,
168-
workspaceName: req.workspaceName,
169-
agentName: req.agentName,
170-
templateName: req.templateName,
171-
expiryDate: req.timestamp.Add(ma.metricsCleanupInterval),
172-
})
173189
}
174190

175191
timer.ObserveDuration()
176192
case outputCh := <-ma.collectCh:
177193
ma.log.Debug(ctx, "collect metrics")
178194

179-
output := make([]prometheus.Metric, 0, len(ma.queue))
180-
for _, m := range ma.queue {
195+
output := make([]prometheus.Metric, 0, len(ma.store))
196+
for _, m := range ma.store {
181197
promMetric, err := m.asPrometheus()
182198
if err != nil {
183199
ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("name", m.Name), slog.F("type", m.Type), slog.F("value", m.Value), slog.Error(err))
@@ -191,25 +207,12 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
191207
ma.log.Debug(ctx, "clean expired metrics")
192208

193209
timer := prometheus.NewTimer(ma.cleanupHistogram)
194-
195210
now := time.Now()
196211

197-
var hasExpiredMetrics bool
198-
for _, m := range ma.queue {
199-
if now.After(m.expiryDate) {
200-
hasExpiredMetrics = true
201-
break
202-
}
203-
}
204-
205-
if hasExpiredMetrics {
206-
fresh := make([]annotatedMetric, 0, len(ma.queue))
207-
for _, m := range ma.queue {
208-
if m.expiryDate.After(now) {
209-
fresh = append(fresh, m)
210-
}
212+
for key, val := range ma.store {
213+
if now.After(val.expiryDate) {
214+
delete(ma.store, key)
211215
}
212-
ma.queue = fresh
213216
}
214217

215218
timer.ObserveDuration()

coderd/prometheusmetrics/aggregator_test.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package prometheusmetrics_test
33
import (
44
"context"
55
"sort"
6+
"strings"
67
"sync/atomic"
78
"testing"
89
"time"
@@ -80,15 +81,14 @@ func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) {
8081
}
8182
expected := []*agentproto.Stats_Metric{
8283
{Name: "a_counter_one", Type: agentproto.Stats_Metric_COUNTER, Value: 1, Labels: commonLabels},
83-
{Name: "b_counter_two", Type: agentproto.Stats_Metric_COUNTER, Value: 4, Labels: commonLabels},
8484
{Name: "b_counter_two", Type: agentproto.Stats_Metric_COUNTER, Value: -9, Labels: []*agentproto.Stats_Metric_Label{
8585
{Name: "agent_name", Value: testAgentName},
8686
{Name: "lizz", Value: "rizz"},
8787
{Name: "username", Value: testUsername},
8888
{Name: "workspace_name", Value: testWorkspaceName},
8989
{Name: "template_name", Value: testTemplateName},
9090
}},
91-
{Name: "c_gauge_three", Type: agentproto.Stats_Metric_GAUGE, Value: 5, Labels: commonLabels},
91+
{Name: "b_counter_two", Type: agentproto.Stats_Metric_COUNTER, Value: 4, Labels: commonLabels},
9292
{Name: "c_gauge_three", Type: agentproto.Stats_Metric_GAUGE, Value: 2, Labels: []*agentproto.Stats_Metric_Label{
9393
{Name: "agent_name", Value: testAgentName},
9494
{Name: "foobar", Value: "Foobaz"},
@@ -97,6 +97,7 @@ func TestUpdateMetrics_MetricsDoNotExpire(t *testing.T) {
9797
{Name: "workspace_name", Value: testWorkspaceName},
9898
{Name: "template_name", Value: testTemplateName},
9999
}},
100+
{Name: "c_gauge_three", Type: agentproto.Stats_Metric_GAUGE, Value: 5, Labels: commonLabels},
100101
{Name: "d_gauge_four", Type: agentproto.Stats_Metric_GAUGE, Value: 6, Labels: commonLabels},
101102
}
102103

@@ -130,6 +131,39 @@ func verifyCollectedMetrics(t *testing.T, expected []*agentproto.Stats_Metric, a
130131
return false
131132
}
132133

134+
prometheusMetricString := func(m prometheus.Metric) string {
135+
var sb strings.Builder
136+
137+
desc := m.Desc()
138+
_, _ = sb.WriteString(desc.String())
139+
_ = sb.WriteByte('|')
140+
141+
var d dto.Metric
142+
err := m.Write(&d)
143+
require.NoError(t, err)
144+
dtoLabels := asMetricAgentLabels(d.GetLabel())
145+
sort.Slice(dtoLabels, func(i, j int) bool {
146+
return dtoLabels[i].Name < dtoLabels[j].Name
147+
})
148+
149+
for i, dtoLabel := range dtoLabels {
150+
_, _ = sb.WriteString(dtoLabel.Name)
151+
_ = sb.WriteByte('=')
152+
_, _ = sb.WriteString(dtoLabel.Value)
153+
154+
if i-1 != len(dtoLabels) {
155+
_ = sb.WriteByte(',')
156+
}
157+
}
158+
return sb.String()
159+
}
160+
161+
sort.Slice(actual, func(i, j int) bool {
162+
m1 := prometheusMetricString(actual[i])
163+
m2 := prometheusMetricString(actual[j])
164+
return m1 < m2
165+
})
166+
133167
for i, e := range expected {
134168
desc := actual[i].Desc()
135169
assert.Contains(t, desc.String(), e.Name)

0 commit comments

Comments
 (0)