Skip to content

Commit ba4bb4d

Browse files
committed
WIP
1 parent e38a7be commit ba4bb4d

File tree

5 files changed

+234
-49
lines changed

5 files changed

+234
-49
lines changed

agent/agent.go

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/armon/circbuf"
2626
"github.com/google/uuid"
27+
"github.com/prometheus/client_golang/prometheus"
2728
"github.com/spf13/afero"
2829
"go.uber.org/atomic"
2930
"golang.org/x/exp/slices"
@@ -62,6 +63,8 @@ type Options struct {
6263
IgnorePorts map[int]string
6364
SSHMaxTimeout time.Duration
6465
TailnetListenPort uint16
66+
67+
PrometheusRegistry *prometheus.Registry
6568
}
6669

6770
type Client interface {
@@ -101,6 +104,10 @@ func New(options Options) Agent {
101104
return "", nil
102105
}
103106
}
107+
if options.PrometheusRegistry == nil {
108+
options.PrometheusRegistry = prometheus.NewRegistry()
109+
}
110+
104111
ctx, cancelFunc := context.WithCancel(context.Background())
105112
a := &agent{
106113
tailnetListenPort: options.TailnetListenPort,
@@ -119,6 +126,8 @@ func New(options Options) Agent {
119126
ignorePorts: options.IgnorePorts,
120127
connStatsChan: make(chan *agentsdk.Stats, 1),
121128
sshMaxTimeout: options.SSHMaxTimeout,
129+
prometheusRegistry: options.PrometheusRegistry,
130+
metrics: newAgentMetrics(options.PrometheusRegistry),
122131
}
123132
a.init(ctx)
124133
return a
@@ -162,10 +171,13 @@ type agent struct {
162171
latestStat atomic.Pointer[agentsdk.Stats]
163172

164173
connCountReconnectingPTY atomic.Int64
174+
175+
prometheusRegistry *prometheus.Registry
176+
metrics *agentMetrics
165177
}
166178

167179
func (a *agent) init(ctx context.Context) {
168-
sshSrv, err := agentssh.NewServer(ctx, a.logger.Named("ssh-server"), a.filesystem, a.sshMaxTimeout, "")
180+
sshSrv, err := agentssh.NewServer(ctx, a.logger.Named("ssh-server"), a.prometheusRegistry, a.filesystem, a.sshMaxTimeout, "")
169181
if err != nil {
170182
panic(err)
171183
}
@@ -979,7 +991,7 @@ func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan str
979991

980992
func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) {
981993
defer conn.Close()
982-
metricReconnectingPTYHandler.Add(1)
994+
a.metrics.handler.Add(1)
983995

984996
a.connCountReconnectingPTY.Add(1)
985997
defer a.connCountReconnectingPTY.Add(-1)
@@ -1000,7 +1012,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
10001012
logger.Debug(ctx, "session error after agent close", slog.Error(err))
10011013
} else {
10021014
logger.Error(ctx, "session error", slog.Error(err))
1003-
metricReconnectingPTYError.Add(1)
1015+
a.metrics.handlerError.Add(1)
10041016
}
10051017
}
10061018
logger.Debug(ctx, "session closed")
@@ -1020,7 +1032,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
10201032
// Empty command will default to the users shell!
10211033
cmd, err := a.sshServer.CreateCommand(ctx, msg.Command, nil)
10221034
if err != nil {
1023-
metricReconnectingPTYCreateCommandError.Add(1)
1035+
a.metrics.createCommandError.Add(1)
10241036
return xerrors.Errorf("create command: %w", err)
10251037
}
10261038
cmd.Env = append(cmd.Env, "TERM=xterm-256color")
@@ -1033,7 +1045,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
10331045

10341046
ptty, process, err := pty.Start(cmd)
10351047
if err != nil {
1036-
metricReconnectingPTYCmdStartError.Add(1)
1048+
a.metrics.cmdStartError.Add(1)
10371049
return xerrors.Errorf("start command: %w", err)
10381050
}
10391051

