Skip to content

feat: Collect agent SSH metrics #7584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 42 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
092e1d0
feat: Collect agent SSH metrics
mtojek May 17, 2023
0c0df91
more metrics
mtojek May 17, 2023
84b59ea
err
mtojek May 17, 2023
000586a
session metrics
mtojek May 18, 2023
11fb056
session error
mtojek May 18, 2023
07b2b0e
fix
mtojek May 18, 2023
e38a7be
fmt
mtojek May 18, 2023
ba4bb4d
WIP
mtojek May 18, 2023
0d0f300
Refactored to client_golang/prometheus
mtojek May 18, 2023
315b5ce
fix
mtojek May 18, 2023
43d5d40
fix
mtojek May 18, 2023
85f8860
refactor
mtojek May 18, 2023
7b26267
Merge branch 'main' into 6724-ssh-metrics
mtojek May 18, 2023
34f07fc
refactor
mtojek May 18, 2023
59fd585
fix test
mtojek May 18, 2023
8cd927c
fix
mtojek May 18, 2023
6eec4d7
fix
mtojek May 18, 2023
a059edf
fix
mtojek May 18, 2023
c004c04
fix
mtojek May 18, 2023
9b0e31a
Address PR comments
mtojek May 19, 2023
7d4ccce
x11HostnameError
mtojek May 19, 2023
90b351d
Remove callbacks
mtojek May 19, 2023
1773c24
failedConnectionsTotal
mtojek May 19, 2023
5ac27b7
connectionsTotal
mtojek May 19, 2023
6eb1a95
sftpConnectionsTotal
mtojek May 19, 2023
e3d7493
sessionError
mtojek May 19, 2023
9620452
sftpServerErrors
mtojek May 19, 2023
f05466c
remove handlerError
mtojek May 19, 2023
5887ee8
WIP
mtojek May 19, 2023
bb3602b
WIP
mtojek May 22, 2023
27fc9a0
WIP
mtojek May 22, 2023
3f4696b
Finish impl
mtojek May 23, 2023
8cd07f2
Aggregator: labels
mtojek May 23, 2023
a51cde9
Merge branch 'main' into 6724-ssh-metrics
mtojek May 23, 2023
389dd9f
TestAgent_Metrics_SSH
mtojek May 23, 2023
8e10d6d
Address PR comments
mtojek May 24, 2023
1858dc2
use labelIndex
mtojek May 24, 2023
8dde9f9
Merge branch 'main' into 6724-ssh-metrics
mtojek May 24, 2023
db725a3
Merge branch 'main' into 6724-ssh-metrics
mtojek May 25, 2023
f416287
PR comments part 1
mtojek May 25, 2023
4daf37d
PR comments part 2
mtojek May 25, 2023
d9203b8
PR comments part 3
mtojek May 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/armon/circbuf"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/afero"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -63,6 +64,8 @@ type Options struct {
SSHMaxTimeout time.Duration
TailnetListenPort uint16
Subsystem codersdk.AgentSubsystem

PrometheusRegistry *prometheus.Registry
}

type Client interface {
Expand Down Expand Up @@ -102,6 +105,12 @@ func New(options Options) Agent {
return "", nil
}
}

prometheusRegistry := options.PrometheusRegistry
if prometheusRegistry == nil {
prometheusRegistry = prometheus.NewRegistry()
}

