@@ -54,6 +54,7 @@ type Options struct {
54
54
EnableWireguard bool
55
55
UploadWireguardKeys UploadWireguardKeys
56
56
ListenWireguardPeers ListenWireguardPeers
57
+ StatsReporter StatsReporter
57
58
ReconnectingPTYTimeout time.Duration
58
59
EnvironmentVariables map [string ]string
59
60
Logger slog.Logger
@@ -93,6 +94,10 @@ func New(dialer Dialer, options *Options) io.Closer {
93
94
enableWireguard : options .EnableWireguard ,
94
95
postKeys : options .UploadWireguardKeys ,
95
96
listenWireguardPeers : options .ListenWireguardPeers ,
97
+ stats : & Stats {
98
+ ActiveConns : make (map [int64 ]* ConnStats ),
99
+ },
100
+ statsReporter : options .StatsReporter ,
96
101
}
97
102
server .init (ctx )
98
103
return server
@@ -120,6 +125,9 @@ type agent struct {
120
125
network * peerwg.Network
121
126
postKeys UploadWireguardKeys
122
127
listenWireguardPeers ListenWireguardPeers
128
+
129
+ stats * Stats
130
+ statsReporter StatsReporter
123
131
}
124
132
125
133
func (a * agent ) run (ctx context.Context ) {
@@ -220,17 +228,17 @@ func (a *agent) runStartupScript(ctx context.Context, script string) error {
220
228
return nil
221
229
}
222
230
223
- func (a * agent ) handlePeerConn (ctx context.Context , conn * peer.Conn ) {
231
+ func (a * agent ) handlePeerConn (ctx context.Context , peerConn * peer.Conn ) {
224
232
go func () {
225
233
select {
226
234
case <- a .closed :
227
- case <- conn .Closed ():
235
+ case <- peerConn .Closed ():
228
236
}
229
- _ = conn .Close ()
237
+ _ = peerConn .Close ()
230
238
a .connCloseWait .Done ()
231
239
}()
232
240
for {
233
- channel , err := conn .Accept (ctx )
241
+ channel , err := peerConn .Accept (ctx )
234
242
if err != nil {
235
243
if errors .Is (err , peer .ErrClosed ) || a .isClosed () {
236
244
return
@@ -239,19 +247,21 @@ func (a *agent) handlePeerConn(ctx context.Context, conn *peer.Conn) {
239
247
return
240
248
}
241
249
242
- switch channel .Protocol () {
243
- case ProtocolSSH :
244
- go a .sshServer .HandleConn (channel .NetConn ())
245
- case ProtocolReconnectingPTY :
246
- go a .handleReconnectingPTY (ctx , channel .Label (), channel .NetConn ())
247
- case ProtocolDial :
248
- go a .handleDial (ctx , channel .Label (), channel .NetConn ())
249
- default :
250
- a .logger .Warn (ctx , "unhandled protocol from channel" ,
251
- slog .F ("protocol" , channel .Protocol ()),
252
- slog .F ("label" , channel .Label ()),
253
- )
254
- }
250
+ a .stats .goConn (channel .NetConn (), channel .Protocol (), func (conn net.Conn ) {
251
+ switch channel .Protocol () {
252
+ case ProtocolSSH :
253
+ a .sshServer .HandleConn (conn )
254
+ case ProtocolReconnectingPTY :
255
+ a .handleReconnectingPTY (ctx , channel .Label (), conn )
256
+ case ProtocolDial :
257
+ a .handleDial (ctx , channel .Label (), conn )
258
+ default :
259
+ a .logger .Warn (ctx , "unhandled protocol from channel" ,
260
+ slog .F ("protocol" , channel .Protocol ()),
261
+ slog .F ("label" , channel .Label ()),
262
+ )
263
+ }
264
+ })
255
265
}
256
266
}
257
267
@@ -339,6 +349,25 @@ func (a *agent) init(ctx context.Context) {
339
349
}
340
350
341
351
go a .run (ctx )
352
+ if a .statsReporter != nil {
353
+ // If each report is approximately 100 bytes, and send a report every
354
+ // 60 seconds, we send 60*24*100 or 144kB a day per agent. If there
355
+ // are 100 agents with a retention policy of 30 days, we have 432MB
356
+ // of logs, which we consider acceptable.
357
+ go func () {
358
+ timer := time .NewTimer (time .Minute )
359
+ defer timer .Stop ()
360
+
361
+ select {
362
+ case <- timer .C :
363
+ a .stats .RLock ()
364
+ a .statsReporter (a .stats )
365
+ a .stats .RUnlock ()
366
+ case <- ctx .Done ():
367
+ return
368
+ }
369
+ }()
370
+ }
342
371
}
343
372
344
373
// createCommand processes raw command input with OpenSSH-like behavior.
0 commit comments