@@ -1064,7 +1076,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
10641076
logger.Debug(ctx, "unable to read pty output, command exited?", slog.Error(err))
10651077
} else {
10661078
logger.Warn(ctx, "unable to read pty output, command exited?", slog.Error(err))
1067-
metricReconnectingPTYOutputReaderError.Add(1)
1079+
a.metrics.outputReaderError.Add(1)
10681080
}
10691081
break
10701082
}
@@ -1085,7 +1097,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
10851097
slog.F("other_conn_id", cid),
10861098
slog.Error(err),
10871099
)
1088-
metricReconnectingPTYWriteError.Add(1)
1100+
a.metrics.writeError.Add(1)
10891101
}
10901102
}
10911103
rpty.activeConnsMutex.Unlock()
@@ -1105,7 +1117,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
11051117
if err != nil {
11061118
// We can continue after this, it's not fatal!
11071119
logger.Error(ctx, "resize", slog.Error(err))
1108-
metricReconnectingPTYResizeError.Add(1)
1120+
a.metrics.resizeError.Add(1)
11091121
}
11101122
// Write any previously stored data for the TTY.
11111123
rpty.circularBufferMutex.RLock()
@@ -1118,7 +1130,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
11181130
// while also holding circularBufferMutex seems dangerous.
11191131
_, err = conn.Write(prevBuf)
11201132
if err != nil {
1121-
metricReconnectingPTYWriteError.Add(1)
1133+
a.metrics.writeError.Add(1)
11221134
return xerrors.Errorf("write buffer to conn: %w", err)
11231135
}
11241136
// Multiple connections to the same TTY are permitted.
@@ -1169,7 +1181,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
11691181
_, err = rpty.ptty.InputWriter().Write([]byte(req.Data))
11701182
if err != nil {
11711183
logger.Warn(ctx, "write to pty", slog.Error(err))
1172-
metricReconnectingPTYInputWriterError.Add(1)
1184+
a.metrics.inputWriterError.Add(1)
11731185
return nil
11741186
}
11751187
// Check if a resize needs to happen!
@@ -1180,7 +1192,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
11801192
if err != nil {
11811193
// We can continue after this, it's not fatal!
11821194
logger.Error(ctx, "resize", slog.Error(err))
1183-
metricReconnectingPTYResizeError.Add(1)
1195+
a.metrics.resizeError.Add(1)
11841196
}
11851197
}
11861198
}
@@ -1213,7 +1225,7 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
12131225
var mu sync.Mutex
12141226
status := a.network.Status()
12151227
durations := []float64{}
1216-
ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
1228+
pingCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
12171229
defer cancelFunc()
12181230
for nodeID, peer := range status.Peer {
12191231
if !peer.Active {
@@ -1229,7 +1241,7 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
12291241
wg.Add(1)
12301242
go func() {
12311243
defer wg.Done()
1232-
duration, _, _, err := a.network.Ping(ctx, addresses[0].Addr())
1244+
duration, _, _, err := a.network.Ping(pingCtx, addresses[0].Addr())
12331245
if err != nil {
12341246
return
12351247
}
@@ -1254,7 +1266,10 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
12541266
// Collect agent metrics.
12551267
// Agent metrics are changing all the time, so there is no need to perform
12561268
// reflect.DeepEqual to see if stats should be transferred.
1257-
stats.Metrics = collectMetrics()
1269+
1270+
metricsCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
1271+
defer cancelFunc()
1272+
stats.Metrics = a.collectMetrics(metricsCtx)
12581273

12591274
a.latestStat.Store(stats)
12601275

agent/agentssh/agentssh.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/gliderlabs/ssh"
2222
"github.com/pkg/sftp"
23+
"github.com/prometheus/client_golang/prometheus"
2324
"github.com/spf13/afero"
2425
"go.uber.org/atomic"
2526
gossh "golang.org/x/crypto/ssh"
@@ -69,9 +70,12 @@ type Server struct {
6970
connCountVSCode atomic.Int64
7071
connCountJetBrains atomic.Int64
7172
connCountSSHSession atomic.Int64
73+
74+
prometheusRegistry *prometheus.Registry
75+
metrics *sshServerMetrics
7276
}
7377

74-
func NewServer(ctx context.Context, logger slog.Logger, fs afero.Fs, maxTimeout time.Duration, x11SocketDir string) (*Server, error) {
78+
func NewServer(ctx context.Context, logger slog.Logger, prometheusRegistry *prometheus.Registry, fs afero.Fs, maxTimeout time.Duration, x11SocketDir string) (*Server, error) {
7579
// Clients should ignore the host key when connecting.
7680
// The agent needs to authenticate with coderd to SSH,
7781
// so SSH authentication doesn't improve security.
@@ -90,13 +94,17 @@ func NewServer(ctx context.Context, logger slog.Logger, fs afero.Fs, maxTimeout
9094
forwardHandler := &ssh.ForwardedTCPHandler{}
9195
unixForwardHandler := &forwardedUnixHandler{log: logger}
9296

97+
metrics := newSSHServerMetrics(prometheusRegistry)
9398
s := &Server{
9499
listeners: make(map[net.Listener]struct{}),
95100
fs: fs,
96101
conns: make(map[net.Conn]struct{}),
97102
sessions: make(map[ssh.Session]struct{}),
98103
logger: logger,
99104
x11SocketDir: x11SocketDir,
105+
106+
prometheusRegistry: prometheusRegistry,
107+
metrics: metrics,
100108
}
101109

102110
s.srv = &ssh.Server{
@@ -107,7 +115,7 @@ func NewServer(ctx context.Context, logger slog.Logger, fs afero.Fs, maxTimeout
107115
},
108116
ConnectionFailedCallback: func(_ net.Conn, err error) {
109117
s.logger.Warn(ctx, "ssh connection failed", slog.Error(err))
110-
metricConnectionFailedCallback.Add(1)
118+
metrics.connectionFailedCallback.Add(1)
111119
},
112120
Handler: s.sessionHandler,
113121
HostSigners: []ssh.Signer{randomSigner},
@@ -116,19 +124,19 @@ func NewServer(ctx context.Context, logger slog.Logger, fs afero.Fs, maxTimeout
116124
s.logger.Debug(ctx, "local port forward",
117125
slog.F("destination-host", destinationHost),
118126
slog.F("destination-port", destinationPort))
119-
metricLocalPortForwardingCallback.Add(1)
127+
metrics.localPortForwardingCallback.Add(1)
120128
return true
121129
},
122130
PtyCallback: func(ctx ssh.Context, pty ssh.Pty) bool {
123-
metricPtyCallback.Add(1)
131+
metrics.ptyCallback.Add(1)
124132
return true
125133
},
126134
ReversePortForwardingCallback: func(ctx ssh.Context, bindHost string, bindPort uint32) bool {
127135
// Allow all reverse port forwarding!
128136
s.logger.Debug(ctx, "local port forward",
129137
slog.F("bind-host", bindHost),
130138
slog.F("bind-port", bindPort))
131-
metricReversePortForwardingCallback.Add(1)
139+
metrics.reversePortForwardingCallback.Add(1)
132140
return true
133141
},
134142
RequestHandlers: map[string]ssh.RequestHandler{
@@ -405,7 +413,7 @@ func (s *Server) startPTYSession(session ptySession, m sessionMetricsObject, cmd
405413
}
406414

407415
func (s *Server) sftpHandler(session ssh.Session) {
408-
metricSftpHandler.Add(1)
416+
s.metrics.sftpHandler.Add(1)
409417

410418
ctx := session.Context()
411419

@@ -446,7 +454,7 @@ func (s *Server) sftpHandler(session ssh.Session) {
446454
return
447455
}
448456
s.logger.Warn(ctx, "sftp server closed with error", slog.Error(err))
449-
metricSftpServerError.Add(1)
457+
s.metrics.sftpServerError.Add(1)
450458
_ = session.Exit(1)
451459
}
452460

agent/agentssh/metrics.go

Lines changed: 71 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,85 @@ package agentssh
33
import (
44
"fmt"
55

6+
"github.com/prometheus/client_golang/prometheus"
67
"tailscale.com/util/clientmetric"
78
)
89

9-
var (
10+
type sshServerMetrics struct {
1011
// SSH callbacks
11-
metricConnectionFailedCallback = clientmetric.NewCounter("ssh_connection_failed_callback")
12-
metricLocalPortForwardingCallback = clientmetric.NewCounter("ssh_local_port_forwarding_callback")
13-
metricPtyCallback = clientmetric.NewCounter("ssh_pty_callback")
14-
metricReversePortForwardingCallback = clientmetric.NewCounter("ssh_reverse_port_forwarding_callback")
15-
metricX11Callback = clientmetric.NewCounter("ssh_x11_callback")
12+
connectionFailedCallback prometheus.Counter
13+
localPortForwardingCallback prometheus.Counter
14+
ptyCallback prometheus.Counter
15+
reversePortForwardingCallback prometheus.Counter
16+
x11Callback prometheus.Counter
1617

1718
// SFTP
18-
metricSftpHandler = clientmetric.NewCounter("ssh_sftp_handler")
19-
metricSftpServerError = clientmetric.NewCounter("ssh_sftp_server_error")
19+
sftpHandler prometheus.Counter
20+
sftpServerError prometheus.Counter
2021

2122
// X11
22-
metricX11SocketDirError = clientmetric.NewCounter("ssh_x11_socket_dir_error")
23-
metricX11XauthorityError = clientmetric.NewCounter("ssh_x11_xauthority_error")
24-
)
23+
x11SocketDirError prometheus.Counter
24+
x11XauthorityError prometheus.Counter
25+
}
26+
27+
func newSSHServerMetrics(registerer prometheus.Registerer) *sshServerMetrics {
28+
connectionFailedCallback := prometheus.NewCounter(prometheus.CounterOpts{
29+
Namespace: "agent", Subsystem: "ssh_server", Name: "connection_failed_callback",
30+
})
31+
registerer.MustRegister(connectionFailedCallback)
32+
33+
localPortForwardingCallback := prometheus.NewCounter(prometheus.CounterOpts{
34+
Namespace: "agent", Subsystem: "ssh_server", Name: "local_port_forwarding_callback",
35+
})
36+
registerer.MustRegister(localPortForwardingCallback)
37+
38+
ptyCallback := prometheus.NewCounter(prometheus.CounterOpts{
39+
Namespace: "agent", Subsystem: "ssh_server", Name: "pty_callback",
40+
})
41+
registerer.MustRegister(ptyCallback)
42+
43+
reversePortForwardingCallback := prometheus.NewCounter(prometheus.CounterOpts{
44+
Namespace: "agent", Subsystem: "ssh_server", Name: "reverse_port_forwarding_callback",
45+
})
46+
registerer.MustRegister(reversePortForwardingCallback)
47+
48+
x11Callback := prometheus.NewCounter(prometheus.CounterOpts{
49+
Namespace: "agent", Subsystem: "ssh_server", Name: "x11_callback",
50+
})
51+
registerer.MustRegister(x11Callback)
52+
53+
sftpHandler := prometheus.NewCounter(prometheus.CounterOpts{
54+
Namespace: "agent", Subsystem: "ssh_server", Name: "sftp_handler",
55+
})
56+
registerer.MustRegister(sftpHandler)
57+
58+
sftpServerError := prometheus.NewCounter(prometheus.CounterOpts{
59+
Namespace: "agent", Subsystem: "ssh_server", Name: "sftp_server_error",
60+
})
61+
registerer.MustRegister(sftpServerError)
62+
63+
x11SocketDirError := prometheus.NewCounter(prometheus.CounterOpts{
64+
Namespace: "agent", Subsystem: "ssh_server", Name: "x11_socket_dir_error",
65+
})
66+
registerer.MustRegister(x11SocketDirError)
67+
68+
x11XauthorityError := prometheus.NewCounter(prometheus.CounterOpts{
69+
Namespace: "agent", Subsystem: "ssh_server", Name: "x11_xauthority_error",
70+
})
71+
registerer.MustRegister(x11XauthorityError)
72+
73+
return &sshServerMetrics{
74+
connectionFailedCallback: connectionFailedCallback,
75+
localPortForwardingCallback: localPortForwardingCallback,
76+
ptyCallback: ptyCallback,
77+
reversePortForwardingCallback: reversePortForwardingCallback,
78+
x11Callback: x11Callback,
79+
sftpHandler: sftpHandler,
80+
sftpServerError: sftpServerError,
81+
x11SocketDirError: x11SocketDirError,
82+
x11XauthorityError: x11XauthorityError,
83+
}
84+
}
2585

2686
var sessionMetrics = map[string]sessionMetricsObject{}
2787

0 commit comments

Comments
 (0)