Skip to content

Commit e8db21c

Browse files
chore: add additional network telemetry stats & events (#13800)
1 parent 38035da commit e8db21c

File tree

9 files changed

+279
-79
lines changed

9 files changed

+279
-79
lines changed

cli/ssh.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ func (r *RootCmd) ssh() *serpent.Command {
437437
}
438438

439439
err = sshSession.Wait()
440-
conn.SendDisconnectedTelemetry("ssh")
440+
conn.SendDisconnectedTelemetry()
441441
if err != nil {
442442
if exitErr := (&gossh.ExitError{}); errors.As(err, &exitErr) {
443443
// Clear the error since it's not useful beyond

coderd/telemetry/telemetry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1240,7 +1240,7 @@ type NetworkEvent struct {
12401240
NodeIDSelf uint64 `json:"node_id_self"`
12411241
NodeIDRemote uint64 `json:"node_id_remote"`
12421242
P2PEndpoint NetworkEventP2PEndpoint `json:"p2p_endpoint"`
1243-
HomeDERP string `json:"home_derp"`
1243+
HomeDERP int `json:"home_derp"`
12441244
DERPMap DERPMap `json:"derp_map"`
12451245
LatestNetcheck Netcheck `json:"latest_netcheck"`
12461246

@@ -1286,7 +1286,7 @@ func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, er
12861286
NodeIDSelf: proto.NodeIdSelf,
12871287
NodeIDRemote: proto.NodeIdRemote,
12881288
P2PEndpoint: p2pEndpointFromProto(proto.P2PEndpoint),
1289-
HomeDERP: proto.HomeDerp,
1289+
HomeDERP: int(proto.HomeDerp),
12901290
DERPMap: derpMapFromProto(proto.DerpMap),
12911291
LatestNetcheck: netcheckFromProto(proto.LatestNetcheck),
12921292

codersdk/workspacesdk/agentconn.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,3 @@ func (c *AgentConn) apiClient() *http.Client {
380380
func (c *AgentConn) GetPeerDiagnostics() tailnet.PeerDiagnostics {
381381
return c.Conn.GetPeerDiagnostics(c.opts.AgentID)
382382
}
383-
384-
func (c *AgentConn) SendDisconnectedTelemetry(application string) {
385-
c.Conn.SendDisconnectedTelemetry(c.agentAddress(), application)
386-
}

codersdk/workspacesdk/connector_internal_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,12 @@ func TestTailnetAPIConnector_TelemetryUnimplemented(t *testing.T) {
228228
return uut.client != nil
229229
}, testutil.WaitShort, testutil.IntervalFast)
230230

231-
fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), 0)
231+
fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), 0)
232232
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
233233
require.False(t, uut.telemetryUnavailable.Load())
234234
require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
235235

236-
fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented)
236+
fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented)
237237
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
238238
require.True(t, uut.telemetryUnavailable.Load())
239239
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
@@ -268,12 +268,12 @@ func TestTailnetAPIConnector_TelemetryNotRecognised(t *testing.T) {
268268
return uut.client != nil
269269
}, testutil.WaitShort, testutil.IntervalFast)
270270

271-
fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("Protocol Error")
271+
fakeDRPCClient.telemetryError = drpc.ProtocolError.New("Protocol Error")
272272
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
273273
require.False(t, uut.telemetryUnavailable.Load())
274274
require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
275275

276-
fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
276+
fakeDRPCClient.telemetryError = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
277277
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
278278
require.True(t, uut.telemetryUnavailable.Load())
279279
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
@@ -301,7 +301,7 @@ func newFakeTailnetConn() *fakeTailnetConn {
301301

302302
type fakeDRPCClient struct {
303303
postTelemetryCalls int64
304-
telemeteryErorr error
304+
telemetryError error
305305
fakeDRPPCMapStream
306306
}
307307

@@ -331,7 +331,7 @@ func (*fakeDRPCClient) DRPCConn() drpc.Conn {
331331
// PostTelemetry implements proto.DRPCTailnetClient.
332332
func (f *fakeDRPCClient) PostTelemetry(_ context.Context, _ *proto.TelemetryRequest) (*proto.TelemetryResponse, error) {
333333
atomic.AddInt64(&f.postTelemetryCalls, 1)
334-
return nil, f.telemeteryErorr
334+
return nil, f.telemetryError
335335
}
336336

337337
// StreamDERPMaps implements proto.DRPCTailnetClient.

tailnet/conn.go

Lines changed: 77 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"tailscale.com/types/key"
3232
tslogger "tailscale.com/types/logger"
3333
"tailscale.com/types/netlogtype"
34+
"tailscale.com/types/netmap"
3435
"tailscale.com/wgengine"
3536
"tailscale.com/wgengine/capture"
3637
"tailscale.com/wgengine/magicsock"
@@ -262,17 +263,8 @@ func NewConn(options *Options) (conn *Conn, err error) {
262263
)
263264
nodeUp.setAddresses(options.Addresses)
264265
nodeUp.setBlockEndpoints(options.BlockEndpoints)
265-
wireguardEngine.SetStatusCallback(nodeUp.setStatus)
266-
magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket)
267-
if telemetryStore != nil {
268-
wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) {
269-
nodeUp.setNetInfo(ni)
270-
telemetryStore.setNetInfo(ni)
271-
})
272-
} else {
273-
wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo)
274-
}
275266

