Skip to content

Commit 3ac9064

Browse files
committed
feat: change agent to use v2 API for reporting stats
1 parent c7f52b7 commit 3ac9064

File tree

8 files changed

+136
-301
lines changed

8 files changed

+136
-301
lines changed

agent/agent.go

+82 -109
Original file line number | Diff line number | Diff line change
@@ -89,7 +89,6 @@ type Options struct {
8989

9090
type Client interface {
9191
ConnectRPC(ctx context.Context) (drpc.Conn, error)
92-
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
9392
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
9493
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
9594
PatchLogs(ctx context.Context, req agentsdk.PatchLogs) error
@@ -158,7 +157,6 @@ func New(options Options) Agent {
158157
lifecycleStates: []agentsdk.PostLifecycleRequest{{State: codersdk.WorkspaceAgentLifecycleCreated}},
159158
ignorePorts: options.IgnorePorts,
160159
portCacheDuration: options.PortCacheDuration,
161-
connStatsChan: make(chan *agentsdk.Stats, 1),
162160
reportMetadataInterval: options.ReportMetadataInterval,
163161
serviceBannerRefreshInterval: options.ServiceBannerRefreshInterval,
164162
sshMaxTimeout: options.SSHMaxTimeout,
@@ -216,8 +214,7 @@ type agent struct {
216214

217215
network *tailnet.Conn
218216
addresses []netip.Prefix
219-
connStatsChan chan *agentsdk.Stats
220-
latestStat atomic.Pointer[agentsdk.Stats]
217+
statsReporter *statsReporter
221218

222219
connCountReconnectingPTY atomic.Int64
223220

@@ -822,14 +819,13 @@ func (a *agent) run(ctx context.Context) error {
822819
closed := a.isClosed()
823820
if !closed {
824821
a.network = network
822+
a.statsReporter = newStatsReporter(a.logger, network, a)
825823
}
826824
a.closeMutex.Unlock()
827825
if closed {
828826
_ = network.Close()
829827
return xerrors.New("agent is closed")
830828
}
831-
832-
a.startReportingConnectionStats(ctx)
833829
} else {
834830
// Update the wireguard IPs if the agent ID changed.
835831
err := network.SetAddresses(a.wireguardAddresses(manifest.AgentID))
@@ -871,6 +867,15 @@ func (a *agent) run(ctx context.Context) error {
871867
return nil
872868
})
873869

870+
eg.Go(func() error {
871+
a.logger.Debug(egCtx, "running stats report loop")
872+
err := a.statsReporter.reportLoop(egCtx, aAPI)
873+
if err != nil {
874+
return xerrors.Errorf("report stats loop: %w", err)
875+
}
876+
return nil
877+
})
878+
874879
return eg.Wait()
875880
}
876881

@@ -1218,115 +1223,83 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
12181223
return rpty.Attach(ctx, connectionID, conn, msg.Height, msg.Width, connLogger)
12191224
}
12201225

1221-
// startReportingConnectionStats runs the connection stats reporting goroutine.
1222-
func (a *agent) startReportingConnectionStats(ctx context.Context) {
1223-
reportStats := func(networkStats map[netlogtype.Connection]netlogtype.Counts) {
1224-
a.logger.Debug(ctx, "computing stats report")
1225-
stats := &agentsdk.Stats{
1226-
ConnectionCount: int64(len(networkStats)),
1227-
ConnectionsByProto: map[string]int64{},
1228-
}
1229-
for conn, counts := range networkStats {
1230-
stats.ConnectionsByProto[conn.Proto.String()]++
1231-
stats.RxBytes += int64(counts.RxBytes)
1232-
stats.RxPackets += int64(counts.RxPackets)
1233-
stats.TxBytes += int64(counts.TxBytes)
1234-
stats.TxPackets += int64(counts.TxPackets)
1235-
}
1236-
1237-
// The count of active sessions.
1238-
sshStats := a.sshServer.ConnStats()
1239-
stats.SessionCountSSH = sshStats.Sessions
1240-
stats.SessionCountVSCode = sshStats.VSCode
1241-
stats.SessionCountJetBrains = sshStats.JetBrains
1242-
1243-
stats.SessionCountReconnectingPTY = a.connCountReconnectingPTY.Load()
1244-
1245-
// Compute the median connection latency!
1246-
a.logger.Debug(ctx, "starting peer latency measurement for stats")
1247-
var wg sync.WaitGroup
1248-
var mu sync.Mutex
1249-
status := a.network.Status()
1250-
durations := []float64{}
1251-
pingCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
1252-
defer cancelFunc()
1253-
for nodeID, peer := range status.Peer {
1254-
if !peer.Active {
1255-
continue
1256-
}
1257-
addresses, found := a.network.NodeAddresses(nodeID)
1258-
if !found {
1259-
continue
1260-
}
1261-
if len(addresses) == 0 {
1262-
continue
1263-
}
1264-
wg.Add(1)
1265-
go func() {
1266-
defer wg.Done()
1267-
duration, _, _, err := a.network.Ping(pingCtx, addresses[0].Addr())
1268-
if err != nil {
1269-
return
1270-
}
1271-
mu.Lock()
1272-
durations = append(durations, float64(duration.Microseconds()))
1273-
mu.Unlock()
1274-
}()
1226+
// Collect collects additional stats from the agent
1227+
func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connection]netlogtype.Counts) *proto.Stats {
1228+
a.logger.Debug(context.Background(), "computing stats report")
1229+
stats := &proto.Stats{
1230+
ConnectionCount: int64(len(networkStats)),
1231+
ConnectionsByProto: map[string]int64{},
1232+
}
1233+
for conn, counts := range networkStats {
1234+
stats.ConnectionsByProto[conn.Proto.String()]++
1235+
stats.RxBytes += int64(counts.RxBytes)
1236+
stats.RxPackets += int64(counts.RxPackets)
1237+
stats.TxBytes += int64(counts.TxBytes)
1238+
stats.TxPackets += int64(counts.TxPackets)
1239+
}
1240+
1241+
// The count of active sessions.
1242+
sshStats := a.sshServer.ConnStats()
1243+
stats.SessionCountSsh = sshStats.Sessions
1244+
stats.SessionCountVscode = sshStats.VSCode
1245+
stats.SessionCountJetbrains = sshStats.JetBrains
1246+
1247+
stats.SessionCountReconnectingPty = a.connCountReconnectingPTY.Load()
1248+
1249+
// Compute the median connection latency!
1250+
a.logger.Debug(ctx, "starting peer latency measurement for stats")
1251+
var wg sync.WaitGroup
1252+
var mu sync.Mutex
1253+
status := a.network.Status()
1254+
durations := []float64{}
1255+
pingCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
1256+
defer cancelFunc()
1257+
for nodeID, peer := range status.Peer {
1258+
if !peer.Active {
1259+
continue
12751260
}
1276-
wg.Wait()
1277-
sort.Float64s(durations)
1278-
durationsLength := len(durations)
1279-
if durationsLength == 0 {
1280-
stats.ConnectionMedianLatencyMS = -1
1281-
} else if durationsLength%2 == 0 {
1282-
stats.ConnectionMedianLatencyMS = (durations[durationsLength/2-1] + durations[durationsLength/2]) / 2
1283-
} else {
1284-
stats.ConnectionMedianLatencyMS = durations[durationsLength/2]
1261+
addresses, found := a.network.NodeAddresses(nodeID)
1262+
if !found {
1263+
continue
12851264
}
1286-
// Convert from microseconds to milliseconds.
1287-
stats.ConnectionMedianLatencyMS /= 1000
1288-
1289-
// Collect agent metrics.
1290-
// Agent metrics are changing all the time, so there is no need to perform
1291-
// reflect.DeepEqual to see if stats should be transferred.
1292-
1293-
metricsCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
1294-
defer cancelFunc()
1295-
a.logger.Debug(ctx, "collecting agent metrics for stats")
1296-
stats.Metrics = a.collectMetrics(metricsCtx)
1297-
1298-
a.latestStat.Store(stats)
1299-
1300-
a.logger.Debug(ctx, "about to send stats")
1301-
select {
1302-
case a.connStatsChan <- stats:
1303-
a.logger.Debug(ctx, "successfully sent stats")
1304-
case <-a.closed:
1305-
a.logger.Debug(ctx, "didn't send stats because we are closed")
1265+
if len(addresses) == 0 {
1266+
continue
13061267
}
1268+
wg.Add(1)
1269+
go func() {
1270+
defer wg.Done()
1271+
duration, _, _, err := a.network.Ping(pingCtx, addresses[0].Addr())
1272+
if err != nil {
1273+
return
1274+
}
1275+
mu.Lock()
1276+
durations = append(durations, float64(duration.Microseconds()))
1277+
mu.Unlock()
1278+
}()
13071279
}
1308-
1309-
// Report statistics from the created network.
1310-
cl, err := a.client.ReportStats(ctx, a.logger, a.connStatsChan, func(d time.Duration) {
1311-
a.network.SetConnStatsCallback(d, 2048,
1312-
func(_, _ time.Time, virtual, _ map[netlogtype.Connection]netlogtype.Counts) {
1313-
reportStats(virtual)
1314-
},
1315-
)
1316-
})
1317-
if err != nil {
1318-
a.logger.Error(ctx, "agent failed to report stats", slog.Error(err))
1280+
wg.Wait()
1281+
sort.Float64s(durations)
1282+
durationsLength := len(durations)
1283+
if durationsLength == 0 {
1284+
stats.ConnectionMedianLatencyMs = -1
1285+
} else if durationsLength%2 == 0 {
1286+
stats.ConnectionMedianLatencyMs = (durations[durationsLength/2-1] + durations[durationsLength/2]) / 2
13191287
} else {
1320-
if err = a.trackConnGoroutine(func() {
1321-
// This is OK because the agent never re-creates the tailnet
1322-
// and the only shutdown indicator is agent.Close().
1323-
<-a.closed
1324-
_ = cl.Close()
1325-
}); err != nil {
1326-
a.logger.Debug(ctx, "report stats goroutine", slog.Error(err))
1327-
_ = cl.Close()
1328-
}
1288+
stats.ConnectionMedianLatencyMs = durations[durationsLength/2]
13291289
}
1290+
// Convert from microseconds to milliseconds.
1291+
stats.ConnectionMedianLatencyMs /= 1000
1292+
1293+
// Collect agent metrics.
1294+
// Agent metrics are changing all the time, so there is no need to perform
1295+
// reflect.DeepEqual to see if stats should be transferred.
1296+
1297+
metricsCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
1298+
defer cancelFunc()
1299+
a.logger.Debug(ctx, "collecting agent metrics for stats")
1300+
stats.Metrics = a.collectMetrics(metricsCtx)
1301+
1302+
return stats
13301303
}
13311304

13321305
var prioritizedProcs = []string{"coder agent"}

agent/agent_test.go

+18 -17
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@ import (
5252
"github.com/coder/coder/v2/agent/agentproc/agentproctest"
5353
"github.com/coder/coder/v2/agent/agentssh"
5454
"github.com/coder/coder/v2/agent/agenttest"
55+
"github.com/coder/coder/v2/agent/proto"
5556
"github.com/coder/coder/v2/codersdk"
5657
"github.com/coder/coder/v2/codersdk/agentsdk"
5758
"github.com/coder/coder/v2/pty/ptytest"
@@ -85,11 +86,11 @@ func TestAgent_Stats_SSH(t *testing.T) {
8586
err = session.Shell()
8687
require.NoError(t, err)
8788

88-
var s *agentsdk.Stats
89+
var s *proto.Stats
8990
require.Eventuallyf(t, func() bool {
9091
var ok bool
9192
s, ok = <-stats
92-
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 && s.SessionCountSSH == 1
93+
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 && s.SessionCountSsh == 1
9394
}, testutil.WaitLong, testutil.IntervalFast,
9495
"never saw stats: %+v", s,
9596
)
@@ -118,11 +119,11 @@ func TestAgent_Stats_ReconnectingPTY(t *testing.T) {
118119
_, err = ptyConn.Write(data)
119120
require.NoError(t, err)
120121

121-
var s *agentsdk.Stats
122+
var s *proto.Stats
122123
require.Eventuallyf(t, func() bool {
123124
var ok bool
124125
s, ok = <-stats
125-
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 && s.SessionCountReconnectingPTY == 1
126+
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 && s.SessionCountReconnectingPty == 1
126127
}, testutil.WaitLong, testutil.IntervalFast,
127128
"never saw stats: %+v", s,
128129
)
@@ -177,14 +178,14 @@ func TestAgent_Stats_Magic(t *testing.T) {
177178
require.Eventuallyf(t, func() bool {
178179
s, ok := <-stats
179180
t.Logf("got stats: ok=%t, ConnectionCount=%d, RxBytes=%d, TxBytes=%d, SessionCountVSCode=%d, ConnectionMedianLatencyMS=%f",
180-
ok, s.ConnectionCount, s.RxBytes, s.TxBytes, s.SessionCountVSCode, s.ConnectionMedianLatencyMS)
181+
ok, s.ConnectionCount, s.RxBytes, s.TxBytes, s.SessionCountVscode, s.ConnectionMedianLatencyMs)
181182
return ok && s.ConnectionCount > 0 && s.RxBytes > 0 && s.TxBytes > 0 &&
182183
// Ensure that the connection didn't count as a "normal" SSH session.
183184
// This was a special one, so it should be labeled specially in the stats!
184-
s.SessionCountVSCode == 1 &&
185+
s.SessionCountVscode == 1 &&
185186
// Ensure that connection latency is being counted!
186187
// If it isn't, it's set to -1.
187-
s.ConnectionMedianLatencyMS >= 0
188+
s.ConnectionMedianLatencyMs >= 0
188189
}, testutil.WaitLong, testutil.IntervalFast,
189190
"never saw stats",
190191
)
@@ -243,9 +244,9 @@ func TestAgent_Stats_Magic(t *testing.T) {
243244
require.Eventuallyf(t, func() bool {
244245
s, ok := <-stats
245246
t.Logf("got stats with conn open: ok=%t, ConnectionCount=%d, SessionCountJetBrains=%d",
246-
ok, s.ConnectionCount, s.SessionCountJetBrains)
247+
ok, s.ConnectionCount, s.SessionCountJetbrains)
247248
return ok && s.ConnectionCount > 0 &&
248-
s.SessionCountJetBrains == 1
249+
s.SessionCountJetbrains == 1
249250
}, testutil.WaitLong, testutil.IntervalFast,
250251
"never saw stats with conn open",
251252
)
@@ -258,9 +259,9 @@ func TestAgent_Stats_Magic(t *testing.T) {
258259
require.Eventuallyf(t, func() bool {
259260
s, ok := <-stats
260261
t.Logf("got stats after disconnect %t, %d",
261-
ok, s.SessionCountJetBrains)
262+
ok, s.SessionCountJetbrains)
262263
return ok &&
263-
s.SessionCountJetBrains == 0
264+
s.SessionCountJetbrains == 0
264265
}, testutil.WaitLong, testutil.IntervalFast,
265266
"never saw stats after conn closes",
266267
)
@@ -1346,7 +1347,7 @@ func TestAgent_Lifecycle(t *testing.T) {
13461347
RunOnStop: true,
13471348
}},
13481349
},
1349-
make(chan *agentsdk.Stats, 50),
1350+
make(chan *proto.Stats, 50),
13501351
tailnet.NewCoordinator(logger),
13511352
)
13521353
defer client.Close()
@@ -1667,7 +1668,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16671668
_ = coordinator.Close()
16681669
})
16691670
agentID := uuid.New()
1670-
statsCh := make(chan *agentsdk.Stats, 50)
1671+
statsCh := make(chan *proto.Stats, 50)
16711672
fs := afero.NewMemMapFs()
16721673
client := agenttest.NewClient(t,
16731674
logger.Named("agent"),
@@ -1816,7 +1817,7 @@ func TestAgent_Reconnect(t *testing.T) {
18161817
defer coordinator.Close()
18171818

18181819
agentID := uuid.New()
1819-
statsCh := make(chan *agentsdk.Stats, 50)
1820+
statsCh := make(chan *proto.Stats, 50)
18201821
derpMap, _ := tailnettest.RunDERPAndSTUN(t)
18211822
client := agenttest.NewClient(t,
18221823
logger,
@@ -1861,7 +1862,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
18611862
GitAuthConfigs: 1,
18621863
DERPMap: &tailcfg.DERPMap{},
18631864
},
1864-
make(chan *agentsdk.Stats, 50),
1865+
make(chan *proto.Stats, 50),
18651866
coordinator,
18661867
)
18671868
defer client.Close()
@@ -2018,7 +2019,7 @@ func setupSSHSession(
20182019
func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Duration, opts ...func(*agenttest.Client, *agent.Options)) (
20192020
*codersdk.WorkspaceAgentConn,
20202021
*agenttest.Client,
2021-
<-chan *agentsdk.Stats,
2022+
<-chan *proto.Stats,
20222023
afero.Fs,
20232024
agent.Agent,
20242025
) {
@@ -2046,7 +2047,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
20462047
t.Cleanup(func() {
20472048
_ = coordinator.Close()
20482049
})
2049-
statsCh := make(chan *agentsdk.Stats, 50)
2050+
statsCh := make(chan *proto.Stats, 50)
20502051
fs := afero.NewMemMapFs()
20512052
c := agenttest.NewClient(t, logger.Named("agent"), metadata.AgentID, metadata, statsCh, coordinator)
20522053
t.Cleanup(c.Close)

0 commit comments

Comments
 (0)