
feat: change agent to use v2 API for reporting stats #12024

Merged: 1 commit, Feb 7, 2024

191 changes: 82 additions & 109 deletions agent/agent.go
@@ -89,7 +89,6 @@ type Options struct {

type Client interface {
ConnectRPC(ctx context.Context) (drpc.Conn, error)
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
PatchLogs(ctx context.Context, req agentsdk.PatchLogs) error
@@ -158,7 +157,6 @@ func New(options Options) Agent {
lifecycleStates: []agentsdk.PostLifecycleRequest{{State: codersdk.WorkspaceAgentLifecycleCreated}},
ignorePorts: options.IgnorePorts,
portCacheDuration: options.PortCacheDuration,
connStatsChan: make(chan *agentsdk.Stats, 1),
reportMetadataInterval: options.ReportMetadataInterval,
serviceBannerRefreshInterval: options.ServiceBannerRefreshInterval,
sshMaxTimeout: options.SSHMaxTimeout,
@@ -216,8 +214,7 @@ type agent struct {

network *tailnet.Conn
addresses []netip.Prefix
connStatsChan chan *agentsdk.Stats
latestStat atomic.Pointer[agentsdk.Stats]
statsReporter *statsReporter

connCountReconnectingPTY atomic.Int64

@@ -822,14 +819,13 @@ func (a *agent) run(ctx context.Context) error {
closed := a.isClosed()
if !closed {
a.network = network
a.statsReporter = newStatsReporter(a.logger, network, a)
}
a.closeMutex.Unlock()
if closed {
_ = network.Close()
return xerrors.New("agent is closed")
}

a.startReportingConnectionStats(ctx)
} else {
// Update the wireguard IPs if the agent ID changed.
err := network.SetAddresses(a.wireguardAddresses(manifest.AgentID))
@@ -871,6 +867,15 @@ func (a *agent) run(ctx context.Context) error {
return nil
})

eg.Go(func() error {
a.logger.Debug(egCtx, "running stats report loop")
err := a.statsReporter.reportLoop(egCtx, aAPI)
if err != nil {
return xerrors.Errorf("report stats loop: %w", err)
}
return nil
})

return eg.Wait()
}

@@ -1218,115 +1223,83 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
return rpty.Attach(ctx, connectionID, conn, msg.Height, msg.Width, connLogger)
}

// startReportingConnectionStats runs the connection stats reporting goroutine.
func (a *agent) startReportingConnectionStats(ctx context.Context) {
reportStats := func(networkStats map[netlogtype.Connection]netlogtype.Counts) {
a.logger.Debug(ctx, "computing stats report")
stats := &agentsdk.Stats{
ConnectionCount: int64(len(networkStats)),
ConnectionsByProto: map[string]int64{},
}
for conn, counts := range networkStats {
stats.ConnectionsByProto[conn.Proto.String()]++
stats.RxBytes += int64(counts.RxBytes)
stats.RxPackets += int64(counts.RxPackets)
stats.TxBytes += int64(counts.TxBytes)
stats.TxPackets += int64(counts.TxPackets)
}

// The count of active sessions.
sshStats := a.sshServer.ConnStats()
stats.SessionCountSSH = sshStats.Sessions
stats.SessionCountVSCode = sshStats.VSCode
stats.SessionCountJetBrains = sshStats.JetBrains

stats.SessionCountReconnectingPTY = a.connCountReconnectingPTY.Load()

// Compute the median connection latency!
a.logger.Debug(ctx, "starting peer latency measurement for stats")
var wg sync.WaitGroup
var mu sync.Mutex
status := a.network.Status()
durations := []float64{}
pingCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
defer cancelFunc()
for nodeID, peer := range status.Peer {
if !peer.Active {
continue
}
addresses, found := a.network.NodeAddresses(nodeID)
if !found {
continue
}
if len(addresses) == 0 {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
duration, _, _, err := a.network.Ping(pingCtx, addresses[0].Addr())
if err != nil {
return
}
mu.Lock()
durations = append(durations, float64(duration.Microseconds()))
mu.Unlock()
}()
// Collect assembles a stats report from the given network stats, adding session counts, median peer latency, and agent metrics.
func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connection]netlogtype.Counts) *proto.Stats {
a.logger.Debug(context.Background(), "computing stats report")
stats := &proto.Stats{
ConnectionCount: int64(len(networkStats)),
ConnectionsByProto: map[string]int64{},
}
for conn, counts := range networkStats {
stats.ConnectionsByProto[conn.Proto.String()]++
stats.RxBytes += int64(counts.RxBytes)
stats.RxPackets += int64(counts.RxPackets)
stats.TxBytes += int64(counts.TxBytes)
stats.TxPackets += int64(counts.TxPackets)
}

// The count of active sessions.
sshStats := a.sshServer.ConnStats()
stats.SessionCountSsh = sshStats.Sessions
stats.SessionCountVscode = sshStats.VSCode
stats.SessionCountJetbrains = sshStats.JetBrains

stats.SessionCountReconnectingPty = a.connCountReconnectingPTY.Load()

// Compute the median connection latency!
a.logger.Debug(ctx, "starting peer latency measurement for stats")
var wg sync.WaitGroup
var mu sync.Mutex
status := a.network.Status()
durations := []float64{}
pingCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
defer cancelFunc()
for nodeID, peer := range status.Peer {
if !peer.Active {
continue
}
wg.Wait()
sort.Float64s(durations)
durationsLength := len(durations)
if durationsLength == 0 {
stats.ConnectionMedianLatencyMS = -1
} else if durationsLength%2 == 0 {
stats.ConnectionMedianLatencyMS = (durations[durationsLength/2-1] + durations[durationsLength/2]) / 2
} else {
stats.ConnectionMedianLatencyMS = durations[durationsLength/2]
addresses, found := a.network.NodeAddresses(nodeID)
if !found {
continue
}
// Convert from microseconds to milliseconds.
stats.ConnectionMedianLatencyMS /= 1000

// Collect agent metrics.
// Agent metrics are changing all the time, so there is no need to perform
// reflect.DeepEqual to see if stats should be transferred.

metricsCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
defer cancelFunc()
a.logger.Debug(ctx, "collecting agent metrics for stats")
stats.Metrics = a.collectMetrics(metricsCtx)

a.latestStat.Store(stats)

a.logger.Debug(ctx, "about to send stats")
select {
case a.connStatsChan <- stats:
a.logger.Debug(ctx, "successfully sent stats")
case <-a.closed:
a.logger.Debug(ctx, "didn't send stats because we are closed")
if len(addresses) == 0 {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
duration, _, _, err := a.network.Ping(pingCtx, addresses[0].Addr())
if err != nil {
return
}
mu.Lock()
defer mu.Unlock()
durations = append(durations, float64(duration.Microseconds()))
}()
}

// Report statistics from the created network.
cl, err := a.client.ReportStats(ctx, a.logger, a.connStatsChan, func(d time.Duration) {
a.network.SetConnStatsCallback(d, 2048,
func(_, _ time.Time, virtual, _ map[netlogtype.Connection]netlogtype.Counts) {
reportStats(virtual)
},
)
})
if err != nil {
a.logger.Error(ctx, "agent failed to report stats", slog.Error(err))
wg.Wait()
sort.Float64s(durations)
durationsLength := len(durations)
if durationsLength == 0 {
stats.ConnectionMedianLatencyMs = -1
} else if durationsLength%2 == 0 {
stats.ConnectionMedianLatencyMs = (durations[durationsLength/2-1] + durations[durationsLength/2]) / 2
} else {
if err = a.trackConnGoroutine(func() {
// This is OK because the agent never re-creates the tailnet
// and the only shutdown indicator is agent.Close().
<-a.closed
_ = cl.Close()
}); err != nil {
a.logger.Debug(ctx, "report stats goroutine", slog.Error(err))
_ = cl.Close()
}
stats.ConnectionMedianLatencyMs = durations[durationsLength/2]
}
// Convert from microseconds to milliseconds.
stats.ConnectionMedianLatencyMs /= 1000

// Collect agent metrics.
// Agent metrics are changing all the time, so there is no need to perform
// reflect.DeepEqual to see if stats should be transferred.

metricsCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
defer cancelFunc()
a.logger.Debug(ctx, "collecting agent metrics for stats")
stats.Metrics = a.collectMetrics(metricsCtx)

return stats
}

var prioritizedProcs = []string{"coder agent"}
35 changes: 18 additions & 17 deletions agent/agent_test.go
@@ -52,6 +52,7 @@ import (
"github.com/coder/coder/v2/agent/agentproc/agentproctest"
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/agenttest"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/pty/ptytest"
@@ -85,11 +86,11 @@ func TestAgent_Stats_SSH(t *testing.T) {
err = session.Shell()
require.NoError(t, err)

var s *agentsdk.Stats
var s *proto.Stats
require.Eventuallyf(t, func() bool {
var ok bool
s, ok = <-stats
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 && s.SessionCountSSH == 1
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 && s.SessionCountSsh == 1
}, testutil.WaitLong, testutil.IntervalFast,
"never saw stats: %+v", s,
)
@@ -118,11 +119,11 @@ func TestAgent_Stats_ReconnectingPTY(t *testing.T) {
_, err = ptyConn.Write(data)
require.NoError(t, err)

var s *agentsdk.Stats
var s *proto.Stats
require.Eventuallyf(t, func() bool {
var ok bool
s, ok = <-stats
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 && s.SessionCountReconnectingPTY == 1
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 && s.SessionCountReconnectingPty == 1
}, testutil.WaitLong, testutil.IntervalFast,
"never saw stats: %+v", s,
)
@@ -177,14 +178,14 @@ func TestAgent_Stats_Magic(t *testing.T) {
require.Eventuallyf(t, func() bool {
s, ok := <-stats
t.Logf("got stats: ok=%t, ConnectionCount=%d, RxBytes=%d, TxBytes=%d, SessionCountVSCode=%d, ConnectionMedianLatencyMS=%f",
ok, s.ConnectionCount, s.RxBytes, s.TxBytes, s.SessionCountVSCode, s.ConnectionMedianLatencyMS)
ok, s.ConnectionCount, s.RxBytes, s.TxBytes, s.SessionCountVscode, s.ConnectionMedianLatencyMs)
Review thread on lines -180 to +181:

Member: VSCode vs Vscode, PTY vs Pty: are these renamed on purpose?

Contributor (author):
The SDK type fields were written by humans; the proto type fields are machine-translated from lowercase_with_underscores, so the capitalization isn't correct in all cases. Unfortunate from a style perspective, but I haven't chased down how to fix it yet.
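
To illustrate that mapping, here is a minimal sketch; the snake_case proto field names in the comment are inferred from the generated Go identifiers, not quoted from the .proto file:

```go
package main

import (
	"fmt"

	"github.com/coder/coder/v2/agent/proto"
)

func main() {
	// protoc-gen-go title-cases each underscore-separated segment of a field
	// name, so a field presumably named session_count_vscode generates
	// SessionCountVscode (not SessionCountVSCode), and
	// session_count_reconnecting_pty generates SessionCountReconnectingPty.
	s := &proto.Stats{
		SessionCountVscode:          1,
		SessionCountJetbrains:       0,
		SessionCountReconnectingPty: 0,
	}
	fmt.Printf("vscode sessions: %d\n", s.SessionCountVscode)
}
```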

return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 &&
// Ensure that the connection didn't count as a "normal" SSH session.
// This was a special one, so it should be labeled specially in the stats!
s.SessionCountVSCode == 1 &&
s.SessionCountVscode == 1 &&
// Ensure that connection latency is being counted!
// If it isn't, it's set to -1.
s.ConnectionMedianLatencyMS >= 0
s.ConnectionMedianLatencyMs >= 0
}, testutil.WaitLong, testutil.IntervalFast,
"never saw stats",
)
@@ -243,9 +244,9 @@ func TestAgent_Stats_Magic(t *testing.T) {
require.Eventuallyf(t, func() bool {
s, ok := <-stats
t.Logf("got stats with conn open: ok=%t, ConnectionCount=%d, SessionCountJetBrains=%d",
ok, s.ConnectionCount, s.SessionCountJetBrains)
ok, s.ConnectionCount, s.SessionCountJetbrains)
return ok && s.ConnectionCount > 0 &&
s.SessionCountJetBrains == 1
s.SessionCountJetbrains == 1
}, testutil.WaitLong, testutil.IntervalFast,
"never saw stats with conn open",
)
@@ -258,9 +259,9 @@ func TestAgent_Stats_Magic(t *testing.T) {
require.Eventuallyf(t, func() bool {
s, ok := <-stats
t.Logf("got stats after disconnect %t, %d",
ok, s.SessionCountJetBrains)
ok, s.SessionCountJetbrains)
return ok &&
s.SessionCountJetBrains == 0
s.SessionCountJetbrains == 0
}, testutil.WaitLong, testutil.IntervalFast,
"never saw stats after conn closes",
)
@@ -1346,7 +1347,7 @@ func TestAgent_Lifecycle(t *testing.T) {
RunOnStop: true,
}},
},
make(chan *agentsdk.Stats, 50),
make(chan *proto.Stats, 50),
tailnet.NewCoordinator(logger),
)
defer client.Close()
@@ -1667,7 +1668,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
_ = coordinator.Close()
})
agentID := uuid.New()
statsCh := make(chan *agentsdk.Stats, 50)
statsCh := make(chan *proto.Stats, 50)
fs := afero.NewMemMapFs()
client := agenttest.NewClient(t,
logger.Named("agent"),
@@ -1816,7 +1817,7 @@ func TestAgent_Reconnect(t *testing.T) {
defer coordinator.Close()

agentID := uuid.New()
statsCh := make(chan *agentsdk.Stats, 50)
statsCh := make(chan *proto.Stats, 50)
derpMap, _ := tailnettest.RunDERPAndSTUN(t)
client := agenttest.NewClient(t,
logger,
@@ -1861,7 +1862,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
GitAuthConfigs: 1,
DERPMap: &tailcfg.DERPMap{},
},
make(chan *agentsdk.Stats, 50),
make(chan *proto.Stats, 50),
coordinator,
)
defer client.Close()
@@ -2018,7 +2019,7 @@ func setupSSHSession(
func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Duration, opts ...func(*agenttest.Client, *agent.Options)) (
*codersdk.WorkspaceAgentConn,
*agenttest.Client,
<-chan *agentsdk.Stats,
<-chan *proto.Stats,
afero.Fs,
agent.Agent,
) {
@@ -2046,7 +2047,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
t.Cleanup(func() {
_ = coordinator.Close()
})
statsCh := make(chan *agentsdk.Stats, 50)
statsCh := make(chan *proto.Stats, 50)
fs := afero.NewMemMapFs()
c := agenttest.NewClient(t, logger.Named("agent"), metadata.AgentID, metadata, statsCh, coordinator)
t.Cleanup(c.Close)