267+
ctx, ctxCancel := context.WithCancel(context.Background())
276268
server := &Conn{
277269
id: uuid.New(),
278270
closed: make(chan struct{}),
@@ -290,13 +282,32 @@ func NewConn(options *Options) (conn *Conn, err error) {
290282
configMaps: cfgMaps,
291283
nodeUpdater: nodeUp,
292284
telemetrySink: options.TelemetrySink,
293-
telemeteryStore: telemetryStore,
285+
telemetryStore: telemetryStore,
286+
createdAt: time.Now(),
287+
watchCtx: ctx,
288+
watchCancel: ctxCancel,
294289
}
295290
defer func() {
296291
if err != nil {
297292
_ = server.Close()
298293
}
299294
}()
295+
if server.telemetryStore != nil {
296+
server.wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) {
297+
server.telemetryStore.setNetInfo(ni)
298+
nodeUp.setNetInfo(ni)
299+
server.telemetryStore.pingPeer(server)
300+
})
301+
server.wireguardEngine.AddNetworkMapCallback(func(nm *netmap.NetworkMap) {
302+
server.telemetryStore.updateNetworkMap(nm)
303+
server.telemetryStore.pingPeer(server)
304+
})
305+
go server.watchConnChange()
306+
} else {
307+
server.wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo)
308+
}
309+
server.wireguardEngine.SetStatusCallback(nodeUp.setStatus)
310+
server.magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket)
300311

301312
netStack.GetTCPHandlerForFlow = server.forwardTCP
302313

