@@ -18,6 +18,7 @@ import (
18
18
"os/user"
19
19
"path/filepath"
20
20
"runtime"
21
+ "sort"
21
22
"strconv"
22
23
"strings"
23
24
"sync"
@@ -56,6 +57,14 @@ const (
56
57
// command just returning a nonzero exit code, and is chosen as an arbitrary, high number
57
58
// unlikely to shadow other exit codes, which are typically 1, 2, 3, etc.
58
59
MagicSessionErrorCode = 229
60
+
61
+ // MagicSSHSessionTypeEnvironmentVariable is used to track the purpose behind an SSH connection.
62
+ // This is stripped from any commands being executed, and is counted towards connection stats.
63
+ MagicSSHSessionTypeEnvironmentVariable = "__CODER_SSH_SESSION_TYPE"
64
+ // MagicSSHSessionTypeVSCode is set in the SSH config by the VS Code extension to identify itself.
65
+ MagicSSHSessionTypeVSCode = "vscode"
66
+ // MagicSSHSessionTypeJetBrains is set in the SSH config by the JetBrains extension to identify itself.
67
+ MagicSSHSessionTypeJetBrains = "jetbrains"
59
68
)
60
69
61
70
type Options struct {
@@ -146,6 +155,15 @@ type agent struct {
146
155
147
156
network * tailnet.Conn
148
157
connStatsChan chan * agentsdk.Stats
158
+
159
+ statRxPackets atomic.Int64
160
+ statRxBytes atomic.Int64
161
+ statTxPackets atomic.Int64
162
+ statTxBytes atomic.Int64
163
+ connCountVSCode atomic.Int64
164
+ connCountJetBrains atomic.Int64
165
+ connCountReconnectingPTY atomic.Int64
166
+ connCountSSHSession atomic.Int64
149
167
}
150
168
151
169
// runLoop attempts to start the agent in a retry loop.
@@ -350,33 +368,7 @@ func (a *agent) run(ctx context.Context) error {
350
368
return xerrors .New ("agent is closed" )
351
369
}
352
370
353
- setStatInterval := func (d time.Duration ) {
354
- network .SetConnStatsCallback (d , 2048 ,
355
- func (_ , _ time.Time , virtual , _ map [netlogtype.Connection ]netlogtype.Counts ) {
356
- select {
357
- case a .connStatsChan <- convertAgentStats (virtual ):
358
- default :
359
- a .logger .Warn (ctx , "network stat dropped" )
360
- }
361
- },
362
- )
363
- }
364
-
365
- // Report statistics from the created network.
366
- cl , err := a .client .ReportStats (ctx , a .logger , a .connStatsChan , setStatInterval )
367
- if err != nil {
368
- a .logger .Error (ctx , "report stats" , slog .Error (err ))
369
- } else {
370
- if err = a .trackConnGoroutine (func () {
371
- // This is OK because the agent never re-creates the tailnet
372
- // and the only shutdown indicator is agent.Close().
373
- <- a .closed
374
- _ = cl .Close ()
375
- }); err != nil {
376
- a .logger .Debug (ctx , "report stats goroutine" , slog .Error (err ))
377
- _ = cl .Close ()
378
- }
379
- }
371
+ a .startReportingConnectionStats (ctx )
380
372
} else {
381
373
// Update the DERP map!
382
374
network .SetDERPMap (metadata .DERPMap )
@@ -765,23 +757,6 @@ func (a *agent) init(ctx context.Context) {
765
757
go a .runLoop (ctx )
766
758
}
767
759
768
- func convertAgentStats (counts map [netlogtype.Connection ]netlogtype.Counts ) * agentsdk.Stats {
769
- stats := & agentsdk.Stats {
770
- ConnectionsByProto : map [string ]int64 {},
771
- ConnectionCount : int64 (len (counts )),
772
- }
773
-
774
- for conn , count := range counts {
775
- stats .ConnectionsByProto [conn .Proto .String ()]++
776
- stats .RxPackets += int64 (count .RxPackets )
777
- stats .RxBytes += int64 (count .RxBytes )
778
- stats .TxPackets += int64 (count .TxPackets )
779
- stats .TxBytes += int64 (count .TxBytes )
780
- }
781
-
782
- return stats
783
- }
784
-
785
760
// createCommand processes raw command input with OpenSSH-like behavior.
786
761
// If the rawCommand provided is empty, it will default to the users shell.
787
762
// This injects environment variables specified by the user at launch too.
@@ -892,7 +867,27 @@ func (a *agent) createCommand(ctx context.Context, rawCommand string, env []stri
892
867
893
868
func (a * agent ) handleSSHSession (session ssh.Session ) (retErr error ) {
894
869
ctx := session .Context ()
895
- cmd , err := a .createCommand (ctx , session .RawCommand (), session .Environ ())
870
+ env := session .Environ ()
871
+ var magicType string
872
+ for index , kv := range env {
873
+ if ! strings .HasPrefix (kv , MagicSSHSessionTypeEnvironmentVariable ) {
874
+ continue
875
+ }
876
+ magicType = strings .TrimPrefix (kv , MagicSSHSessionTypeEnvironmentVariable + "=" )
877
+ env = append (env [:index ], env [index + 1 :]... )
878
+ }
879
+ switch magicType {
880
+ case MagicSSHSessionTypeVSCode :
881
+ a .connCountVSCode .Add (1 )
882
+ case MagicSSHSessionTypeJetBrains :
883
+ a .connCountJetBrains .Add (1 )
884
+ case "" :
885
+ a .connCountSSHSession .Add (1 )
886
+ default :
887
+ a .logger .Warn (ctx , "invalid magic ssh session type specified" , slog .F ("type" , magicType ))
888
+ }
889
+
890
+ cmd , err := a .createCommand (ctx , session .RawCommand (), env )
896
891
if err != nil {
897
892
return err
898
893
}
@@ -990,6 +985,8 @@ func (a *agent) handleSSHSession(session ssh.Session) (retErr error) {
990
985
func (a * agent ) handleReconnectingPTY (ctx context.Context , logger slog.Logger , msg codersdk.WorkspaceAgentReconnectingPTYInit , conn net.Conn ) (retErr error ) {
991
986
defer conn .Close ()
992
987
988
+ a .connCountReconnectingPTY .Add (1 )
989
+
993
990
connectionID := uuid .NewString ()
994
991
logger = logger .With (slog .F ("id" , msg .ID ), slog .F ("connection_id" , connectionID ))
995
992
@@ -1180,6 +1177,103 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1180
1177
}
1181
1178
}
1182
1179
1180
+ // startReportingConnectionStats runs the connection stats reporting goroutine.
1181
+ func (a * agent ) startReportingConnectionStats (ctx context.Context ) {
1182
+ reportStats := func (networkStats map [netlogtype.Connection ]netlogtype.Counts ) {
1183
+ stats := & agentsdk.Stats {
1184
+ ConnectionCount : int64 (len (networkStats )),
1185
+ ConnectionsByProto : map [string ]int64 {},
1186
+ }
1187
+ // Tailscale resets counts on every report!
1188
+ // We'd rather have these compound, like Linux does!
1189
+ for conn , counts := range networkStats {
1190
+ stats .ConnectionsByProto [conn .Proto .String ()]++
1191
+ stats .RxBytes = a .statRxBytes .Add (int64 (counts .RxBytes ))
1192
+ stats .RxPackets = a .statRxPackets .Add (int64 (counts .RxPackets ))
1193
+ stats .TxBytes = a .statTxBytes .Add (int64 (counts .TxBytes ))
1194
+ stats .TxPackets = a .statTxPackets .Add (int64 (counts .TxPackets ))
1195
+ }
1196
+
1197
+ // Tailscale's connection stats are not cumulative, but it makes no sense to make
1198
+ // ours temporary.
1199
+ stats .SessionCountSSH = a .connCountSSHSession .Load ()
1200
+ stats .SessionCountVSCode = a .connCountVSCode .Load ()
1201
+ stats .SessionCountJetBrains = a .connCountJetBrains .Load ()
1202
+ stats .SessionCountReconnectingPTY = a .connCountReconnectingPTY .Load ()
1203
+
1204
+ // Compute the median connection latency!
1205
+ var wg sync.WaitGroup
1206
+ var mu sync.Mutex
1207
+ status := a .network .Status ()
1208
+ durations := []float64 {}
1209
+ ctx , cancelFunc := context .WithTimeout (ctx , 5 * time .Second )
1210
+ defer cancelFunc ()
1211
+ for nodeID , peer := range status .Peer {
1212
+ if ! peer .Active {
1213
+ continue
1214
+ }
1215
+ addresses , found := a .network .NodeAddresses (nodeID )
1216
+ if ! found {
1217
+ continue
1218
+ }
1219
+ if len (addresses ) == 0 {
1220
+ continue
1221
+ }
1222
+ wg .Add (1 )
1223
+ go func () {
1224
+ defer wg .Done ()
1225
+ duration , _ , _ , err := a .network .Ping (ctx , addresses [0 ].Addr ())
1226
+ if err != nil {
1227
+ return
1228
+ }
1229
+ mu .Lock ()
1230
+ durations = append (durations , float64 (duration .Microseconds ()))
1231
+ mu .Unlock ()
1232
+ }()
1233
+ }
1234
+ wg .Wait ()
1235
+ sort .Float64s (durations )
1236
+ durationsLength := len (durations )
1237
+ if durationsLength == 0 {
1238
+ stats .ConnectionMedianLatencyMS = - 1
1239
+ } else if durationsLength % 2 == 0 {
1240
+ stats .ConnectionMedianLatencyMS = (durations [durationsLength / 2 - 1 ] + durations [durationsLength / 2 ]) / 2
1241
+ } else {
1242
+ stats .ConnectionMedianLatencyMS = durations [durationsLength / 2 ]
1243
+ }
1244
+ // Convert from microseconds to milliseconds.
1245
+ stats .ConnectionMedianLatencyMS /= 1000
1246
+
1247
+ select {
1248
+ case a .connStatsChan <- stats :
1249
+ default :
1250
+ a .logger .Warn (ctx , "network stat dropped" )
1251
+ }
1252
+ }
1253
+
1254
+ // Report statistics from the created network.
1255
+ cl , err := a .client .ReportStats (ctx , a .logger , a .connStatsChan , func (d time.Duration ) {
1256
+ a .network .SetConnStatsCallback (d , 2048 ,
1257
+ func (_ , _ time.Time , virtual , _ map [netlogtype.Connection ]netlogtype.Counts ) {
1258
+ reportStats (virtual )
1259
+ },
1260
+ )
1261
+ })
1262
+ if err != nil {
1263
+ a .logger .Error (ctx , "report stats" , slog .Error (err ))
1264
+ } else {
1265
+ if err = a .trackConnGoroutine (func () {
1266
+ // This is OK because the agent never re-creates the tailnet
1267
+ // and the only shutdown indicator is agent.Close().
1268
+ <- a .closed
1269
+ _ = cl .Close ()
1270
+ }); err != nil {
1271
+ a .logger .Debug (ctx , "report stats goroutine" , slog .Error (err ))
1272
+ _ = cl .Close ()
1273
+ }
1274
+ }
1275
+ }
1276
+
1183
1277
// isClosed returns whether the API is closed or not.
1184
1278
func (a * agent ) isClosed () bool {
1185
1279
select {
0 commit comments