@@ -2,7 +2,6 @@ package prometheusmetrics
2
2
3
3
import (
4
4
"context"
5
- "sync"
6
5
7
6
"github.com/prometheus/client_golang/prometheus"
8
7
"golang.org/x/xerrors"
@@ -20,10 +19,26 @@ const (
20
19
metricHelpForAgent = "Metric is forwarded from workspace agent connected to this instance of coderd."
21
20
)
22
21
22
+ const (
23
+ sizeCollectCh = 10
24
+ sizeUpdateCh = 1024
25
+ )
26
+
23
27
type MetricsAggregator struct {
24
- m sync.Mutex
25
- log slog.Logger
26
28
queue []annotatedMetric
29
+
30
+ log slog.Logger
31
+
32
+ collectCh chan (chan <- prometheus.Metric )
33
+ updateCh chan updateRequest
34
+ }
35
+
36
+ type updateRequest struct {
37
+ username string
38
+ workspaceName string
39
+ agentName string
40
+
41
+ metrics []agentsdk.AgentMetric
27
42
}
28
43
29
44
type annotatedMetric struct {
@@ -36,6 +51,66 @@ type annotatedMetric struct {
36
51
37
52
var _ prometheus.Collector = new (MetricsAggregator )
38
53
54
+ func NewMetricsAggregator (logger slog.Logger ) * MetricsAggregator {
55
+ return & MetricsAggregator {
56
+ log : logger ,
57
+
58
+ collectCh : make (chan (chan <- prometheus.Metric ), sizeCollectCh ),
59
+ updateCh : make (chan updateRequest , sizeUpdateCh ),
60
+ }
61
+ }
62
+
63
+ func (ma * MetricsAggregator ) Run (ctx context.Context ) func () {
64
+ ctx , cancelFunc := context .WithCancel (ctx )
65
+ done := make (chan struct {})
66
+
67
+ go func () {
68
+ defer close (done )
69
+
70
+ for {
71
+ select {
72
+ case req := <- ma .updateCh :
73
+ UpdateLoop:
74
+ for _ , m := range req .metrics {
75
+ for i , q := range ma .queue {
76
+ if q .username == req .username && q .workspaceName == req .workspaceName && q .agentName == req .agentName && q .Name == m .Name {
77
+ ma .queue [i ].AgentMetric .Value = m .Value
78
+ continue UpdateLoop
79
+ }
80
+ }
81
+
82
+ ma .queue = append (ma .queue , annotatedMetric {
83
+ username : req .username ,
84
+ workspaceName : req .workspaceName ,
85
+ agentName : req .agentName ,
86
+
87
+ AgentMetric : m ,
88
+ })
89
+ }
90
+ case inputCh := <- ma .collectCh :
91
+ for _ , m := range ma .queue {
92
+ desc := prometheus .NewDesc (m .Name , metricHelpForAgent , agentMetricsLabels , nil )
93
+ valueType , err := asPrometheusValueType (m .Type )
94
+ if err != nil {
95
+ ma .log .Error (ctx , "can't convert Prometheus value type" , slog .F ("value_type" , m .Type ), slog .Error (err ))
96
+ continue
97
+ }
98
+ constMetric := prometheus .MustNewConstMetric (desc , valueType , m .Value , m .username , m .workspaceName , m .agentName )
99
+ inputCh <- constMetric
100
+ }
101
+ close (inputCh )
102
+ case <- ctx .Done ():
103
+ ma .log .Debug (ctx , "metrics aggregator: is stopped" )
104
+ return
105
+ }
106
+ }
107
+ }()
108
+ return func () {
109
+ cancelFunc ()
110
+ <- done
111
+ }
112
+ }
113
+
39
114
// Describe function does not have any knowledge about the metrics schema,
40
115
// so it does not emit anything.
41
116
func (* MetricsAggregator ) Describe (_ chan <- * prometheus.Desc ) {
@@ -44,42 +119,32 @@ func (*MetricsAggregator) Describe(_ chan<- *prometheus.Desc) {
44
119
var agentMetricsLabels = []string {usernameLabel , workspaceNameLabel , agentNameLabel }
45
120
46
121
func (ma * MetricsAggregator ) Collect (ch chan <- prometheus.Metric ) {
47
- ma .m .Lock ()
48
- defer ma .m .Unlock ()
49
-
50
- for _ , m := range ma .queue {
51
- desc := prometheus .NewDesc (m .Name , metricHelpForAgent , agentMetricsLabels , nil )
52
- valueType , err := asPrometheusValueType (m .Type )
53
- if err != nil {
54
- ma .log .Error (context .Background (), "can't convert Prometheus value type" , slog .F ("value_type" , m .Type ), slog .Error (err ))
55
- }
56
- constMetric := prometheus .MustNewConstMetric (desc , valueType , m .Value , m .username , m .workspaceName , m .agentName )
57
- ch <- constMetric
58
- }
59
- }
60
-
61
- // TODO Run function with done channel
122
+ collect := make (chan prometheus.Metric , 128 )
62
123
63
- func (ma * MetricsAggregator ) Update (_ context.Context , username , workspaceName , agentName string , metrics []agentsdk.AgentMetric ) {
64
- ma .m .Lock ()
65
- defer ma .m .Unlock ()
66
-
67
- UpdateLoop:
68
- for _ , m := range metrics {
69
- for i , q := range ma .queue {
70
- if q .username == username && q .workspaceName == workspaceName && q .agentName == agentName && q .Name == m .Name {
71
- ma .queue [i ].AgentMetric .Value = m .Value
72
- continue UpdateLoop
73
- }
74
- }
124
+ select {
125
+ case ma .collectCh <- collect :
126
+ default :
127
+ ma .log .Error (context .Background (), "metrics aggregator: collect queue is full" )
128
+ return
129
+ }
75
130
76
- ma . queue = append ( ma . queue , annotatedMetric {
77
- username : username ,
78
- workspaceName : workspaceName ,
79
- agentName : agentName ,
131
+ for m := range collect {
132
+ ch <- m
133
+ }
134
+ }
80
135
81
- AgentMetric : m ,
82
- })
136
+ func (ma * MetricsAggregator ) Update (ctx context.Context , username , workspaceName , agentName string , metrics []agentsdk.AgentMetric ) {
137
+ select {
138
+ case ma .updateCh <- updateRequest {
139
+ username : username ,
140
+ workspaceName : workspaceName ,
141
+ agentName : agentName ,
142
+ metrics : metrics ,
143
+ }:
144
+ case <- ctx .Done ():
145
+ ma .log .Debug (ctx , "metrics aggregator: update is canceled" )
146
+ default :
147
+ ma .log .Error (ctx , "metrics aggregator: update queue is full" )
83
148
}
84
149
}
85
150
0 commit comments