@@ -351,11 +362,15 @@ type Conn struct {
351362
wireguardEngine wgengine.Engine
352363
listeners map[listenKey]*listener
353364
clientType proto.TelemetryEvent_ClientType
365+
createdAt time.Time
354366

355367
telemetrySink TelemetrySink
356-
// telemeteryStore will be nil if telemetrySink is nil.
357-
telemeteryStore *TelemetryStore
358-
telemetryWg sync.WaitGroup
368+
// telemetryStore will be nil if telemetrySink is nil.
369+
telemetryStore *TelemetryStore
370+
telemetryWg sync.WaitGroup
371+
372+
watchCtx context.Context
373+
watchCancel func()
359374

360375
trafficStats *connstats.Statistics
361376
}
@@ -390,8 +405,8 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) {
390405

391406
// SetDERPMap updates the DERPMap of a connection.
392407
func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) {
393-
if c.configMaps.setDERPMap(derpMap) && c.telemeteryStore != nil {
394-
c.telemeteryStore.updateDerpMap(derpMap)
408+
if c.configMaps.setDERPMap(derpMap) && c.telemetryStore != nil {
409+
c.telemetryStore.updateDerpMap(derpMap)
395410
}
396411
}
397412

@@ -540,6 +555,7 @@ func (c *Conn) Closed() <-chan struct{} {
540555
// Close shuts down the Wireguard connection.
541556
func (c *Conn) Close() error {
542557
c.logger.Info(context.Background(), "closing tailnet Conn")
558+
c.watchCancel()
543559
c.telemetryWg.Wait()
544560
c.configMaps.close()
545561
c.nodeUpdater.close()
@@ -709,54 +725,34 @@ func (c *Conn) MagicsockServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
709725
c.magicConn.ServeHTTPDebug(w, r)
710726
}
711727

728+
// SendConnectedTelemetry should be called when a connection to the peer with the given IP is established.
712729
func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) {
713730
if c.telemetrySink == nil {
714731
return
715732
}
733+
c.telemetryStore.markConnected(&ip, application)
716734
e := c.newTelemetryEvent()
717735
e.Status = proto.TelemetryEvent_CONNECTED
718-
e.Application = application
719-
pip, ok := c.wireguardEngine.PeerForIP(ip)
720-
if ok {
721-
e.NodeIdRemote = uint64(pip.Node.ID)
722-
}
723-
c.telemetryWg.Add(1)
724-
go func() {
725-
defer c.telemetryWg.Done()
726-
c.telemetrySink.SendTelemetryEvent(e)
727-
}()
736+
c.sendTelemetryBackground(e)
728737
}
729738

730-
func (c *Conn) SendDisconnectedTelemetry(ip netip.Addr, application string) {
739+
func (c *Conn) SendDisconnectedTelemetry() {
731740
if c.telemetrySink == nil {
732741
return
733742
}
734743
e := c.newTelemetryEvent()
735744
e.Status = proto.TelemetryEvent_DISCONNECTED
736-
e.Application = application
737-
pip, ok := c.wireguardEngine.PeerForIP(ip)
738-
if ok {
739-
e.NodeIdRemote = uint64(pip.Node.ID)
740-
}
741-
c.telemetryWg.Add(1)
742-
go func() {
743-
defer c.telemetryWg.Done()
744-
c.telemetrySink.SendTelemetryEvent(e)
745-
}()
745+
c.sendTelemetryBackground(e)
746746
}
747747

748748
func (c *Conn) SendSpeedtestTelemetry(throughputMbits float64) {
749749
if c.telemetrySink == nil {
750750
return
751751
}
752752
e := c.newTelemetryEvent()
753-
e.Status = proto.TelemetryEvent_CONNECTED
754753
e.ThroughputMbits = wrapperspb.Float(float32(throughputMbits))
755-
c.telemetryWg.Add(1)
756-
go func() {
757-
defer c.telemetryWg.Done()
758-
c.telemetrySink.SendTelemetryEvent(e)
759-
}()
754+
e.Status = proto.TelemetryEvent_CONNECTED
755+
c.sendTelemetryBackground(e)
760756
}
761757

762758
// nolint:revive
@@ -769,31 +765,59 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) {
769765
latency := durationpb.New(time.Duration(pr.LatencySeconds * float64(time.Second)))
770766
if pr.Endpoint != "" {
771767
e.P2PLatency = latency
772-
e.P2PEndpoint = c.telemeteryStore.toEndpoint(pr.Endpoint)
768+
e.P2PEndpoint = c.telemetryStore.toEndpoint(pr.Endpoint)
773769
} else {
774770
e.DerpLatency = latency
775771
}
776772
e.Status = proto.TelemetryEvent_CONNECTED
777-
c.telemetryWg.Add(1)
778-
go func() {
779-
defer c.telemetryWg.Done()
780-
c.telemetrySink.SendTelemetryEvent(e)
781-
}()
773+
c.sendTelemetryBackground(e)
782774
}
783775

784776
// The returned telemetry event will not have its status set.
785777
func (c *Conn) newTelemetryEvent() *proto.TelemetryEvent {
786778
// Infallible
787779
id, _ := c.id.MarshalBinary()
788-
event := c.telemeteryStore.newEvent()
780+
event := c.telemetryStore.newEvent()
789781
event.ClientType = c.clientType
790782
event.Id = id
791-
selfNode := c.Node()
792-
event.NodeIdSelf = uint64(selfNode.ID)
793-
event.HomeDerp = strconv.Itoa(selfNode.PreferredDERP)
783+
event.ConnectionAge = durationpb.New(time.Since(c.createdAt))
794784
return event
795785
}
796786

787+
func (c *Conn) sendTelemetryBackground(e *proto.TelemetryEvent) {
788+
c.telemetryWg.Add(1)
789+
go func() {
790+
defer c.telemetryWg.Done()
791+
c.telemetrySink.SendTelemetryEvent(e)
792+
}()
793+
}
794+
795+
// Watch for changes in the connection type (P2P<->DERP) and send telemetry events.
796+
func (c *Conn) watchConnChange() {
797+
ticker := time.NewTicker(time.Millisecond * 50)
798+
defer ticker.Stop()
799+
for {
800+
select {
801+
case <-c.watchCtx.Done():
802+
return
803+
case <-ticker.C:
804+
}
805+
status := c.Status()
806+
peers := status.Peers()
807+
if len(peers) > 1 {
808+
// Not a CLI<->agent connection, stop watching
809+
return
810+
} else if len(peers) == 0 {
811+
continue
812+
}
813+
peer := status.Peer[peers[0]]
814+
// If the connection type has changed, send a telemetry event with the latest ping stats
815+
if c.telemetryStore.changedConntype(peer.CurAddr) {
816+
c.telemetryStore.pingPeer(c)
817+
}
818+
}
819+
}
820+
797821
// PeerDiagnostics is a checklist of human-readable conditions necessary to establish an encrypted
798822
// tunnel to a peer via a Conn
799823
type PeerDiagnostics struct {

tailnet/proto/tailnet.pb.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tailnet/proto/tailnet.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ message TelemetryEvent {
169169
uint64 node_id_self = 7;
170170
uint64 node_id_remote = 8;
171171
P2PEndpoint p2p_endpoint = 9;
172-
string home_derp = 10;
172+
int32 home_derp = 10;
173173
DERPMap derp_map = 11;
174174
Netcheck latest_netcheck = 12;
175175

0 commit comments

Comments
 (0)