Skip to content

Commit 10e6d8d

Browse files
committed
Metrics aggregator with channels
1 parent d86496e commit 10e6d8d

File tree

2 files changed

+106
-38
lines changed

2 files changed

+106
-38
lines changed

cli/server.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -724,9 +724,12 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
724724
}
725725
defer closeAgentStatsFunc()
726726

727-
var metricsAggregator prometheusmetrics.MetricsAggregator
727+
metricsAggregator := prometheusmetrics.NewMetricsAggregator(logger)
728+
cancelMetricsAggregator := metricsAggregator.Run(ctx)
729+
defer cancelMetricsAggregator()
730+
728731
options.UpdateAgentMetrics = metricsAggregator.Update
729-
options.PrometheusRegistry.MustRegister(&metricsAggregator)
732+
options.PrometheusRegistry.MustRegister(metricsAggregator)
730733
}
731734

732735
//nolint:revive

coderd/prometheusmetrics/aggregator.go

Lines changed: 101 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package prometheusmetrics
22

33
import (
44
"context"
5-
"sync"
65

76
"github.com/prometheus/client_golang/prometheus"
87
"golang.org/x/xerrors"
@@ -20,10 +19,26 @@ const (
2019
metricHelpForAgent = "Metric is forwarded from workspace agent connected to this instance of coderd."
2120
)
2221

22+
// Buffer capacities of the channels owned by MetricsAggregator.
//
// sizeCollectCh is the capacity of collectCh, i.e. how many Collect calls can
// be queued before further calls are rejected with a "queue is full" log.
// sizeUpdateCh is the capacity of updateCh, i.e. how many Update requests can
// be queued before further requests are dropped.
const (
23+
sizeCollectCh = 10
24+
sizeUpdateCh = 1024
25+
)
26+
2327
// MetricsAggregator stores the latest values of metrics reported by workspace
// agents and exposes them as a prometheus.Collector.
//
// After this change the mutex is gone: queue is read and written only by the
// goroutine started in Run, and other goroutines talk to it through collectCh
// (one send-only result channel per Collect call) and updateCh (one
// updateRequest per Update call).
type MetricsAggregator struct {
24-
m sync.Mutex
25-
log slog.Logger
2628
queue []annotatedMetric
29+
30+
log slog.Logger
31+
32+
collectCh chan (chan<- prometheus.Metric)
33+
updateCh chan updateRequest
34+
}
35+
36+
// updateRequest is the message Update sends over updateCh: a batch of agent
// metrics together with the username, workspace name and agent name that
// identify where the batch came from.
type updateRequest struct {
37+
username string
38+
workspaceName string
39+
agentName string
40+
41+
metrics []agentsdk.AgentMetric
2742
}
2843

2944
type annotatedMetric struct {
@@ -36,6 +51,66 @@ type annotatedMetric struct {
3651

3752
var _ prometheus.Collector = new(MetricsAggregator)
3853

54+
func NewMetricsAggregator(logger slog.Logger) *MetricsAggregator {
55+
return &MetricsAggregator{
56+
log: logger,
57+
58+
collectCh: make(chan (chan<- prometheus.Metric), sizeCollectCh),
59+
updateCh: make(chan updateRequest, sizeUpdateCh),
60+
}
61+
}
62+
63+
func (ma *MetricsAggregator) Run(ctx context.Context) func() {
64+
ctx, cancelFunc := context.WithCancel(ctx)
65+
done := make(chan struct{})
66+
67+
go func() {
68+
defer close(done)
69+
70+
for {
71+
select {
72+
case req := <-ma.updateCh:
73+
UpdateLoop:
74+
for _, m := range req.metrics {
75+
for i, q := range ma.queue {
76+
if q.username == req.username && q.workspaceName == req.workspaceName && q.agentName == req.agentName && q.Name == m.Name {
77+
ma.queue[i].AgentMetric.Value = m.Value
78+
continue UpdateLoop
79+
}
80+
}
81+
82+
ma.queue = append(ma.queue, annotatedMetric{
83+
username: req.username,
84+
workspaceName: req.workspaceName,
85+
agentName: req.agentName,
86+
87+
AgentMetric: m,
88+
})
89+
}
90+
case inputCh := <-ma.collectCh:
91+
for _, m := range ma.queue {
92+
desc := prometheus.NewDesc(m.Name, metricHelpForAgent, agentMetricsLabels, nil)
93+
valueType, err := asPrometheusValueType(m.Type)
94+
if err != nil {
95+
ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("value_type", m.Type), slog.Error(err))
96+
continue
97+
}
98+
constMetric := prometheus.MustNewConstMetric(desc, valueType, m.Value, m.username, m.workspaceName, m.agentName)
99+
inputCh <- constMetric
100+
}
101+
close(inputCh)
102+
case <-ctx.Done():
103+
ma.log.Debug(ctx, "metrics aggregator: is stopped")
104+
return
105+
}
106+
}
107+
}()
108+
return func() {
109+
cancelFunc()
110+
<-done
111+
}
112+
}
113+
39114
// Describe function does not have any knowledge about the metrics schema,
40115
// so it does not emit anything.
41116
func (*MetricsAggregator) Describe(_ chan<- *prometheus.Desc) {
@@ -44,42 +119,32 @@ func (*MetricsAggregator) Describe(_ chan<- *prometheus.Desc) {
44119
// agentMetricsLabels are the variable label names given to every descriptor
// built for a forwarded agent metric; label values are supplied in the same
// order: username, workspace name, agent name.
var agentMetricsLabels = []string{usernameLabel, workspaceNameLabel, agentNameLabel}
45120

46121
func (ma *MetricsAggregator) Collect(ch chan<- prometheus.Metric) {
47-
ma.m.Lock()
48-
defer ma.m.Unlock()
49-
50-
for _, m := range ma.queue {
51-
desc := prometheus.NewDesc(m.Name, metricHelpForAgent, agentMetricsLabels, nil)
52-
valueType, err := asPrometheusValueType(m.Type)
53-
if err != nil {
54-
ma.log.Error(context.Background(), "can't convert Prometheus value type", slog.F("value_type", m.Type), slog.Error(err))
55-
}
56-
constMetric := prometheus.MustNewConstMetric(desc, valueType, m.Value, m.username, m.workspaceName, m.agentName)
57-
ch <- constMetric
58-
}
59-
}
60-
61-
// TODO Run function with done channel
122+
collect := make(chan prometheus.Metric, 128)
62123

63-
func (ma *MetricsAggregator) Update(_ context.Context, username, workspaceName, agentName string, metrics []agentsdk.AgentMetric) {
64-
ma.m.Lock()
65-
defer ma.m.Unlock()
66-
67-
UpdateLoop:
68-
for _, m := range metrics {
69-
for i, q := range ma.queue {
70-
if q.username == username && q.workspaceName == workspaceName && q.agentName == agentName && q.Name == m.Name {
71-
ma.queue[i].AgentMetric.Value = m.Value
72-
continue UpdateLoop
73-
}
74-
}
124+
select {
125+
case ma.collectCh <- collect:
126+
default:
127+
ma.log.Error(context.Background(), "metrics aggregator: collect queue is full")
128+
return
129+
}
75130

76-
ma.queue = append(ma.queue, annotatedMetric{
77-
username: username,
78-
workspaceName: workspaceName,
79-
agentName: agentName,
131+
for m := range collect {
132+
ch <- m
133+
}
134+
}
80135

81-
AgentMetric: m,
82-
})
136+
func (ma *MetricsAggregator) Update(ctx context.Context, username, workspaceName, agentName string, metrics []agentsdk.AgentMetric) {
137+
select {
138+
case ma.updateCh <- updateRequest{
139+
username: username,
140+
workspaceName: workspaceName,
141+
agentName: agentName,
142+
metrics: metrics,
143+
}:
144+
case <-ctx.Done():
145+
ma.log.Debug(ctx, "metrics aggregator: update is canceled")
146+
default:
147+
ma.log.Error(ctx, "metrics aggregator: update queue is full")
83148
}
84149
}
85150

0 commit comments

Comments
 (0)