@@ -32,6 +32,7 @@ import (
32
32
"golang.org/x/xerrors"
33
33
"tailscale.com/net/speedtest"
34
34
"tailscale.com/tailcfg"
35
+ "tailscale.com/types/netlogtype"
35
36
36
37
"cdr.dev/slog"
37
38
"github.com/coder/coder/agent/usershell"
@@ -98,7 +99,6 @@ func New(options Options) io.Closer {
98
99
exchangeToken : options .ExchangeToken ,
99
100
filesystem : options .Filesystem ,
100
101
tempDir : options .TempDir ,
101
- stats : & Stats {},
102
102
}
103
103
server .init (ctx )
104
104
return server
@@ -126,7 +126,6 @@ type agent struct {
126
126
sshServer * ssh.Server
127
127
128
128
network * tailnet.Conn
129
- stats * Stats
130
129
}
131
130
132
131
// runLoop attempts to start the agent in a retry loop.
@@ -238,22 +237,16 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
238
237
return nil , xerrors .New ("closed" )
239
238
}
240
239
network , err := tailnet .NewConn (& tailnet.Options {
241
- Addresses : []netip.Prefix {netip .PrefixFrom (codersdk .TailnetIP , 128 )},
242
- DERPMap : derpMap ,
243
- Logger : a .logger .Named ("tailnet" ),
240
+ Addresses : []netip.Prefix {netip .PrefixFrom (codersdk .TailnetIP , 128 )},
241
+ DERPMap : derpMap ,
242
+ Logger : a .logger .Named ("tailnet" ),
243
+ EnableTrafficStats : true ,
244
244
})
245
245
if err != nil {
246
246
a .closeMutex .Unlock ()
247
247
return nil , xerrors .Errorf ("create tailnet: %w" , err )
248
248
}
249
249
a .network = network
250
- network .SetForwardTCPCallback (func (conn net.Conn , listenerExists bool ) net.Conn {
251
- if listenerExists {
252
- // If a listener already exists, we would double-wrap the conn.
253
- return conn
254
- }
255
- return a .stats .wrapConn (conn )
256
- })
257
250
a .connCloseWait .Add (4 )
258
251
a .closeMutex .Unlock ()
259
252
@@ -268,7 +261,7 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
268
261
if err != nil {
269
262
return
270
263
}
271
- go a .sshServer .HandleConn (a . stats . wrapConn ( conn ) )
264
+ go a .sshServer .HandleConn (conn )
272
265
}
273
266
}()
274
267
@@ -284,7 +277,6 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
284
277
a .logger .Debug (ctx , "accept pty failed" , slog .Error (err ))
285
278
return
286
279
}
287
- conn = a .stats .wrapConn (conn )
288
280
// This cannot use a JSON decoder, since that can
289
281
// buffer additional data that is required for the PTY.
290
282
rawLen := make ([]byte , 2 )
@@ -523,7 +515,13 @@ func (a *agent) init(ctx context.Context) {
523
515
524
516
go a .runLoop (ctx )
525
517
cl , err := a .client .AgentReportStats (ctx , a .logger , func () * codersdk.AgentStats {
526
- return a .stats .Copy ()
518
+ stats := map [netlogtype.Connection ]netlogtype.Counts {}
519
+ a .closeMutex .Lock ()
520
+ if a .network != nil {
521
+ stats = a .network .ExtractTrafficStats ()
522
+ }
523
+ a .closeMutex .Unlock ()
524
+ return convertAgentStats (stats )
527
525
})
528
526
if err != nil {
529
527
a .logger .Error (ctx , "report stats" , slog .Error (err ))
@@ -537,6 +535,23 @@ func (a *agent) init(ctx context.Context) {
537
535
}()
538
536
}
539
537
538
+ func convertAgentStats (counts map [netlogtype.Connection ]netlogtype.Counts ) * codersdk.AgentStats {
539
+ stats := & codersdk.AgentStats {
540
+ ConnsByProto : map [string ]int64 {},
541
+ NumConns : int64 (len (counts )),
542
+ }
543
+
544
+ for conn , count := range counts {
545
+ stats .ConnsByProto [conn .Proto .String ()]++
546
+ stats .RxPackets += int64 (count .RxPackets )
547
+ stats .RxBytes += int64 (count .RxBytes )
548
+ stats .TxPackets += int64 (count .TxPackets )
549
+ stats .TxBytes += int64 (count .TxBytes )
550
+ }
551
+
552
+ return stats
553
+ }
554
+
540
555
// createCommand processes raw command input with OpenSSH-like behavior.
541
556
// If the rawCommand provided is empty, it will default to the users shell.
542
557
// This injects environment variables specified by the user at launch too.
0 commit comments