ctx, cancelFunc := context.WithCancel(context.Background())
a := &agent{
tailnetListenPort: options.TailnetListenPort,
Expand All @@ -121,6 +130,9 @@ func New(options Options) Agent {
connStatsChan: make(chan *agentsdk.Stats, 1),
sshMaxTimeout: options.SSHMaxTimeout,
subsystem: options.Subsystem,

prometheusRegistry: prometheusRegistry,
metrics: newAgentMetrics(prometheusRegistry),
}
a.init(ctx)
return a
Expand Down Expand Up @@ -165,10 +177,13 @@ type agent struct {
latestStat atomic.Pointer[agentsdk.Stats]

connCountReconnectingPTY atomic.Int64

prometheusRegistry *prometheus.Registry
metrics *agentMetrics
}

func (a *agent) init(ctx context.Context) {
sshSrv, err := agentssh.NewServer(ctx, a.logger.Named("ssh-server"), a.filesystem, a.sshMaxTimeout, "")
sshSrv, err := agentssh.NewServer(ctx, a.logger.Named("ssh-server"), a.prometheusRegistry, a.filesystem, a.sshMaxTimeout, "")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -983,6 +998,7 @@ func (a *agent) trackScriptLogs(ctx context.Context, reader io.Reader) (chan str

func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) {
defer conn.Close()
a.metrics.connectionsTotal.Add(1)

a.connCountReconnectingPTY.Add(1)
defer a.connCountReconnectingPTY.Add(-1)
Expand Down Expand Up @@ -1022,6 +1038,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
// Empty command will default to the users shell!
cmd, err := a.sshServer.CreateCommand(ctx, msg.Command, nil)
if err != nil {
a.metrics.reconnectingPTYErrors.WithLabelValues("create_command").Add(1)
return xerrors.Errorf("create command: %w", err)
}
cmd.Env = append(cmd.Env, "TERM=xterm-256color")
Expand All @@ -1034,6 +1051,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m

ptty, process, err := pty.Start(cmd)
if err != nil {
a.metrics.reconnectingPTYErrors.WithLabelValues("start_command").Add(1)
return xerrors.Errorf("start command: %w", err)
}

Expand All @@ -1060,7 +1078,12 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
if err != nil {
// When the PTY is closed, this is triggered.
// Error is typically a benign EOF, so only log for debugging.
logger.Debug(ctx, "unable to read pty output, command exited?", slog.Error(err))
if errors.Is(err, io.EOF) {
logger.Debug(ctx, "unable to read pty output, command exited?", slog.Error(err))
} else {
logger.Warn(ctx, "unable to read pty output, command exited?", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("output_reader").Add(1)
}
break
}
part := buffer[:read]
Expand All @@ -1075,11 +1098,12 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
for cid, conn := range rpty.activeConns {
_, err = conn.Write(part)
if err != nil {
logger.Debug(ctx,
logger.Warn(ctx,
"error writing to active conn",
slog.F("other_conn_id", cid),
slog.Error(err),
)
a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1)
}
}
rpty.activeConnsMutex.Unlock()
Expand All @@ -1099,6 +1123,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
if err != nil {
// We can continue after this, it's not fatal!
logger.Error(ctx, "resize", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1)
}
// Write any previously stored data for the TTY.
rpty.circularBufferMutex.RLock()
Expand All @@ -1111,6 +1136,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
// while also holding circularBufferMutex seems dangerous.
_, err = conn.Write(prevBuf)
if err != nil {
a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1)
return xerrors.Errorf("write buffer to conn: %w", err)
}
// Multiple connections to the same TTY are permitted.
Expand Down Expand Up @@ -1161,6 +1187,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
_, err = rpty.ptty.InputWriter().Write([]byte(req.Data))
if err != nil {
logger.Warn(ctx, "write to pty", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("input_writer").Add(1)
return nil
}
// Check if a resize needs to happen!
Expand All @@ -1171,6 +1198,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
if err != nil {
// We can continue after this, it's not fatal!
logger.Error(ctx, "resize", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1)
}
}
}
Expand Down Expand Up @@ -1203,7 +1231,7 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
var mu sync.Mutex
status := a.network.Status()
durations := []float64{}
ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
pingCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
defer cancelFunc()
for nodeID, peer := range status.Peer {
if !peer.Active {
Expand All @@ -1219,7 +1247,7 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
wg.Add(1)
go func() {
defer wg.Done()
duration, _, _, err := a.network.Ping(ctx, addresses[0].Addr())
duration, _, _, err := a.network.Ping(pingCtx, addresses[0].Addr())
if err != nil {
return
}
Expand All @@ -1244,7 +1272,10 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
// Collect agent metrics.
// Agent metrics are changing all the time, so there is no need to perform
// reflect.DeepEqual to see if stats should be transferred.
stats.Metrics = collectMetrics()

metricsCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
defer cancelFunc()
stats.Metrics = a.collectMetrics(metricsCtx)

a.latestStat.Store(stats)

Expand Down
60 changes: 51 additions & 9 deletions agent/agentssh/agentssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/gliderlabs/ssh"
"github.com/pkg/sftp"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/afero"
"go.uber.org/atomic"
gossh "golang.org/x/crypto/ssh"
Expand Down Expand Up @@ -69,9 +70,11 @@ type Server struct {
connCountVSCode atomic.Int64
connCountJetBrains atomic.Int64
connCountSSHSession atomic.Int64

metrics *sshServerMetrics
}

func NewServer(ctx context.Context, logger slog.Logger, fs afero.Fs, maxTimeout time.Duration, x11SocketDir string) (*Server, error) {
func NewServer(ctx context.Context, logger slog.Logger, prometheusRegistry *prometheus.Registry, fs afero.Fs, maxTimeout time.Duration, x11SocketDir string) (*Server, error) {
// Clients' should ignore the host key when connecting.
// The agent needs to authenticate with coderd to SSH,
// so SSH authentication doesn't improve security.
Expand All @@ -90,13 +93,16 @@ func NewServer(ctx context.Context, logger slog.Logger, fs afero.Fs, maxTimeout
forwardHandler := &ssh.ForwardedTCPHandler{}
unixForwardHandler := &forwardedUnixHandler{log: logger}

metrics := newSSHServerMetrics(prometheusRegistry)
s := &Server{
listeners: make(map[net.Listener]struct{}),
fs: fs,
conns: make(map[net.Conn]struct{}),
sessions: make(map[ssh.Session]struct{}),
logger: logger,
x11SocketDir: x11SocketDir,

metrics: metrics,
}

s.srv = &ssh.Server{
Expand All @@ -106,7 +112,8 @@ func NewServer(ctx context.Context, logger slog.Logger, fs afero.Fs, maxTimeout
"session": ssh.DefaultSessionHandler,
},
ConnectionFailedCallback: func(_ net.Conn, err error) {
s.logger.Info(ctx, "ssh connection ended", slog.Error(err))
s.logger.Warn(ctx, "ssh connection failed", slog.Error(err))
metrics.failedConnectionsTotal.Add(1)
},
Handler: s.sessionHandler,
HostSigners: []ssh.Signer{randomSigner},
Expand Down Expand Up @@ -197,7 +204,7 @@ func (s *Server) sessionHandler(session ssh.Session) {
err := s.sessionStart(session, extraEnv)
var exitError *exec.ExitError
if xerrors.As(err, &exitError) {
s.logger.Debug(ctx, "ssh session returned", slog.Error(exitError))
s.logger.Warn(ctx, "ssh session returned", slog.Error(exitError))
_ = session.Exit(exitError.ExitCode())
return
}
Expand All @@ -211,6 +218,16 @@ func (s *Server) sessionHandler(session ssh.Session) {
_ = session.Exit(0)
}

func magicType(session ssh.Session) string {
for _, kv := range session.Environ() {
if !strings.HasPrefix(kv, MagicSessionTypeEnvironmentVariable) {
continue
}
return strings.TrimPrefix(kv, MagicSessionTypeEnvironmentVariable+"=")
}
return ""
}

func (s *Server) sessionStart(session ssh.Session, extraEnv []string) (retErr error) {
ctx := session.Context()
env := append(session.Environ(), extraEnv...)
Expand All @@ -236,14 +253,18 @@ func (s *Server) sessionStart(session ssh.Session, extraEnv []string) (retErr er
s.logger.Warn(ctx, "invalid magic ssh session type specified", slog.F("type", magicType))
}

magicTypeLabel := magicTypeMetricLabel(magicType)

cmd, err := s.CreateCommand(ctx, session.RawCommand(), env)
if err != nil {
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "create_command").Add(1)
return err
}

if ssh.AgentRequested(session) {
l, err := ssh.NewAgentListener()
if err != nil {
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "listener").Add(1)
return xerrors.Errorf("new agent listener: %w", err)
}
defer l.Close()
Expand All @@ -253,26 +274,33 @@ func (s *Server) sessionStart(session ssh.Session, extraEnv []string) (retErr er

sshPty, windowSize, isPty := session.Pty()
if isPty {
return s.startPTYSession(session, cmd, sshPty, windowSize)
return s.startPTYSession(session, magicTypeLabel, cmd, sshPty, windowSize)
}
return startNonPTYSession(session, cmd.AsExec())
return s.startNonPTYSession(session, magicTypeLabel, cmd.AsExec())
}

func startNonPTYSession(session ssh.Session, cmd *exec.Cmd) error {
func (s *Server) startNonPTYSession(session ssh.Session, magicTypeLabel string, cmd *exec.Cmd) error {
s.metrics.sessionsTotal.WithLabelValues(magicTypeLabel, "no").Add(1)

cmd.Stdout = session
cmd.Stderr = session.Stderr()
// This blocks forever until stdin is received if we don't
// use StdinPipe. It's unknown what causes this.
stdinPipe, err := cmd.StdinPipe()
if err != nil {
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "stdin_pipe").Add(1)
return xerrors.Errorf("create stdin pipe: %w", err)
}
go func() {
_, _ = io.Copy(stdinPipe, session)
_, err := io.Copy(stdinPipe, session)
if err != nil {
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "stdin_io_copy").Add(1)
}
_ = stdinPipe.Close()
}()
err = cmd.Start()
if err != nil {
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "start_command").Add(1)
return xerrors.Errorf("start: %w", err)
}
return cmd.Wait()
Expand All @@ -287,7 +315,9 @@ type ptySession interface {
RawCommand() string
}

func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pty, windowSize <-chan ssh.Window) (retErr error) {
func (s *Server) startPTYSession(session ptySession, magicTypeLabel string, cmd *pty.Cmd, sshPty ssh.Pty, windowSize <-chan ssh.Window) (retErr error) {
s.metrics.sessionsTotal.WithLabelValues(magicTypeLabel, "yes").Add(1)

ctx := session.Context()
// Disable minimal PTY emulation set by gliderlabs/ssh (NL-to-CRNL).
// See https://github.com/coder/coder/issues/3371.
Expand All @@ -299,6 +329,7 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt
err := showMOTD(session, manifest.MOTDFile)
if err != nil {
s.logger.Error(ctx, "show MOTD", slog.Error(err))
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "motd").Add(1)
}
} else {
s.logger.Warn(ctx, "metadata lookup failed, unable to show MOTD")
Expand All @@ -313,12 +344,14 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt
pty.WithLogger(slog.Stdlib(ctx, s.logger, slog.LevelInfo)),
))
if err != nil {
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "start_command").Add(1)
return xerrors.Errorf("start command: %w", err)
}
defer func() {
closeErr := ptty.Close()
if closeErr != nil {
s.logger.Warn(ctx, "failed to close tty", slog.Error(closeErr))
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "close").Add(1)
if retErr == nil {
retErr = closeErr
}
Expand All @@ -330,12 +363,16 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt
// If the pty is closed, then command has exited, no need to log.
if resizeErr != nil && !errors.Is(resizeErr, pty.ErrClosed) {
s.logger.Warn(ctx, "failed to resize tty", slog.Error(resizeErr))
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "resize").Add(1)
}
}
}()

go func() {
_, _ = io.Copy(ptty.InputWriter(), session)
_, err := io.Copy(ptty.InputWriter(), session)
if err != nil {
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "input_io_copy").Add(1)
}
}()

// We need to wait for the command output to finish copying. It's safe to
Expand All @@ -349,6 +386,7 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt
n, err := io.Copy(session, ptty.OutputReader())
s.logger.Debug(ctx, "copy output done", slog.F("bytes", n), slog.Error(err))
if err != nil {
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "output_io_copy").Add(1)
return xerrors.Errorf("copy error: %w", err)
}
// We've gotten all the output, but we need to wait for the process to
Expand All @@ -360,6 +398,7 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt
// and not something to be concerned about. But, if it's something else, we should log it.
if err != nil && !xerrors.As(err, &exitErr) {
s.logger.Warn(ctx, "wait error", slog.Error(err))
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "wait").Add(1)
}
if err != nil {
return xerrors.Errorf("process wait: %w", err)
Expand All @@ -368,6 +407,8 @@ func (s *Server) startPTYSession(session ptySession, cmd *pty.Cmd, sshPty ssh.Pt
}

func (s *Server) sftpHandler(session ssh.Session) {
s.metrics.sftpConnectionsTotal.Add(1)

ctx := session.Context()

// Typically sftp sessions don't request a TTY, but if they do,
Expand Down Expand Up @@ -407,6 +448,7 @@ func (s *Server) sftpHandler(session ssh.Session) {
return
}
s.logger.Warn(ctx, "sftp server closed with error", slog.Error(err))
s.metrics.sftpServerErrors.Add(1)
_ = session.Exit(1)
}

Expand Down
Loading