@@ -24,6 +24,7 @@ import (
24
24
25
25
"github.com/armon/circbuf"
26
26
"github.com/google/uuid"
27
+ "github.com/prometheus/client_golang/prometheus"
27
28
"github.com/spf13/afero"
28
29
"go.uber.org/atomic"
29
30
"golang.org/x/exp/slices"
@@ -62,6 +63,8 @@ type Options struct {
62
63
IgnorePorts map [int ]string
63
64
SSHMaxTimeout time.Duration
64
65
TailnetListenPort uint16
66
+
67
+ PrometheusRegistry * prometheus.Registry
65
68
}
66
69
67
70
type Client interface {
@@ -101,6 +104,10 @@ func New(options Options) Agent {
101
104
return "" , nil
102
105
}
103
106
}
107
+ if options .PrometheusRegistry == nil {
108
+ options .PrometheusRegistry = prometheus .NewRegistry ()
109
+ }
110
+
104
111
ctx , cancelFunc := context .WithCancel (context .Background ())
105
112
a := & agent {
106
113
tailnetListenPort : options .TailnetListenPort ,
@@ -119,6 +126,8 @@ func New(options Options) Agent {
119
126
ignorePorts : options .IgnorePorts ,
120
127
connStatsChan : make (chan * agentsdk.Stats , 1 ),
121
128
sshMaxTimeout : options .SSHMaxTimeout ,
129
+ prometheusRegistry : options .PrometheusRegistry ,
130
+ metrics : newAgentMetrics (options .PrometheusRegistry ),
122
131
}
123
132
a .init (ctx )
124
133
return a
@@ -162,10 +171,13 @@ type agent struct {
162
171
latestStat atomic.Pointer [agentsdk.Stats ]
163
172
164
173
connCountReconnectingPTY atomic.Int64
174
+
175
+ prometheusRegistry * prometheus.Registry
176
+ metrics * agentMetrics
165
177
}
166
178
167
179
func (a * agent ) init (ctx context.Context ) {
168
- sshSrv , err := agentssh .NewServer (ctx , a .logger .Named ("ssh-server" ), a .filesystem , a .sshMaxTimeout , "" )
180
+ sshSrv , err := agentssh .NewServer (ctx , a .logger .Named ("ssh-server" ), a .prometheusRegistry , a . filesystem , a .sshMaxTimeout , "" )
169
181
if err != nil {
170
182
panic (err )
171
183
}
@@ -979,7 +991,7 @@ func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan str
979
991
980
992
func (a * agent ) handleReconnectingPTY (ctx context.Context , logger slog.Logger , msg codersdk.WorkspaceAgentReconnectingPTYInit , conn net.Conn ) (retErr error ) {
981
993
defer conn .Close ()
982
- metricReconnectingPTYHandler .Add (1 )
994
+ a . metrics . handler .Add (1 )
983
995
984
996
a .connCountReconnectingPTY .Add (1 )
985
997
defer a .connCountReconnectingPTY .Add (- 1 )
@@ -1000,7 +1012,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1000
1012
logger .Debug (ctx , "session error after agent close" , slog .Error (err ))
1001
1013
} else {
1002
1014
logger .Error (ctx , "session error" , slog .Error (err ))
1003
- metricReconnectingPTYError .Add (1 )
1015
+ a . metrics . handlerError .Add (1 )
1004
1016
}
1005
1017
}
1006
1018
logger .Debug (ctx , "session closed" )
@@ -1020,7 +1032,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1020
1032
// Empty command will default to the user's shell!
1021
1033
cmd , err := a .sshServer .CreateCommand (ctx , msg .Command , nil )
1022
1034
if err != nil {
1023
- metricReconnectingPTYCreateCommandError .Add (1 )
1035
+ a . metrics . createCommandError .Add (1 )
1024
1036
return xerrors .Errorf ("create command: %w" , err )
1025
1037
}
1026
1038
cmd .Env = append (cmd .Env , "TERM=xterm-256color" )
@@ -1033,7 +1045,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1033
1045
1034
1046
ptty , process , err := pty .Start (cmd )
1035
1047
if err != nil {
1036
- metricReconnectingPTYCmdStartError .Add (1 )
1048
+ a . metrics . cmdStartError .Add (1 )
1037
1049
return xerrors .Errorf ("start command: %w" , err )
1038
1050
}
1039
1051
@@ -1064,7 +1076,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1064
1076
logger .Debug (ctx , "unable to read pty output, command exited?" , slog .Error (err ))
1065
1077
} else {
1066
1078
logger .Warn (ctx , "unable to read pty output, command exited?" , slog .Error (err ))
1067
- metricReconnectingPTYOutputReaderError .Add (1 )
1079
+ a . metrics . outputReaderError .Add (1 )
1068
1080
}
1069
1081
break
1070
1082
}
@@ -1085,7 +1097,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1085
1097
slog .F ("other_conn_id" , cid ),
1086
1098
slog .Error (err ),
1087
1099
)
1088
- metricReconnectingPTYWriteError .Add (1 )
1100
+ a . metrics . writeError .Add (1 )
1089
1101
}
1090
1102
}
1091
1103
rpty .activeConnsMutex .Unlock ()
@@ -1105,7 +1117,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1105
1117
if err != nil {
1106
1118
// We can continue after this, it's not fatal!
1107
1119
logger .Error (ctx , "resize" , slog .Error (err ))
1108
- metricReconnectingPTYResizeError .Add (1 )
1120
+ a . metrics . resizeError .Add (1 )
1109
1121
}
1110
1122
// Write any previously stored data for the TTY.
1111
1123
rpty .circularBufferMutex .RLock ()
@@ -1118,7 +1130,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1118
1130
// while also holding circularBufferMutex seems dangerous.
1119
1131
_ , err = conn .Write (prevBuf )
1120
1132
if err != nil {
1121
- metricReconnectingPTYWriteError .Add (1 )
1133
+ a . metrics . writeError .Add (1 )
1122
1134
return xerrors .Errorf ("write buffer to conn: %w" , err )
1123
1135
}
1124
1136
// Multiple connections to the same TTY are permitted.
@@ -1169,7 +1181,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1169
1181
_ , err = rpty .ptty .InputWriter ().Write ([]byte (req .Data ))
1170
1182
if err != nil {
1171
1183
logger .Warn (ctx , "write to pty" , slog .Error (err ))
1172
- metricReconnectingPTYInputWriterError .Add (1 )
1184
+ a . metrics . inputWriterError .Add (1 )
1173
1185
return nil
1174
1186
}
1175
1187
// Check if a resize needs to happen!
@@ -1180,7 +1192,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1180
1192
if err != nil {
1181
1193
// We can continue after this, it's not fatal!
1182
1194
logger .Error (ctx , "resize" , slog .Error (err ))
1183
- metricReconnectingPTYResizeError .Add (1 )
1195
+ a . metrics . resizeError .Add (1 )
1184
1196
}
1185
1197
}
1186
1198
}
@@ -1213,7 +1225,7 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
1213
1225
var mu sync.Mutex
1214
1226
status := a .network .Status ()
1215
1227
durations := []float64 {}
1216
- ctx , cancelFunc := context .WithTimeout (ctx , 5 * time .Second )
1228
+ pingCtx , cancelFunc := context .WithTimeout (ctx , 5 * time .Second )
1217
1229
defer cancelFunc ()
1218
1230
for nodeID , peer := range status .Peer {
1219
1231
if ! peer .Active {
@@ -1229,7 +1241,7 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
1229
1241
wg .Add (1 )
1230
1242
go func () {
1231
1243
defer wg .Done ()
1232
- duration , _ , _ , err := a .network .Ping (ctx , addresses [0 ].Addr ())
1244
+ duration , _ , _ , err := a .network .Ping (pingCtx , addresses [0 ].Addr ())
1233
1245
if err != nil {
1234
1246
return
1235
1247
}
@@ -1254,7 +1266,10 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
1254
1266
// Collect agent metrics.
1255
1267
// Agent metrics are changing all the time, so there is no need to perform
1256
1268
// reflect.DeepEqual to see if stats should be transferred.
1257
- stats .Metrics = collectMetrics ()
1269
+
1270
+ metricsCtx , cancelFunc := context .WithTimeout (ctx , 5 * time .Second )
1271
+ defer cancelFunc ()
1272
+ stats .Metrics = a .collectMetrics (metricsCtx )
1258
1273
1259
1274
a .latestStat .Store (stats )
1260
1275
0 commit comments