
feat: Implement aggregator for agent metrics #7259

Merged · 46 commits · Apr 27, 2023
Changes from 1 commit
Commits (46)
6516216
API contract
mtojek Apr 24, 2023
dc202c4
Send agent metrics
mtojek Apr 24, 2023
7747f2d
Ignore metrics to save bandwidth
mtojek Apr 24, 2023
9fd4ddb
fix lint
mtojek Apr 24, 2023
9af0246
logEntry
mtojek Apr 24, 2023
4207dff
make gen
mtojek Apr 24, 2023
99fe1bf
Use errGroup
mtojek Apr 24, 2023
df80e9b
Use MustNewConstMetric
mtojek Apr 25, 2023
d86496e
PoC works
mtojek Apr 25, 2023
10e6d8d
Metrics aggregator with channels
mtojek Apr 25, 2023
8df9eea
Metrics expiry
mtojek Apr 25, 2023
1f5273b
histograms
mtojek Apr 25, 2023
1b8c486
unit test
mtojek Apr 26, 2023
423420b
fmt
mtojek Apr 26, 2023
23bbe94
test: metrics can expire
mtojek Apr 26, 2023
b7011ae
Aggregator
mtojek Apr 26, 2023
29a8702
Address PR comments
mtojek Apr 26, 2023
7acd113
wrap errors
mtojek Apr 26, 2023
b15c7b7
fix
mtojek Apr 26, 2023
2ae7e4e
Update coderd/prometheusmetrics/aggregator.go
mtojek Apr 27, 2023
b04d232
refactor: PTY & SSH (#7100)
spikecurtis Apr 24, 2023
1d93f66
feat(community-templates): Added vscode-server-template (#7219)
nanospearing Apr 24, 2023
c604633
chore: Proxy health status checks + endpoint (#7233)
Emyrk Apr 24, 2023
7d84745
Revert "feat(UI): add workspace restart button (#7137)" (#7268)
Kira-Pilot Apr 24, 2023
407c332
refactor(site): Group app and agent actions together (#7267)
BrunoQuaresma Apr 24, 2023
49b81df
fix(coderd): ensure that user API keys are deleted when a user is (#7…
johnstcn Apr 24, 2023
44217de
chore(dogfood): remove unnecessary docker host replace (#7269)
coadler Apr 25, 2023
e659c36
Fix macOS pty race with dropped output (#7278)
spikecurtis Apr 25, 2023
6dc8b1f
feat: add regions endpoint for proxies feature (#7277)
deansheather Apr 25, 2023
d2233be
fix(healthcheck): don't allow panics to exit coderd (#7276)
coadler Apr 25, 2023
f3f5bed
chore: add security advisories to docs (#7282)
johnstcn Apr 25, 2023
50f60cb
fix(site): Do not show template params if there is no param to be dis…
BrunoQuaresma Apr 25, 2023
1bf1b06
fix(site): Fix default value for options (#7265)
BrunoQuaresma Apr 25, 2023
5f6b4dc
chore: fix flake in apptest reconnecting-pty test (#7281)
deansheather Apr 26, 2023
9141f7c
Reconnecting PTY waits for command output or EOF (#7279)
spikecurtis Apr 26, 2023
e0879b5
docs(site): Mention template editor in template edit docs (#7261)
BrunoQuaresma Apr 26, 2023
b6322d1
fix(site): Fix secondary buttons with popovers (#7296)
BrunoQuaresma Apr 26, 2023
1e3eb06
chore: change some wording in the dashboard (#7293)
bpmct Apr 26, 2023
366859b
feat(agent): add http debug routes for magicsock (#7287)
coadler Apr 26, 2023
ed8106d
feat: add license expiration warning (#7264)
rodrimaia Apr 26, 2023
5733abc
feat: add license settings UI (#7210)
rodrimaia Apr 26, 2023
4937e75
chore: add envbox documentation (#7198)
sreya Apr 26, 2023
619e470
docs: Fix relay link in HA doc (#7159)
winter0mute Apr 27, 2023
16b5353
Merge branch 'main' into 6724-api-collect-metrics
mtojek Apr 27, 2023
c1bd4d2
Refactor Collect channel
mtojek Apr 27, 2023
8baed98
fix
mtojek Apr 27, 2023
Metrics expiry
mtojek committed Apr 25, 2023
commit 8df9eeac51cff84e37dd7691cfea2a2ae2cf9808
cli/server.go (2 changes: 1 addition & 1 deletion)
@@ -724,7 +724,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
}
defer closeAgentStatsFunc()

-	metricsAggregator := prometheusmetrics.NewMetricsAggregator(logger)
+	metricsAggregator := prometheusmetrics.NewMetricsAggregator(logger, 0)
cancelMetricsAggregator := metricsAggregator.Run(ctx)
defer cancelMetricsAggregator()

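
For readers of this diff, here is a minimal usage sketch of the new constructor signature. Everything outside the prometheusmetrics calls (logger setup, import paths, registration on the default registry) is an assumption for illustration, not taken from this PR. Passing 0, as cli/server.go does above, falls back to defaultMetricsCleanupInterval (2 minutes, defined in aggregator.go below); any positive duration overrides it.

package main

import (
	"context"
	"os"
	"time"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/sloghuman"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/coder/coder/coderd/prometheusmetrics"
)

func main() {
	ctx := context.Background()
	logger := slog.Make(sloghuman.Sink(os.Stderr))

	// A positive duration overrides the 2-minute default cleanup interval;
	// cli/server.go passes 0 to keep the default.
	metricsAggregator := prometheusmetrics.NewMetricsAggregator(logger, 5*time.Minute)

	// MetricsAggregator satisfies prometheus.Collector, so it registers
	// like any other collector.
	prometheus.MustRegister(metricsAggregator)

	// Run starts the aggregation goroutine; the returned function stops it.
	cancelMetricsAggregator := metricsAggregator.Run(ctx)
	defer cancelMetricsAggregator()
}
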
coderd/prometheusmetrics/aggregator.go (61 changes: 56 additions & 5 deletions)
@@ -2,6 +2,7 @@ package prometheusmetrics

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/xerrors"
@@ -22,12 +23,15 @@ const (
const (
sizeCollectCh = 10
sizeUpdateCh = 1024

defaultMetricsCleanupInterval = 2 * time.Minute
)

type MetricsAggregator struct {
queue []annotatedMetric

log slog.Logger
log slog.Logger
metricsCleanupInterval time.Duration

collectCh chan (chan<- prometheus.Metric)
updateCh chan updateRequest
@@ -39,6 +43,8 @@ type updateRequest struct {
agentName string

metrics []agentsdk.AgentMetric

timestamp time.Time
}

type annotatedMetric struct {
Expand All @@ -47,13 +53,20 @@ type annotatedMetric struct {
username string
workspaceName string
agentName string

expiryDate time.Time
}

var _ prometheus.Collector = new(MetricsAggregator)

-func NewMetricsAggregator(logger slog.Logger) *MetricsAggregator {
+func NewMetricsAggregator(logger slog.Logger, duration time.Duration) *MetricsAggregator {
metricsCleanupInterval := defaultMetricsCleanupInterval
if duration > 0 {
metricsCleanupInterval = duration
}
return &MetricsAggregator{
log: logger,
log: logger,
metricsCleanupInterval: metricsCleanupInterval,

collectCh: make(chan (chan<- prometheus.Metric), sizeCollectCh),
updateCh: make(chan updateRequest, sizeUpdateCh),
@@ -64,17 +77,22 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
ctx, cancelFunc := context.WithCancel(ctx)
done := make(chan struct{})

cleanupTicker := time.NewTicker(ma.metricsCleanupInterval)
go func() {
defer close(done)
defer cleanupTicker.Stop()

for {
select {
case req := <-ma.updateCh:
ma.log.Debug(ctx, "metrics aggregator: update metrics")

UpdateLoop:
for _, m := range req.metrics {
for i, q := range ma.queue {
if q.username == req.username && q.workspaceName == req.workspaceName && q.agentName == req.agentName && q.Name == m.Name {
ma.queue[i].AgentMetric.Value = m.Value
ma.queue[i].expiryDate = req.timestamp.Add(ma.metricsCleanupInterval)
continue UpdateLoop
}
}
@@ -85,20 +103,51 @@ func (ma *MetricsAggregator) Run(ctx context.Context) func() {
agentName: req.agentName,

AgentMetric: m,

expiryDate: req.timestamp.Add(ma.metricsCleanupInterval),
})
}
case inputCh := <-ma.collectCh:
ma.log.Debug(ctx, "metrics aggregator: collect metrics")

for _, m := range ma.queue {
desc := prometheus.NewDesc(m.Name, metricHelpForAgent, agentMetricsLabels, nil)
valueType, err := asPrometheusValueType(m.Type)
if err != nil {
ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("value_type", m.Type), slog.Error(err))
ma.log.Error(ctx, "can't convert Prometheus value type", slog.F("name", m.Name), slog.F("type", m.Type), slog.F("value", m.Value), slog.Error(err))
continue
}
constMetric := prometheus.MustNewConstMetric(desc, valueType, m.Value, m.username, m.workspaceName, m.agentName)
inputCh <- constMetric
(Review thread on the line above)

mafredri (Member) commented:
I'm not sure how this will be used, but I worry about the use of channels. Essentially the way this behaves now is that it's enough for one misbehaving consumer to lock up this entire goroutine since it's dependent on the collected metrics being consumed in a timely manner.

So essentially we can insert 128 entries here immediately, but if more is queued up then it's on the one who called Collect(ch) to ensure things progress smoothly. This could prevent update and cleanup.

mtojek (Member Author) replied:

> So essentially we can insert 128 entries here immediately, but if more is queued up then it's on the one who called Collect(ch) to ensure things progress smoothly. This could prevent update and cleanup.

128 entries correspond to 128 agents submitting their metrics at the same time. I'm aware of the issue, but considering our scale, I don't think that we should encounter this problem soon. Speaking of a potential solution, do you think that we should introduce a "middle channel" to timeout the Collect(ch) if metrics are not streamed? I'm afraid that it may complicate the existing logic.

mafredri (Member) replied:

Hmm, I see. I think another option, instead of an intermediate channel would be to send a slice over this channel instead of 128 items over the channel. Then Collect can process the slice instead of consuming the channel and the goroutine can immediately continue its work. This way the logic doesn't need to change much at all. This ofc leads to increased memory usage when we exceed 128, but then again, if we do exceed it now it might not be ideal either.

mtojek (Member Author) replied:

> I think another option, instead of an intermediate channel would be to send a slice over this channel instead of 128 items over the channel. Then Collect can process the slice instead of consuming the channel and the goroutine can immediately continue its work.

In this case Collect will need to know when the slice is ready to be "ranged over", otherwise it would be a data race.

I slightly modified your idea. I kept the channel, but I'm passing the entire slice at once, so 128 items limit does not exist anymore. Please let me know your thoughts, @mafredri.

mafredri (Member) replied:

Nice, and it actually looks like how I intended it 😄

}
close(inputCh)
case <-cleanupTicker.C:
ma.log.Debug(ctx, "metrics aggregator: clean expired metrics")

now := time.Now()

var hasExpiredMetrics bool
for _, m := range ma.queue {
if m.expiryDate.After(now) {
hasExpiredMetrics = true
break
}
}

if !hasExpiredMetrics {
continue
}

var j int
fresh := make([]annotatedMetric, len(ma.queue))
for _, m := range ma.queue {
if m.expiryDate.After(now) {
fresh[j] = m
j++
}
}
fresh = fresh[:j]
ma.queue = fresh
case <-ctx.Done():
ma.log.Debug(ctx, "metrics aggregator: is stopped")
return
@@ -140,9 +189,11 @@ func (ma *MetricsAggregator) Update(ctx context.Context, username, workspaceName
workspaceName: workspaceName,
agentName: agentName,
metrics: metrics,

timestamp: time.Now(),
}:
case <-ctx.Done():
ma.log.Debug(ctx, "metrics aggregator: update is canceled")
ma.log.Debug(ctx, "metrics aggregator: update request is canceled")
default:
ma.log.Error(ctx, "metrics aggregator: update queue is full")
}
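
The outcome of the review thread above is the slice-over-channel approach, which appears to land later in this PR as commit c1bd4d2 ("Refactor Collect channel"). The sketch below is only an approximation of that idea under simplified, illustrative names (aggregator, annotatedMetric, collectCh); it is not the merged code.

package aggregatorsketch

import "github.com/prometheus/client_golang/prometheus"

// annotatedMetric is trimmed down to the fields needed for the illustration.
type annotatedMetric struct {
	name  string
	value float64
}

type aggregator struct {
	queue     []annotatedMetric
	collectCh chan chan []annotatedMetric
}

// run owns the queue. On a collect request it copies the whole queue, hands it
// over in a single send, and immediately continues, so a slow consumer does not
// stall the loop (which in the real aggregator also handles updates and cleanup).
func (a *aggregator) run() {
	for outputCh := range a.collectCh {
		output := make([]annotatedMetric, len(a.queue))
		copy(output, a.queue)
		outputCh <- output // one buffered send instead of one send per metric
		close(outputCh)
	}
}

// Collect requests a snapshot from the run goroutine and converts it into
// Prometheus metrics outside the aggregation loop.
func (a *aggregator) Collect(ch chan<- prometheus.Metric) {
	outputCh := make(chan []annotatedMetric, 1)
	a.collectCh <- outputCh
	for _, m := range <-outputCh {
		desc := prometheus.NewDesc(m.name, "agent metric", nil, nil)
		ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, m.value)
	}
}

Compared with streaming up to sizeCollectCh items one at a time, the single send costs a copy of the queue but guarantees the aggregation loop never blocks on a slow Prometheus scrape, which was the reviewer's original concern.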