@@ -89,7 +89,6 @@ type Options struct {
89
89
90
90
type Client interface {
91
91
ConnectRPC (ctx context.Context ) (drpc.Conn , error )
92
- ReportStats (ctx context.Context , log slog.Logger , statsChan <- chan * agentsdk.Stats , setInterval func (time.Duration )) (io.Closer , error )
93
92
PostLifecycle (ctx context.Context , state agentsdk.PostLifecycleRequest ) error
94
93
PostMetadata (ctx context.Context , req agentsdk.PostMetadataRequest ) error
95
94
PatchLogs (ctx context.Context , req agentsdk.PatchLogs ) error
@@ -158,7 +157,6 @@ func New(options Options) Agent {
158
157
lifecycleStates : []agentsdk.PostLifecycleRequest {{State : codersdk .WorkspaceAgentLifecycleCreated }},
159
158
ignorePorts : options .IgnorePorts ,
160
159
portCacheDuration : options .PortCacheDuration ,
161
- connStatsChan : make (chan * agentsdk.Stats , 1 ),
162
160
reportMetadataInterval : options .ReportMetadataInterval ,
163
161
serviceBannerRefreshInterval : options .ServiceBannerRefreshInterval ,
164
162
sshMaxTimeout : options .SSHMaxTimeout ,
@@ -216,8 +214,7 @@ type agent struct {
216
214
217
215
network * tailnet.Conn
218
216
addresses []netip.Prefix
219
- connStatsChan chan * agentsdk.Stats
220
- latestStat atomic.Pointer [agentsdk.Stats ]
217
+ statsReporter * statsReporter
221
218
222
219
connCountReconnectingPTY atomic.Int64
223
220
@@ -822,14 +819,13 @@ func (a *agent) run(ctx context.Context) error {
822
819
closed := a .isClosed ()
823
820
if ! closed {
824
821
a .network = network
822
+ a .statsReporter = newStatsReporter (a .logger , network , a )
825
823
}
826
824
a .closeMutex .Unlock ()
827
825
if closed {
828
826
_ = network .Close ()
829
827
return xerrors .New ("agent is closed" )
830
828
}
831
-
832
- a .startReportingConnectionStats (ctx )
833
829
} else {
834
830
// Update the wireguard IPs if the agent ID changed.
835
831
err := network .SetAddresses (a .wireguardAddresses (manifest .AgentID ))
@@ -871,6 +867,15 @@ func (a *agent) run(ctx context.Context) error {
871
867
return nil
872
868
})
873
869
870
+ eg .Go (func () error {
871
+ a .logger .Debug (egCtx , "running stats report loop" )
872
+ err := a .statsReporter .reportLoop (egCtx , aAPI )
873
+ if err != nil {
874
+ return xerrors .Errorf ("report stats loop: %w" , err )
875
+ }
876
+ return nil
877
+ })
878
+
874
879
return eg .Wait ()
875
880
}
876
881
@@ -1218,115 +1223,83 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1218
1223
return rpty .Attach (ctx , connectionID , conn , msg .Height , msg .Width , connLogger )
1219
1224
}
1220
1225
1221
- // startReportingConnectionStats runs the connection stats reporting goroutine.
1222
- func (a * agent ) startReportingConnectionStats (ctx context.Context ) {
1223
- reportStats := func (networkStats map [netlogtype.Connection ]netlogtype.Counts ) {
1224
- a .logger .Debug (ctx , "computing stats report" )
1225
- stats := & agentsdk.Stats {
1226
- ConnectionCount : int64 (len (networkStats )),
1227
- ConnectionsByProto : map [string ]int64 {},
1228
- }
1229
- for conn , counts := range networkStats {
1230
- stats .ConnectionsByProto [conn .Proto .String ()]++
1231
- stats .RxBytes += int64 (counts .RxBytes )
1232
- stats .RxPackets += int64 (counts .RxPackets )
1233
- stats .TxBytes += int64 (counts .TxBytes )
1234
- stats .TxPackets += int64 (counts .TxPackets )
1235
- }
1236
-
1237
- // The count of active sessions.
1238
- sshStats := a .sshServer .ConnStats ()
1239
- stats .SessionCountSSH = sshStats .Sessions
1240
- stats .SessionCountVSCode = sshStats .VSCode
1241
- stats .SessionCountJetBrains = sshStats .JetBrains
1242
-
1243
- stats .SessionCountReconnectingPTY = a .connCountReconnectingPTY .Load ()
1244
-
1245
- // Compute the median connection latency!
1246
- a .logger .Debug (ctx , "starting peer latency measurement for stats" )
1247
- var wg sync.WaitGroup
1248
- var mu sync.Mutex
1249
- status := a .network .Status ()
1250
- durations := []float64 {}
1251
- pingCtx , cancelFunc := context .WithTimeout (ctx , 5 * time .Second )
1252
- defer cancelFunc ()
1253
- for nodeID , peer := range status .Peer {
1254
- if ! peer .Active {
1255
- continue
1256
- }
1257
- addresses , found := a .network .NodeAddresses (nodeID )
1258
- if ! found {
1259
- continue
1260
- }
1261
- if len (addresses ) == 0 {
1262
- continue
1263
- }
1264
- wg .Add (1 )
1265
- go func () {
1266
- defer wg .Done ()
1267
- duration , _ , _ , err := a .network .Ping (pingCtx , addresses [0 ].Addr ())
1268
- if err != nil {
1269
- return
1270
- }
1271
- mu .Lock ()
1272
- durations = append (durations , float64 (duration .Microseconds ()))
1273
- mu .Unlock ()
1274
- }()
1226
+ // Collect collects additional stats from the agent
1227
+ func (a * agent ) Collect (ctx context.Context , networkStats map [netlogtype.Connection ]netlogtype.Counts ) * proto.Stats {
1228
+ a .logger .Debug (context .Background (), "computing stats report" )
1229
+ stats := & proto.Stats {
1230
+ ConnectionCount : int64 (len (networkStats )),
1231
+ ConnectionsByProto : map [string ]int64 {},
1232
+ }
1233
+ for conn , counts := range networkStats {
1234
+ stats .ConnectionsByProto [conn .Proto .String ()]++
1235
+ stats .RxBytes += int64 (counts .RxBytes )
1236
+ stats .RxPackets += int64 (counts .RxPackets )
1237
+ stats .TxBytes += int64 (counts .TxBytes )
1238
+ stats .TxPackets += int64 (counts .TxPackets )
1239
+ }
1240
+
1241
+ // The count of active sessions.
1242
+ sshStats := a .sshServer .ConnStats ()
1243
+ stats .SessionCountSsh = sshStats .Sessions
1244
+ stats .SessionCountVscode = sshStats .VSCode
1245
+ stats .SessionCountJetbrains = sshStats .JetBrains
1246
+
1247
+ stats .SessionCountReconnectingPty = a .connCountReconnectingPTY .Load ()
1248
+
1249
+ // Compute the median connection latency!
1250
+ a .logger .Debug (ctx , "starting peer latency measurement for stats" )
1251
+ var wg sync.WaitGroup
1252
+ var mu sync.Mutex
1253
+ status := a .network .Status ()
1254
+ durations := []float64 {}
1255
+ pingCtx , cancelFunc := context .WithTimeout (ctx , 5 * time .Second )
1256
+ defer cancelFunc ()
1257
+ for nodeID , peer := range status .Peer {
1258
+ if ! peer .Active {
1259
+ continue
1275
1260
}
1276
- wg .Wait ()
1277
- sort .Float64s (durations )
1278
- durationsLength := len (durations )
1279
- if durationsLength == 0 {
1280
- stats .ConnectionMedianLatencyMS = - 1
1281
- } else if durationsLength % 2 == 0 {
1282
- stats .ConnectionMedianLatencyMS = (durations [durationsLength / 2 - 1 ] + durations [durationsLength / 2 ]) / 2
1283
- } else {
1284
- stats .ConnectionMedianLatencyMS = durations [durationsLength / 2 ]
1261
+ addresses , found := a .network .NodeAddresses (nodeID )
1262
+ if ! found {
1263
+ continue
1285
1264
}
1286
- // Convert from microseconds to milliseconds.
1287
- stats .ConnectionMedianLatencyMS /= 1000
1288
-
1289
- // Collect agent metrics.
1290
- // Agent metrics are changing all the time, so there is no need to perform
1291
- // reflect.DeepEqual to see if stats should be transferred.
1292
-
1293
- metricsCtx , cancelFunc := context .WithTimeout (ctx , 5 * time .Second )
1294
- defer cancelFunc ()
1295
- a .logger .Debug (ctx , "collecting agent metrics for stats" )
1296
- stats .Metrics = a .collectMetrics (metricsCtx )
1297
-
1298
- a .latestStat .Store (stats )
1299
-
1300
- a .logger .Debug (ctx , "about to send stats" )
1301
- select {
1302
- case a .connStatsChan <- stats :
1303
- a .logger .Debug (ctx , "successfully sent stats" )
1304
- case <- a .closed :
1305
- a .logger .Debug (ctx , "didn't send stats because we are closed" )
1265
+ if len (addresses ) == 0 {
1266
+ continue
1306
1267
}
1268
+ wg .Add (1 )
1269
+ go func () {
1270
+ defer wg .Done ()
1271
+ duration , _ , _ , err := a .network .Ping (pingCtx , addresses [0 ].Addr ())
1272
+ if err != nil {
1273
+ return
1274
+ }
1275
+ mu .Lock ()
1276
+ durations = append (durations , float64 (duration .Microseconds ()))
1277
+ mu .Unlock ()
1278
+ }()
1307
1279
}
1308
-
1309
- // Report statistics from the created network.
1310
- cl , err := a .client .ReportStats (ctx , a .logger , a .connStatsChan , func (d time.Duration ) {
1311
- a .network .SetConnStatsCallback (d , 2048 ,
1312
- func (_ , _ time.Time , virtual , _ map [netlogtype.Connection ]netlogtype.Counts ) {
1313
- reportStats (virtual )
1314
- },
1315
- )
1316
- })
1317
- if err != nil {
1318
- a .logger .Error (ctx , "agent failed to report stats" , slog .Error (err ))
1280
+ wg .Wait ()
1281
+ sort .Float64s (durations )
1282
+ durationsLength := len (durations )
1283
+ if durationsLength == 0 {
1284
+ stats .ConnectionMedianLatencyMs = - 1
1285
+ } else if durationsLength % 2 == 0 {
1286
+ stats .ConnectionMedianLatencyMs = (durations [durationsLength / 2 - 1 ] + durations [durationsLength / 2 ]) / 2
1319
1287
} else {
1320
- if err = a .trackConnGoroutine (func () {
1321
- // This is OK because the agent never re-creates the tailnet
1322
- // and the only shutdown indicator is agent.Close().
1323
- <- a .closed
1324
- _ = cl .Close ()
1325
- }); err != nil {
1326
- a .logger .Debug (ctx , "report stats goroutine" , slog .Error (err ))
1327
- _ = cl .Close ()
1328
- }
1288
+ stats .ConnectionMedianLatencyMs = durations [durationsLength / 2 ]
1329
1289
}
1290
+ // Convert from microseconds to milliseconds.
1291
+ stats .ConnectionMedianLatencyMs /= 1000
1292
+
1293
+ // Collect agent metrics.
1294
+ // Agent metrics are changing all the time, so there is no need to perform
1295
+ // reflect.DeepEqual to see if stats should be transferred.
1296
+
1297
+ metricsCtx , cancelFunc := context .WithTimeout (ctx , 5 * time .Second )
1298
+ defer cancelFunc ()
1299
+ a .logger .Debug (ctx , "collecting agent metrics for stats" )
1300
+ stats .Metrics = a .collectMetrics (metricsCtx )
1301
+
1302
+ return stats
1330
1303
}
1331
1304
1332
1305
var prioritizedProcs = []string {"coder agent" }
0 commit comments