feat: Implement aggregator for agent metrics #7259
Changes from 1 commit
@@ -2,7 +2,6 @@ package prometheusmetrics
 import (
 	"context"
-	"sync"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"golang.org/x/xerrors"

@@ -20,10 +19,26 @@ const (
 	metricHelpForAgent = "Metric is forwarded from workspace agent connected to this instance of coderd."
Reviewer (suggested change on the help string above): I think it's preferable to use the plural form here. Side note: in an HA setup with multiple coderd instances, would metrics only cover the few agents connected to that specific coderd instance, or would they be global?

Author: Fixed. Only the connected ones, as metrics are not fetched from the database but just cached in memory. I think that applies to most Coder metrics, right?
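For reference, the plural wording being suggested would read roughly like this (an illustration only; the exact string adopted after the "Fixed" reply is not shown in this commit):

	metricHelpForAgent = "Metric is forwarded from workspace agents connected to this instance of coderd."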
 )
 
+const (
+	sizeCollectCh = 10
+	sizeUpdateCh  = 1024
+)
+
 type MetricsAggregator struct {
-	m     sync.Mutex
-	log   slog.Logger
 	queue []annotatedMetric
+
+	log slog.Logger
+
+	collectCh chan (chan<- prometheus.Metric)
+	updateCh  chan updateRequest
 }
+
+type updateRequest struct {
+	username      string
+	workspaceName string
+	agentName     string
+
+	metrics []agentsdk.AgentMetric
+}
 
 type annotatedMetric struct {

@@ -36,6 +51,66 @@ type annotatedMetric struct {
 
 var _ prometheus.Collector = new(MetricsAggregator)
 
+func NewMetricsAggregator(logger slog.Logger) *MetricsAggregator {
+	return &MetricsAggregator{
+		log: logger,
+
+		collectCh: make(chan (chan<- prometheus.Metric), sizeCollectCh),
+		updateCh:  make(chan updateRequest, sizeUpdateCh),
+	}
+}
+
+func (ma *MetricsAggregator) Run(ctx context.Context) func() {
+	ctx, cancelFunc := context.WithCancel(ctx)
+	done := make(chan struct{})
+
+	go func() {
+		defer close(done)
+
+		for {
+			select {
+			case req := <-ma.updateCh:
+			UpdateLoop:
+				for _, m := range req.metrics {
+					for i, q := range ma.queue {
+						if q.username == req.username && q.workspaceName == req.workspaceName && q.agentName == req.agentName && q.Name == m.Name {
+							ma.queue[i].AgentMetric.Value = m.Value
+							continue UpdateLoop
+						}
+					}
+
+					ma.queue = append(ma.queue, annotatedMetric{
+						username:      req.username,
+						workspaceName: req.workspaceName,
+						agentName:     req.agentName,
+
+						AgentMetric: m,
+					})
+				}
+			case inputCh := <-ma.collectCh:
+				for _, m := range ma.queue {
+					desc := prometheus.NewDesc(m.Name, metricHelpForAgent, agentMetricsLabels, nil)
+					valueType, err := asPrometheusValueType(m.Type)
+					if err != nil {
+						ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("value_type", m.Type), slog.Error(err))
+						continue
+					}
+					constMetric := prometheus.MustNewConstMetric(desc, valueType, m.Value, m.username, m.workspaceName, m.agentName)
+					inputCh <- constMetric
Reviewer: I'm not sure how this will be used, but I worry about the use of channels. Essentially, the way this behaves now, it's enough for one misbehaving consumer to lock up this entire goroutine, since it's dependent on the collected metrics being consumed in a timely manner. So essentially we can insert 128 entries here immediately, but if more is queued up, then it's on the one who called …

Author: 128 entries correspond to 128 agents submitting their metrics at the same time. I'm aware of the issue, but considering our scale, I don't think we should encounter this problem soon. Speaking of a potential solution, do you think we should introduce a "middle channel" to time out the …

Reviewer: Hmm, I see. I think another option, instead of an intermediate channel, would be to send a slice over this channel instead of 128 individual items. Then Collect can process the slice instead of consuming the channel, and the goroutine can immediately continue its work. This way the logic doesn't need to change much at all. This of course leads to increased memory usage when we exceed 128, but then again, if we do exceed it now it might not be ideal either.

Author: In this case I slightly modified your idea. I kept the channel, but I'm passing the entire slice at once, so the 128-item limit does not exist anymore. Please let me know your thoughts, @mafredri.

Reviewer: Nice, and it actually looks like how I intended it 😄
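For illustration, here is a stripped-down sketch of the "pass the entire slice at once" idea settled on above. This is not code from the PR; the names (sliceAggregator, collectCh, run) are placeholders of my own:

package prometheussketch

import "github.com/prometheus/client_golang/prometheus"

type sliceAggregator struct {
	queue     []prometheus.Metric
	collectCh chan chan<- []prometheus.Metric
}

// run is the single goroutine owning the queue. It answers each collect
// request with one send of a copied batch, so a slow consumer of the batch
// cannot block this loop between requests.
func (a *sliceAggregator) run() {
	for resp := range a.collectCh {
		batch := make([]prometheus.Metric, len(a.queue))
		copy(batch, a.queue)
		resp <- batch
	}
}

// Collect receives the whole batch in one receive and forwards it to the
// Prometheus channel at its own pace.
func (a *sliceAggregator) Collect(ch chan<- prometheus.Metric) {
	resp := make(chan []prometheus.Metric, 1)
	a.collectCh <- resp
	for _, m := range <-resp {
		ch <- m
	}
}

Because the response channel is buffered, the aggregator goroutine performs a single non-blocking send per collect request and immediately returns to its loop.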
+				}
+				close(inputCh)
+			case <-ctx.Done():
+				ma.log.Debug(ctx, "metrics aggregator: is stopped")
+				return
+			}
+		}
+	}()
+	return func() {
+		cancelFunc()
+		<-done
+	}
+}
+
 // Describe function does not have any knowledge about the metrics schema,
 // so it does not emit anything.
 func (*MetricsAggregator) Describe(_ chan<- *prometheus.Desc) {

@@ -44,42 +119,32 @@ func (*MetricsAggregator) Describe(_ chan<- *prometheus.Desc) {
 var agentMetricsLabels = []string{usernameLabel, workspaceNameLabel, agentNameLabel}
 
 func (ma *MetricsAggregator) Collect(ch chan<- prometheus.Metric) {
-	ma.m.Lock()
-	defer ma.m.Unlock()
-
-	for _, m := range ma.queue {
-		desc := prometheus.NewDesc(m.Name, metricHelpForAgent, agentMetricsLabels, nil)
-		valueType, err := asPrometheusValueType(m.Type)
-		if err != nil {
-			ma.log.Error(context.Background(), "can't convert Prometheus value type", slog.F("value_type", m.Type), slog.Error(err))
-		}
-		constMetric := prometheus.MustNewConstMetric(desc, valueType, m.Value, m.username, m.workspaceName, m.agentName)
-		ch <- constMetric
-	}
-}
-
-// TODO Run function with done channel
-
-func (ma *MetricsAggregator) Update(_ context.Context, username, workspaceName, agentName string, metrics []agentsdk.AgentMetric) {
-	ma.m.Lock()
-	defer ma.m.Unlock()
-
-UpdateLoop:
-	for _, m := range metrics {
-		for i, q := range ma.queue {
-			if q.username == username && q.workspaceName == workspaceName && q.agentName == agentName && q.Name == m.Name {
-				ma.queue[i].AgentMetric.Value = m.Value
-				continue UpdateLoop
-			}
-		}
-
-		ma.queue = append(ma.queue, annotatedMetric{
-			username:      username,
-			workspaceName: workspaceName,
-			agentName:     agentName,
-
-			AgentMetric: m,
-		})
-	}
-}
+	collect := make(chan prometheus.Metric, 128)
+
+	select {
+	case ma.collectCh <- collect:
+	default:
+		ma.log.Error(context.Background(), "metrics aggregator: collect queue is full")
+		return
+	}
+
+	for m := range collect {
+		ch <- m
+	}
+}
+
+func (ma *MetricsAggregator) Update(ctx context.Context, username, workspaceName, agentName string, metrics []agentsdk.AgentMetric) {
+	select {
+	case ma.updateCh <- updateRequest{
+		username:      username,
+		workspaceName: workspaceName,
+		agentName:     agentName,
+		metrics:       metrics,
+	}:
+	case <-ctx.Done():
+		ma.log.Debug(ctx, "metrics aggregator: update is canceled")
+	default:
+		ma.log.Error(ctx, "metrics aggregator: update queue is full")
+	}
+}
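To make the intended wiring concrete, here is a minimal usage sketch based only on the API visible in this diff. The import paths and the agentsdk metric-type constant are assumptions on my part, and the real registration inside coderd is more involved:

package main

import (
	"context"
	"os"

	"github.com/prometheus/client_golang/prometheus"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/sloghuman"

	"github.com/coder/coder/coderd/prometheusmetrics"
	"github.com/coder/coder/codersdk/agentsdk"
)

func main() {
	logger := slog.Make(sloghuman.Sink(os.Stderr))

	// The aggregator owns its state in a single goroutine started by Run;
	// the returned function cancels it and waits for shutdown.
	ma := prometheusmetrics.NewMetricsAggregator(logger)
	closeAggregator := ma.Run(context.Background())
	defer closeAggregator()

	// MetricsAggregator implements prometheus.Collector, so it can be
	// registered with the registry behind coderd's metrics endpoint.
	registry := prometheus.NewRegistry()
	registry.MustRegister(ma)

	// Wherever agent metrics arrive, forward them. Update never blocks:
	// it drops the batch with an error log if updateCh is full.
	// The metric name and type constant below are illustrative assumptions.
	ma.Update(context.Background(), "alice", "workspace-1", "main", []agentsdk.AgentMetric{
		{Name: "agent_open_file_descriptors", Type: agentsdk.AgentMetricTypeGauge, Value: 7},
	})
}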