From 6ee3bfa3c6b919a7f55c8dc98146a570450ecbb7 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Mon, 8 Jul 2024 05:30:09 +0000 Subject: [PATCH 01/17] chore: add additional network telemetry stats & events --- cli/ssh.go | 3 +- coderd/telemetry/telemetry.go | 4 +- .../workspacesdk/connector_internal_test.go | 12 ++--- tailnet/conn.go | 52 ++++++++++++++----- tailnet/proto/tailnet.pb.go | 8 +-- tailnet/proto/tailnet.proto | 2 +- tailnet/telemetry.go | 24 +++++++-- 7 files changed, 75 insertions(+), 30 deletions(-) diff --git a/cli/ssh.go b/cli/ssh.go index 9b853b704978c..d7825188a5edb 100644 --- a/cli/ssh.go +++ b/cli/ssh.go @@ -37,6 +37,7 @@ import ( "github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/pty" + "github.com/coder/coder/v2/tailnet" "github.com/coder/retry" "github.com/coder/serpent" ) @@ -437,7 +438,7 @@ func (r *RootCmd) ssh() *serpent.Command { } err = sshSession.Wait() - conn.SendDisconnectedTelemetry("ssh") + conn.SendDisconnectedTelemetry(tailnet.TelemetryApplicationSSH) if err != nil { if exitErr := (&gossh.ExitError{}); errors.As(err, &exitErr) { // Clear the error since it's not useful beyond diff --git a/coderd/telemetry/telemetry.go b/coderd/telemetry/telemetry.go index 53055c686f72b..a68d756f811f5 100644 --- a/coderd/telemetry/telemetry.go +++ b/coderd/telemetry/telemetry.go @@ -1240,7 +1240,7 @@ type NetworkEvent struct { NodeIDSelf uint64 `json:"node_id_self"` NodeIDRemote uint64 `json:"node_id_remote"` P2PEndpoint NetworkEventP2PEndpoint `json:"p2p_endpoint"` - HomeDERP string `json:"home_derp"` + HomeDERP int `json:"home_derp"` DERPMap DERPMap `json:"derp_map"` LatestNetcheck Netcheck `json:"latest_netcheck"` @@ -1286,7 +1286,7 @@ func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, er NodeIDSelf: proto.NodeIdSelf, NodeIDRemote: proto.NodeIdRemote, P2PEndpoint: p2pEndpointFromProto(proto.P2PEndpoint), - HomeDERP: proto.HomeDerp, + HomeDERP: int(proto.HomeDerp), DERPMap: derpMapFromProto(proto.DerpMap), LatestNetcheck: netcheckFromProto(proto.LatestNetcheck), diff --git a/codersdk/workspacesdk/connector_internal_test.go b/codersdk/workspacesdk/connector_internal_test.go index 00463d2076016..0106c271b68a4 100644 --- a/codersdk/workspacesdk/connector_internal_test.go +++ b/codersdk/workspacesdk/connector_internal_test.go @@ -228,12 +228,12 @@ func TestTailnetAPIConnector_TelemetryUnimplemented(t *testing.T) { return uut.client != nil }, testutil.WaitShort, testutil.IntervalFast) - fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), 0) + fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), 0) uut.SendTelemetryEvent(&proto.TelemetryEvent{}) require.False(t, uut.telemetryUnavailable.Load()) require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls)) - fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented) + fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented) uut.SendTelemetryEvent(&proto.TelemetryEvent{}) require.True(t, uut.telemetryUnavailable.Load()) uut.SendTelemetryEvent(&proto.TelemetryEvent{}) @@ -268,12 +268,12 @@ func TestTailnetAPIConnector_TelemetryNotRecognised(t *testing.T) { return uut.client != nil }, testutil.WaitShort, testutil.IntervalFast) - fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("Protocol Error") + fakeDRPCClient.telemetryError = drpc.ProtocolError.New("Protocol Error") uut.SendTelemetryEvent(&proto.TelemetryEvent{}) require.False(t, uut.telemetryUnavailable.Load()) require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls)) - fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry") + fakeDRPCClient.telemetryError = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry") uut.SendTelemetryEvent(&proto.TelemetryEvent{}) require.True(t, uut.telemetryUnavailable.Load()) uut.SendTelemetryEvent(&proto.TelemetryEvent{}) @@ -301,7 +301,7 @@ func newFakeTailnetConn() *fakeTailnetConn { type fakeDRPCClient struct { postTelemetryCalls int64 - telemeteryErorr error + telemetryError error fakeDRPPCMapStream } @@ -331,7 +331,7 @@ func (*fakeDRPCClient) DRPCConn() drpc.Conn { // PostTelemetry implements proto.DRPCTailnetClient. func (f *fakeDRPCClient) PostTelemetry(_ context.Context, _ *proto.TelemetryRequest) (*proto.TelemetryResponse, error) { atomic.AddInt64(&f.postTelemetryCalls, 1) - return nil, f.telemeteryErorr + return nil, f.telemetryError } // StreamDERPMaps implements proto.DRPCTailnetClient. diff --git a/tailnet/conn.go b/tailnet/conn.go index 6c60dedfd22b5..b57318327fa15 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -290,13 +290,15 @@ func NewConn(options *Options) (conn *Conn, err error) { configMaps: cfgMaps, nodeUpdater: nodeUp, telemetrySink: options.TelemetrySink, - telemeteryStore: telemetryStore, + telemetryStore: telemetryStore, + createdAt: time.Now(), } defer func() { if err != nil { _ = server.Close() } }() + server.SetNodeCallback(nil) netStack.GetTCPHandlerForFlow = server.forwardTCP @@ -351,11 +353,12 @@ type Conn struct { wireguardEngine wgengine.Engine listeners map[listenKey]*listener clientType proto.TelemetryEvent_ClientType + createdAt time.Time telemetrySink TelemetrySink - // telemeteryStore will be nil if telemetrySink is nil. - telemeteryStore *TelemetryStore - telemetryWg sync.WaitGroup + // telemetryStore will be nil if telemetrySink is nil. + telemetryStore *TelemetryStore + telemetryWg sync.WaitGroup trafficStats *connstats.Statistics } @@ -384,14 +387,25 @@ func (c *Conn) SetAddresses(ips []netip.Prefix) error { return nil } +// Sets the callback for when the node is updated. +// If telemetry is enabled, the callback will first update the telemetry store, +// send the updated telemetry, and then call the provided callback. func (c *Conn) SetNodeCallback(callback func(node *Node)) { - c.nodeUpdater.setCallback(callback) + if c.telemetryStore != nil { + c.nodeUpdater.setCallback(func(node *Node) { + c.telemetryStore.updateByNode(node) + c.sendUpdatedTelemetry() + callback(node) + }) + } else { + c.nodeUpdater.setCallback(callback) + } } // SetDERPMap updates the DERPMap of a connection. func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) { - if c.configMaps.setDERPMap(derpMap) && c.telemeteryStore != nil { - c.telemeteryStore.updateDerpMap(derpMap) + if c.configMaps.setDERPMap(derpMap) && c.telemetryStore != nil { + c.telemetryStore.updateDerpMap(derpMap) } } @@ -715,6 +729,7 @@ func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { } e := c.newTelemetryEvent() e.Status = proto.TelemetryEvent_CONNECTED + e.ConnectionSetup = durationpb.New(time.Since(c.createdAt)) e.Application = application pip, ok := c.wireguardEngine.PeerForIP(ip) if ok { @@ -727,6 +742,21 @@ func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { }() } +// Called whenever the Node is updated. +// Expects that the telemetry store has the latest node information. +func (c *Conn) sendUpdatedTelemetry() { + if c.telemetrySink == nil { + return + } + e := c.newTelemetryEvent() + e.Status = proto.TelemetryEvent_CONNECTED + c.telemetryWg.Add(1) + go func() { + defer c.telemetryWg.Done() + c.telemetrySink.SendTelemetryEvent(e) + }() +} + func (c *Conn) SendDisconnectedTelemetry(ip netip.Addr, application string) { if c.telemetrySink == nil { return @@ -769,7 +799,7 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) { latency := durationpb.New(time.Duration(pr.LatencySeconds * float64(time.Second))) if pr.Endpoint != "" { e.P2PLatency = latency - e.P2PEndpoint = c.telemeteryStore.toEndpoint(pr.Endpoint) + e.P2PEndpoint = c.telemetryStore.toEndpoint(pr.Endpoint) } else { e.DerpLatency = latency } @@ -785,12 +815,10 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) { func (c *Conn) newTelemetryEvent() *proto.TelemetryEvent { // Infallible id, _ := c.id.MarshalBinary() - event := c.telemeteryStore.newEvent() + event := c.telemetryStore.newEvent() event.ClientType = c.clientType event.Id = id - selfNode := c.Node() - event.NodeIdSelf = uint64(selfNode.ID) - event.HomeDerp = strconv.Itoa(selfNode.PreferredDERP) + event.ConnectionAge = durationpb.New(time.Since(c.createdAt)) return event } diff --git a/tailnet/proto/tailnet.pb.go b/tailnet/proto/tailnet.pb.go index f777b956beffa..26416ff18ad90 100644 --- a/tailnet/proto/tailnet.pb.go +++ b/tailnet/proto/tailnet.pb.go @@ -819,7 +819,7 @@ type TelemetryEvent struct { NodeIdSelf uint64 `protobuf:"varint,7,opt,name=node_id_self,json=nodeIdSelf,proto3" json:"node_id_self,omitempty"` NodeIdRemote uint64 `protobuf:"varint,8,opt,name=node_id_remote,json=nodeIdRemote,proto3" json:"node_id_remote,omitempty"` P2PEndpoint *TelemetryEvent_P2PEndpoint `protobuf:"bytes,9,opt,name=p2p_endpoint,json=p2pEndpoint,proto3" json:"p2p_endpoint,omitempty"` - HomeDerp string `protobuf:"bytes,10,opt,name=home_derp,json=homeDerp,proto3" json:"home_derp,omitempty"` + HomeDerp int32 `protobuf:"varint,10,opt,name=home_derp,json=homeDerp,proto3" json:"home_derp,omitempty"` DerpMap *DERPMap `protobuf:"bytes,11,opt,name=derp_map,json=derpMap,proto3" json:"derp_map,omitempty"` LatestNetcheck *Netcheck `protobuf:"bytes,12,opt,name=latest_netcheck,json=latestNetcheck,proto3" json:"latest_netcheck,omitempty"` ConnectionAge *durationpb.Duration `protobuf:"bytes,13,opt,name=connection_age,json=connectionAge,proto3" json:"connection_age,omitempty"` @@ -925,11 +925,11 @@ func (x *TelemetryEvent) GetP2PEndpoint() *TelemetryEvent_P2PEndpoint { return nil } -func (x *TelemetryEvent) GetHomeDerp() string { +func (x *TelemetryEvent) GetHomeDerp() int32 { if x != nil { return x.HomeDerp } - return "" + return 0 } func (x *TelemetryEvent) GetDerpMap() *DERPMap { @@ -2006,7 +2006,7 @@ var file_tailnet_proto_tailnet_proto_rawDesc = []byte{ 0x2e, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x50, 0x32, 0x50, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x0b, 0x70, 0x32, 0x70, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x68, 0x6f, 0x6d, 0x65, - 0x5f, 0x64, 0x65, 0x72, 0x70, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x68, 0x6f, 0x6d, + 0x5f, 0x64, 0x65, 0x72, 0x70, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x68, 0x6f, 0x6d, 0x65, 0x44, 0x65, 0x72, 0x70, 0x12, 0x34, 0x0a, 0x08, 0x64, 0x65, 0x72, 0x70, 0x5f, 0x6d, 0x61, 0x70, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x45, 0x52, 0x50, 0x4d, diff --git a/tailnet/proto/tailnet.proto b/tailnet/proto/tailnet.proto index 6d025b1eb1749..728024ce725d7 100644 --- a/tailnet/proto/tailnet.proto +++ b/tailnet/proto/tailnet.proto @@ -169,7 +169,7 @@ message TelemetryEvent { uint64 node_id_self = 7; uint64 node_id_remote = 8; P2PEndpoint p2p_endpoint = 9; - string home_derp = 10; + int32 home_derp = 10; DERPMap derp_map = 11; Netcheck latest_netcheck = 12; diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index b8012e33a1ad4..f3d91ade970d2 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -31,6 +31,8 @@ type TelemetryStore struct { cleanDerpMap *tailcfg.DERPMap cleanNetCheck *proto.Netcheck + nodeID uint64 + homeDerp int32 } func newTelemetryStore() (*TelemetryStore, error) { @@ -53,11 +55,11 @@ func (b *TelemetryStore) newEvent() *proto.TelemetryEvent { Time: timestamppb.Now(), DerpMap: DERPMapToProto(b.cleanDerpMap), LatestNetcheck: b.cleanNetCheck, + NodeIdSelf: b.nodeID, + HomeDerp: b.homeDerp, // TODO(ethanndickson): - ConnectionAge: &durationpb.Duration{}, - ConnectionSetup: &durationpb.Duration{}, - P2PSetup: &durationpb.Duration{}, + P2PSetup: &durationpb.Duration{}, } } @@ -85,11 +87,24 @@ func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) { b.cleanDerpMap = cleanMap } +func (b *TelemetryStore) updateByNode(n *Node) { + b.mu.Lock() + defer b.mu.Unlock() + + b.nodeID = uint64(n.ID) + b.homeDerp = int32(n.PreferredDERP) +} + // Store an anonymized proto.Netcheck given a tailscale NetInfo. -func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo) { +func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo) bool { b.mu.Lock() defer b.mu.Unlock() + derpHomeChanged := false + if b.cleanNetCheck != nil { + derpHomeChanged = b.cleanNetCheck.PreferredDERP != int64(ni.PreferredDERP) + } + b.cleanNetCheck = &proto.Netcheck{ UDP: ni.UDP, IPv6: ni.IPv6, @@ -127,6 +142,7 @@ func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo) { for rid, seconds := range ni.DERPLatencyV6 { b.cleanNetCheck.RegionV6Latency[int64(rid)] = durationpb.New(time.Duration(seconds * float64(time.Second))) } + return derpHomeChanged } func (b *TelemetryStore) toEndpoint(ipport string) *proto.TelemetryEvent_P2PEndpoint { From 6e0ec17677f9b033ae14ea4d36b496d5fe51baa9 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Mon, 8 Jul 2024 05:42:12 +0000 Subject: [PATCH 02/17] better connectionage --- tailnet/conn.go | 2 +- tailnet/telemetry.go | 20 +++++++++++++++----- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index b57318327fa15..1733f93c384d0 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -727,9 +727,9 @@ func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { if c.telemetrySink == nil { return } + c.telemetryStore.markConnected(time.Since(c.createdAt)) e := c.newTelemetryEvent() e.Status = proto.TelemetryEvent_CONNECTED - e.ConnectionSetup = durationpb.New(time.Since(c.createdAt)) e.Application = application pip, ok := c.wireguardEngine.PeerForIP(ip) if ok { diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index f3d91ade970d2..09e836d3dc811 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -33,6 +33,8 @@ type TelemetryStore struct { cleanNetCheck *proto.Netcheck nodeID uint64 homeDerp int32 + + connSetupTime time.Duration } func newTelemetryStore() (*TelemetryStore, error) { @@ -52,17 +54,25 @@ func (b *TelemetryStore) newEvent() *proto.TelemetryEvent { defer b.mu.Unlock() return &proto.TelemetryEvent{ - Time: timestamppb.Now(), - DerpMap: DERPMapToProto(b.cleanDerpMap), - LatestNetcheck: b.cleanNetCheck, - NodeIdSelf: b.nodeID, - HomeDerp: b.homeDerp, + Time: timestamppb.Now(), + DerpMap: DERPMapToProto(b.cleanDerpMap), + LatestNetcheck: b.cleanNetCheck, + NodeIdSelf: b.nodeID, + HomeDerp: b.homeDerp, + ConnectionSetup: durationpb.New(b.connSetupTime), // TODO(ethanndickson): P2PSetup: &durationpb.Duration{}, } } +func (b *TelemetryStore) markConnected(connSetupTime time.Duration) { + b.mu.Lock() + defer b.mu.Unlock() + + b.connSetupTime = connSetupTime +} + // Given a DERPMap, anonymise all IPs and hostnames. // Keep track of seen hostnames/cert names to anonymize them from future logs. // b.mu must NOT be held. From ae17bb3ead50a54a49e979ad52a79b422b2bc0d6 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Mon, 8 Jul 2024 06:02:38 +0000 Subject: [PATCH 03/17] send update events only when home derp changes --- tailnet/conn.go | 7 ++++--- tailnet/telemetry.go | 25 +++++++++++++------------ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index 1733f93c384d0..53553343d4dec 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -266,8 +266,8 @@ func NewConn(options *Options) (conn *Conn, err error) { magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket) if telemetryStore != nil { wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) { - nodeUp.setNetInfo(ni) telemetryStore.setNetInfo(ni) + nodeUp.setNetInfo(ni) }) } else { wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo) @@ -393,8 +393,9 @@ func (c *Conn) SetAddresses(ips []netip.Prefix) error { func (c *Conn) SetNodeCallback(callback func(node *Node)) { if c.telemetryStore != nil { c.nodeUpdater.setCallback(func(node *Node) { - c.telemetryStore.updateByNode(node) - c.sendUpdatedTelemetry() + if c.telemetryStore.updateByNode(node) { + c.sendUpdatedTelemetry() + } callback(node) }) } else { diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index 09e836d3dc811..0812c096b7049 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -34,7 +34,7 @@ type TelemetryStore struct { nodeID uint64 homeDerp int32 - connSetupTime time.Duration + connSetupTime *durationpb.Duration } func newTelemetryStore() (*TelemetryStore, error) { @@ -59,7 +59,7 @@ func (b *TelemetryStore) newEvent() *proto.TelemetryEvent { LatestNetcheck: b.cleanNetCheck, NodeIdSelf: b.nodeID, HomeDerp: b.homeDerp, - ConnectionSetup: durationpb.New(b.connSetupTime), + ConnectionSetup: b.connSetupTime, // TODO(ethanndickson): P2PSetup: &durationpb.Duration{}, @@ -70,7 +70,7 @@ func (b *TelemetryStore) markConnected(connSetupTime time.Duration) { b.mu.Lock() defer b.mu.Unlock() - b.connSetupTime = connSetupTime + b.connSetupTime = durationpb.New(connSetupTime) } // Given a DERPMap, anonymise all IPs and hostnames. @@ -97,24 +97,26 @@ func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) { b.cleanDerpMap = cleanMap } -func (b *TelemetryStore) updateByNode(n *Node) { +// Update the telemetry store with the current node state. +// Returns true if the home DERP has changed. +func (b *TelemetryStore) updateByNode(n *Node) bool { b.mu.Lock() defer b.mu.Unlock() b.nodeID = uint64(n.ID) - b.homeDerp = int32(n.PreferredDERP) + newHome := int32(n.PreferredDERP) + if b.homeDerp != newHome { + b.homeDerp = newHome + return true + } + return false } // Store an anonymized proto.Netcheck given a tailscale NetInfo. -func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo) bool { +func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo) { b.mu.Lock() defer b.mu.Unlock() - derpHomeChanged := false - if b.cleanNetCheck != nil { - derpHomeChanged = b.cleanNetCheck.PreferredDERP != int64(ni.PreferredDERP) - } - b.cleanNetCheck = &proto.Netcheck{ UDP: ni.UDP, IPv6: ni.IPv6, @@ -152,7 +154,6 @@ func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo) bool { for rid, seconds := range ni.DERPLatencyV6 { b.cleanNetCheck.RegionV6Latency[int64(rid)] = durationpb.New(time.Duration(seconds * float64(time.Second))) } - return derpHomeChanged } func (b *TelemetryStore) toEndpoint(ipport string) *proto.TelemetryEvent_P2PEndpoint { From 612ae5f45fbf14c94de40966ebe8d8c7b94ae045 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Mon, 8 Jul 2024 07:20:10 +0000 Subject: [PATCH 04/17] better remote node id --- cli/ssh.go | 3 +-- codersdk/workspacesdk/agentconn.go | 4 ---- tailnet/conn.go | 19 ++++++------------- tailnet/telemetry.go | 30 +++++++++++++++++++++++++++--- 4 files changed, 34 insertions(+), 22 deletions(-) diff --git a/cli/ssh.go b/cli/ssh.go index d7825188a5edb..1d75f1015e242 100644 --- a/cli/ssh.go +++ b/cli/ssh.go @@ -37,7 +37,6 @@ import ( "github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/pty" - "github.com/coder/coder/v2/tailnet" "github.com/coder/retry" "github.com/coder/serpent" ) @@ -438,7 +437,7 @@ func (r *RootCmd) ssh() *serpent.Command { } err = sshSession.Wait() - conn.SendDisconnectedTelemetry(tailnet.TelemetryApplicationSSH) + conn.SendDisconnectedTelemetry() if err != nil { if exitErr := (&gossh.ExitError{}); errors.As(err, &exitErr) { // Clear the error since it's not useful beyond diff --git a/codersdk/workspacesdk/agentconn.go b/codersdk/workspacesdk/agentconn.go index edd3584493bde..ed9da4c2a04bf 100644 --- a/codersdk/workspacesdk/agentconn.go +++ b/codersdk/workspacesdk/agentconn.go @@ -380,7 +380,3 @@ func (c *AgentConn) apiClient() *http.Client { func (c *AgentConn) GetPeerDiagnostics() tailnet.PeerDiagnostics { return c.Conn.GetPeerDiagnostics(c.opts.AgentID) } - -func (c *AgentConn) SendDisconnectedTelemetry(application string) { - c.Conn.SendDisconnectedTelemetry(c.agentAddress(), application) -} diff --git a/tailnet/conn.go b/tailnet/conn.go index 53553343d4dec..c7de8ba9debaa 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -728,14 +728,10 @@ func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { if c.telemetrySink == nil { return } - c.telemetryStore.markConnected(time.Since(c.createdAt)) + c.telemetryStore.markConnected(&ip, c.createdAt, application) + c.telemetryStore.updateRemoteNodeID(c.wireguardEngine) e := c.newTelemetryEvent() e.Status = proto.TelemetryEvent_CONNECTED - e.Application = application - pip, ok := c.wireguardEngine.PeerForIP(ip) - if ok { - e.NodeIdRemote = uint64(pip.Node.ID) - } c.telemetryWg.Add(1) go func() { defer c.telemetryWg.Done() @@ -749,6 +745,7 @@ func (c *Conn) sendUpdatedTelemetry() { if c.telemetrySink == nil { return } + c.telemetryStore.updateRemoteNodeID(c.wireguardEngine) e := c.newTelemetryEvent() e.Status = proto.TelemetryEvent_CONNECTED c.telemetryWg.Add(1) @@ -758,17 +755,13 @@ func (c *Conn) sendUpdatedTelemetry() { }() } -func (c *Conn) SendDisconnectedTelemetry(ip netip.Addr, application string) { +func (c *Conn) SendDisconnectedTelemetry() { if c.telemetrySink == nil { return } e := c.newTelemetryEvent() e.Status = proto.TelemetryEvent_DISCONNECTED - e.Application = application - pip, ok := c.wireguardEngine.PeerForIP(ip) - if ok { - e.NodeIdRemote = uint64(pip.Node.ID) - } + c.telemetryStore.updateRemoteNodeID(c.wireguardEngine) c.telemetryWg.Add(1) go func() { defer c.telemetryWg.Done() @@ -781,8 +774,8 @@ func (c *Conn) SendSpeedtestTelemetry(throughputMbits float64) { return } e := c.newTelemetryEvent() - e.Status = proto.TelemetryEvent_CONNECTED e.ThroughputMbits = wrapperspb.Float(float32(throughputMbits)) + e.Status = proto.TelemetryEvent_CONNECTED c.telemetryWg.Add(1) go func() { defer c.telemetryWg.Done() diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index 0812c096b7049..d1261ff49a9ea 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -12,6 +12,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" "tailscale.com/tailcfg" + "tailscale.com/wgengine" "github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/tailnet/proto" @@ -33,8 +34,13 @@ type TelemetryStore struct { cleanNetCheck *proto.Netcheck nodeID uint64 homeDerp int32 + application string + // nil if not connected connSetupTime *durationpb.Duration + connectedIP *netip.Addr + // 0 if not connected + connectedNodeID uint64 } func newTelemetryStore() (*TelemetryStore, error) { @@ -58,19 +64,37 @@ func (b *TelemetryStore) newEvent() *proto.TelemetryEvent { DerpMap: DERPMapToProto(b.cleanDerpMap), LatestNetcheck: b.cleanNetCheck, NodeIdSelf: b.nodeID, + NodeIdRemote: b.connectedNodeID, HomeDerp: b.homeDerp, ConnectionSetup: b.connSetupTime, + Application: b.application, // TODO(ethanndickson): P2PSetup: &durationpb.Duration{}, } } -func (b *TelemetryStore) markConnected(connSetupTime time.Duration) { +func (b *TelemetryStore) markConnected(ip *netip.Addr, connCreatedAt time.Time, application string) { b.mu.Lock() defer b.mu.Unlock() - b.connSetupTime = durationpb.New(connSetupTime) + b.connSetupTime = durationpb.New(time.Since(connCreatedAt)) + b.connectedIP = ip + b.application = application +} + +func (b *TelemetryStore) updateRemoteNodeID(engine wgengine.Engine) { + b.mu.Lock() + defer b.mu.Unlock() + + if b.connectedIP == nil { + return + } + + pip, ok := engine.PeerForIP(*b.connectedIP) + if ok { + b.connectedNodeID = uint64(pip.Node.ID) + } } // Given a DERPMap, anonymise all IPs and hostnames. @@ -97,7 +121,7 @@ func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) { b.cleanDerpMap = cleanMap } -// Update the telemetry store with the current node state. +// Update the telemetry store with the current self node state. // Returns true if the home DERP has changed. func (b *TelemetryStore) updateByNode(n *Node) bool { b.mu.Lock() From d2d9c39ab9cc7e9927ed2f1d4efd76eb246df0b3 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Mon, 8 Jul 2024 08:15:38 +0000 Subject: [PATCH 05/17] use netmap callback --- tailnet/configmaps.go | 5 ++-- tailnet/conn.go | 45 ++++++++++++------------------ tailnet/telemetry.go | 40 +++++++++++++++++--------- tailnet/telemetry_internal_test.go | 4 ++- 4 files changed, 50 insertions(+), 44 deletions(-) diff --git a/tailnet/configmaps.go b/tailnet/configmaps.go index a6ef9f40028b1..3d817b17516b8 100644 --- a/tailnet/configmaps.go +++ b/tailnet/configmaps.go @@ -284,16 +284,15 @@ func (c *configMaps) getBlockEndpoints() bool { // setDERPMap sets the DERP map, triggering a configuration of the engine if it has changed. // c.L MUST NOT be held. // Returns if the derpMap is dirty. -func (c *configMaps) setDERPMap(derpMap *tailcfg.DERPMap) bool { +func (c *configMaps) setDERPMap(derpMap *tailcfg.DERPMap) { c.L.Lock() defer c.L.Unlock() if CompareDERPMaps(c.derpMap, derpMap) { - return false + return } c.derpMap = derpMap c.derpMapDirty = true c.Broadcast() - return true } // derMapLocked returns the current DERPMap. c.L must be held diff --git a/tailnet/conn.go b/tailnet/conn.go index c7de8ba9debaa..460a286cd710c 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -31,6 +31,7 @@ import ( "tailscale.com/types/key" tslogger "tailscale.com/types/logger" "tailscale.com/types/netlogtype" + "tailscale.com/types/netmap" "tailscale.com/wgengine" "tailscale.com/wgengine/capture" "tailscale.com/wgengine/magicsock" @@ -262,16 +263,6 @@ func NewConn(options *Options) (conn *Conn, err error) { ) nodeUp.setAddresses(options.Addresses) nodeUp.setBlockEndpoints(options.BlockEndpoints) - wireguardEngine.SetStatusCallback(nodeUp.setStatus) - magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket) - if telemetryStore != nil { - wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) { - telemetryStore.setNetInfo(ni) - nodeUp.setNetInfo(ni) - }) - } else { - wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo) - } server := &Conn{ id: uuid.New(), @@ -298,7 +289,21 @@ func NewConn(options *Options) (conn *Conn, err error) { _ = server.Close() } }() - server.SetNodeCallback(nil) + if server.telemetryStore != nil { + server.wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) { + server.telemetryStore.setNetInfo(ni) + nodeUp.setNetInfo(ni) + }) + server.wireguardEngine.AddNetworkMapCallback(func(nm *netmap.NetworkMap) { + if server.telemetryStore.updateNetworkMap(nm) { + server.sendUpdatedTelemetry() + } + }) + } else { + server.wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo) + } + server.wireguardEngine.SetStatusCallback(nodeUp.setStatus) + server.magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket) netStack.GetTCPHandlerForFlow = server.forwardTCP @@ -391,23 +396,12 @@ func (c *Conn) SetAddresses(ips []netip.Prefix) error { // If telemetry is enabled, the callback will first update the telemetry store, // send the updated telemetry, and then call the provided callback. func (c *Conn) SetNodeCallback(callback func(node *Node)) { - if c.telemetryStore != nil { - c.nodeUpdater.setCallback(func(node *Node) { - if c.telemetryStore.updateByNode(node) { - c.sendUpdatedTelemetry() - } - callback(node) - }) - } else { - c.nodeUpdater.setCallback(callback) - } + c.nodeUpdater.setCallback(callback) } // SetDERPMap updates the DERPMap of a connection. func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) { - if c.configMaps.setDERPMap(derpMap) && c.telemetryStore != nil { - c.telemetryStore.updateDerpMap(derpMap) - } + c.configMaps.setDERPMap(derpMap) } func (c *Conn) SetDERPForceWebSockets(v bool) { @@ -729,7 +723,6 @@ func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { return } c.telemetryStore.markConnected(&ip, c.createdAt, application) - c.telemetryStore.updateRemoteNodeID(c.wireguardEngine) e := c.newTelemetryEvent() e.Status = proto.TelemetryEvent_CONNECTED c.telemetryWg.Add(1) @@ -745,7 +738,6 @@ func (c *Conn) sendUpdatedTelemetry() { if c.telemetrySink == nil { return } - c.telemetryStore.updateRemoteNodeID(c.wireguardEngine) e := c.newTelemetryEvent() e.Status = proto.TelemetryEvent_CONNECTED c.telemetryWg.Add(1) @@ -761,7 +753,6 @@ func (c *Conn) SendDisconnectedTelemetry() { } e := c.newTelemetryEvent() e.Status = proto.TelemetryEvent_DISCONNECTED - c.telemetryStore.updateRemoteNodeID(c.wireguardEngine) c.telemetryWg.Add(1) go func() { defer c.telemetryWg.Done() diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index d1261ff49a9ea..72e07b2584731 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -12,7 +12,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" "tailscale.com/tailcfg" - "tailscale.com/wgengine" + "tailscale.com/types/netmap" "github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/tailnet/proto" @@ -83,7 +83,7 @@ func (b *TelemetryStore) markConnected(ip *netip.Addr, connCreatedAt time.Time, b.application = application } -func (b *TelemetryStore) updateRemoteNodeID(engine wgengine.Engine) { +func (b *TelemetryStore) updateRemoteNodeIDLocked(nm *netmap.NetworkMap) { b.mu.Lock() defer b.mu.Unlock() @@ -91,18 +91,31 @@ func (b *TelemetryStore) updateRemoteNodeID(engine wgengine.Engine) { return } - pip, ok := engine.PeerForIP(*b.connectedIP) - if ok { - b.connectedNodeID = uint64(pip.Node.ID) + ip := *b.connectedIP + + for _, p := range nm.Peers { + for _, a := range p.Addresses { + if a.Addr() == ip && a.IsSingleIP() { + b.connectedNodeID = uint64(p.ID) + } + } } } +// Returning whether a new telemetry event should be sent +func (b *TelemetryStore) updateNetworkMap(nm *netmap.NetworkMap) bool { + b.mu.Lock() + defer b.mu.Unlock() + + b.updateDerpMapLocked(nm.DERPMap) + b.updateRemoteNodeIDLocked(nm) + return b.updateByNodeLocked(nm.SelfNode) +} + // Given a DERPMap, anonymise all IPs and hostnames. // Keep track of seen hostnames/cert names to anonymize them from future logs. // b.mu must NOT be held. -func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) { - b.mu.Lock() - defer b.mu.Unlock() +func (b *TelemetryStore) updateDerpMapLocked(cur *tailcfg.DERPMap) { cleanMap := cur.Clone() for _, r := range cleanMap.Regions { for _, n := range r.Nodes { @@ -123,12 +136,13 @@ func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) { // Update the telemetry store with the current self node state. // Returns true if the home DERP has changed. -func (b *TelemetryStore) updateByNode(n *Node) bool { - b.mu.Lock() - defer b.mu.Unlock() - +func (b *TelemetryStore) updateByNodeLocked(n *tailcfg.Node) bool { b.nodeID = uint64(n.ID) - newHome := int32(n.PreferredDERP) + derpIP, err := netip.ParseAddrPort(n.DERP) + if err != nil { + return false + } + newHome := int32(derpIP.Port()) if b.homeDerp != newHome { b.homeDerp = newHome return true diff --git a/tailnet/telemetry_internal_test.go b/tailnet/telemetry_internal_test.go index 7abbe611d7d36..067ab776fa596 100644 --- a/tailnet/telemetry_internal_test.go +++ b/tailnet/telemetry_internal_test.go @@ -133,7 +133,9 @@ func TestTelemetryStore(t *testing.T) { }, }, } - telemetry.updateDerpMap(derpMap) + telemetry.mu.Lock() + telemetry.updateDerpMapLocked(derpMap) + telemetry.mu.Unlock() event := telemetry.newEvent() require.Len(t, event.DerpMap.Regions[999].Nodes, 1) From af36c3734aa9d4c11c7528871c3e84c6aa3033d9 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Mon, 8 Jul 2024 08:33:04 +0000 Subject: [PATCH 06/17] fixup --- tailnet/conn.go | 4 ++-- tailnet/telemetry.go | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index 460a286cd710c..82c9012528883 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -718,6 +718,7 @@ func (c *Conn) MagicsockServeHTTPDebug(w http.ResponseWriter, r *http.Request) { c.magicConn.ServeHTTPDebug(w, r) } +// Currently called when connected to the Agent with the given IP using the given application. func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { if c.telemetrySink == nil { return @@ -732,8 +733,7 @@ func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { }() } -// Called whenever the Node is updated. -// Expects that the telemetry store has the latest node information. +// Currently only called when the preferred DERP is updated. func (c *Conn) sendUpdatedTelemetry() { if c.telemetrySink == nil { return diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index 72e07b2584731..aeb18ed26704f 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -21,6 +21,7 @@ import ( const ( TelemetryApplicationSSH string = "ssh" TelemetryApplicationSpeedtest string = "speedtest" + TelemetryApplicationVSCode string = "vscode" ) // Responsible for storing and anonymizing networking telemetry state. @@ -84,9 +85,6 @@ func (b *TelemetryStore) markConnected(ip *netip.Addr, connCreatedAt time.Time, } func (b *TelemetryStore) updateRemoteNodeIDLocked(nm *netmap.NetworkMap) { - b.mu.Lock() - defer b.mu.Unlock() - if b.connectedIP == nil { return } @@ -102,11 +100,15 @@ func (b *TelemetryStore) updateRemoteNodeIDLocked(nm *netmap.NetworkMap) { } } -// Returning whether a new telemetry event should be sent +// Returns whether a new telemetry event should be sent func (b *TelemetryStore) updateNetworkMap(nm *netmap.NetworkMap) bool { b.mu.Lock() defer b.mu.Unlock() + if nm == nil { + return false + } + b.updateDerpMapLocked(nm.DERPMap) b.updateRemoteNodeIDLocked(nm) return b.updateByNodeLocked(nm.SelfNode) @@ -116,6 +118,9 @@ func (b *TelemetryStore) updateNetworkMap(nm *netmap.NetworkMap) bool { // Keep track of seen hostnames/cert names to anonymize them from future logs. // b.mu must NOT be held. func (b *TelemetryStore) updateDerpMapLocked(cur *tailcfg.DERPMap) { + if cur == nil { + return + } cleanMap := cur.Clone() for _, r := range cleanMap.Regions { for _, n := range r.Nodes { @@ -137,6 +142,9 @@ func (b *TelemetryStore) updateDerpMapLocked(cur *tailcfg.DERPMap) { // Update the telemetry store with the current self node state. // Returns true if the home DERP has changed. func (b *TelemetryStore) updateByNodeLocked(n *tailcfg.Node) bool { + if n == nil { + return false + } b.nodeID = uint64(n.ID) derpIP, err := netip.ParseAddrPort(n.DERP) if err != nil { From 3466ba631796edfb326142343b1e52358cea01a0 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Mon, 8 Jul 2024 08:59:38 +0000 Subject: [PATCH 07/17] fixup --- tailnet/conn.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index 82c9012528883..7192c85782286 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -392,9 +392,6 @@ func (c *Conn) SetAddresses(ips []netip.Prefix) error { return nil } -// Sets the callback for when the node is updated. -// If telemetry is enabled, the callback will first update the telemetry store, -// send the updated telemetry, and then call the provided callback. func (c *Conn) SetNodeCallback(callback func(node *Node)) { c.nodeUpdater.setCallback(callback) } From dbffec425b00f3ed2f9a7b09c85706c87a5876e2 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Mon, 8 Jul 2024 13:46:58 +0000 Subject: [PATCH 08/17] test --- tailnet/telemetry.go | 12 +++--- tailnet/telemetry_internal_test.go | 65 ++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index aeb18ed26704f..4187b6d81522d 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -33,7 +33,7 @@ type TelemetryStore struct { cleanDerpMap *tailcfg.DERPMap cleanNetCheck *proto.Netcheck - nodeID uint64 + nodeIDSelf uint64 homeDerp int32 application string @@ -41,7 +41,7 @@ type TelemetryStore struct { connSetupTime *durationpb.Duration connectedIP *netip.Addr // 0 if not connected - connectedNodeID uint64 + nodeIDRemote uint64 } func newTelemetryStore() (*TelemetryStore, error) { @@ -64,8 +64,8 @@ func (b *TelemetryStore) newEvent() *proto.TelemetryEvent { Time: timestamppb.Now(), DerpMap: DERPMapToProto(b.cleanDerpMap), LatestNetcheck: b.cleanNetCheck, - NodeIdSelf: b.nodeID, - NodeIdRemote: b.connectedNodeID, + NodeIdSelf: b.nodeIDSelf, + NodeIdRemote: b.nodeIDRemote, HomeDerp: b.homeDerp, ConnectionSetup: b.connSetupTime, Application: b.application, @@ -94,7 +94,7 @@ func (b *TelemetryStore) updateRemoteNodeIDLocked(nm *netmap.NetworkMap) { for _, p := range nm.Peers { for _, a := range p.Addresses { if a.Addr() == ip && a.IsSingleIP() { - b.connectedNodeID = uint64(p.ID) + b.nodeIDRemote = uint64(p.ID) } } } @@ -145,7 +145,7 @@ func (b *TelemetryStore) updateByNodeLocked(n *tailcfg.Node) bool { if n == nil { return false } - b.nodeID = uint64(n.ID) + b.nodeIDSelf = uint64(n.ID) derpIP, err := netip.ParseAddrPort(n.DERP) if err != nil { return false diff --git a/tailnet/telemetry_internal_test.go b/tailnet/telemetry_internal_test.go index 067ab776fa596..fe3b24e2b7095 100644 --- a/tailnet/telemetry_internal_test.go +++ b/tailnet/telemetry_internal_test.go @@ -1,10 +1,14 @@ package tailnet import ( + "fmt" + "net/netip" "testing" + "time" "github.com/stretchr/testify/require" "tailscale.com/tailcfg" + "tailscale.com/types/netmap" "github.com/coder/coder/v2/tailnet/proto" ) @@ -12,6 +16,67 @@ import ( func TestTelemetryStore(t *testing.T) { t.Parallel() + t.Run("CreateEvent", func(t *testing.T) { + t.Parallel() + + remotePrefix := netip.PrefixFrom(IP(), 128) + remoteIP := remotePrefix.Addr() + application := "test" + + nm := &netmap.NetworkMap{ + SelfNode: &tailcfg.Node{ + ID: 0, + DERP: "127.3.3.40:999", + }, + Peers: []*tailcfg.Node{ + { + ID: 1, + Addresses: []netip.Prefix{ + netip.PrefixFrom(IP(), 128), + netip.PrefixFrom(IP(), 128), + }, + }, + { + ID: 2, + Addresses: []netip.Prefix{ + remotePrefix, + netip.PrefixFrom(IP(), 128), + netip.PrefixFrom(IP(), 128), + }, + }, + }, + DERPMap: &tailcfg.DERPMap{ + HomeParams: &tailcfg.DERPHomeParams{ + RegionScore: map[int]float64{ + 999: 1.0, + }, + }, + Regions: map[int]*tailcfg.DERPRegion{ + 999: { + RegionID: 999, + RegionCode: "zzz", + RegionName: "Cool Region", + EmbeddedRelay: true, + Avoid: false, + }, + }, + OmitDefaultRegions: false, + }, + } + + telemetry, err := newTelemetryStore() + require.NoError(t, err) + telemetry.markConnected(&remoteIP, time.Now(), application) + telemetry.updateNetworkMap(nm) + e := telemetry.newEvent() + // DERPMapToProto already tested + require.Equal(t, DERPMapToProto(nm.DERPMap), e.DerpMap) + require.Equal(t, uint64(nm.Peers[1].ID), e.NodeIdRemote) + require.Equal(t, uint64(nm.SelfNode.ID), e.NodeIdSelf) + require.Equal(t, application, e.Application) + require.Equal(t, nm.SelfNode.DERP, fmt.Sprintf("127.3.3.40:%d", e.HomeDerp)) + }) + t.Run("CleanIPs", func(t *testing.T) { t.Parallel() From b2405072da6160d67b4966bf6df88955fc31855a Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 9 Jul 2024 05:14:38 +0000 Subject: [PATCH 09/17] set p2p setup --- tailnet/conn.go | 60 ++++++++++++++++---------------------------- tailnet/telemetry.go | 7 +++--- 2 files changed, 24 insertions(+), 43 deletions(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index 7192c85782286..56e9b5d921743 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -294,11 +294,7 @@ func NewConn(options *Options) (conn *Conn, err error) { server.telemetryStore.setNetInfo(ni) nodeUp.setNetInfo(ni) }) - server.wireguardEngine.AddNetworkMapCallback(func(nm *netmap.NetworkMap) { - if server.telemetryStore.updateNetworkMap(nm) { - server.sendUpdatedTelemetry() - } - }) + server.wireguardEngine.AddNetworkMapCallback(server.networkMapCallback) } else { server.wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo) } @@ -723,25 +719,7 @@ func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { c.telemetryStore.markConnected(&ip, c.createdAt, application) e := c.newTelemetryEvent() e.Status = proto.TelemetryEvent_CONNECTED - c.telemetryWg.Add(1) - go func() { - defer c.telemetryWg.Done() - c.telemetrySink.SendTelemetryEvent(e) - }() -} - -// Currently only called when the preferred DERP is updated. -func (c *Conn) sendUpdatedTelemetry() { - if c.telemetrySink == nil { - return - } - e := c.newTelemetryEvent() - e.Status = proto.TelemetryEvent_CONNECTED - c.telemetryWg.Add(1) - go func() { - defer c.telemetryWg.Done() - c.telemetrySink.SendTelemetryEvent(e) - }() + c.sendTelemetryBackground(e) } func (c *Conn) SendDisconnectedTelemetry() { @@ -750,11 +728,7 @@ func (c *Conn) SendDisconnectedTelemetry() { } e := c.newTelemetryEvent() e.Status = proto.TelemetryEvent_DISCONNECTED - c.telemetryWg.Add(1) - go func() { - defer c.telemetryWg.Done() - c.telemetrySink.SendTelemetryEvent(e) - }() + c.sendTelemetryBackground(e) } func (c *Conn) SendSpeedtestTelemetry(throughputMbits float64) { @@ -764,11 +738,7 @@ func (c *Conn) SendSpeedtestTelemetry(throughputMbits float64) { e := c.newTelemetryEvent() e.ThroughputMbits = wrapperspb.Float(float32(throughputMbits)) e.Status = proto.TelemetryEvent_CONNECTED - c.telemetryWg.Add(1) - go func() { - defer c.telemetryWg.Done() - c.telemetrySink.SendTelemetryEvent(e) - }() + c.sendTelemetryBackground(e) } // nolint:revive @@ -782,15 +752,12 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) { if pr.Endpoint != "" { e.P2PLatency = latency e.P2PEndpoint = c.telemetryStore.toEndpoint(pr.Endpoint) + e.P2PSetup = durationpb.New(time.Since(c.createdAt)) } else { e.DerpLatency = latency } e.Status = proto.TelemetryEvent_CONNECTED - c.telemetryWg.Add(1) - go func() { - defer c.telemetryWg.Done() - c.telemetrySink.SendTelemetryEvent(e) - }() + c.sendTelemetryBackground(e) } // The returned telemetry event will not have it's status set. @@ -804,6 +771,21 @@ func (c *Conn) newTelemetryEvent() *proto.TelemetryEvent { return event } +func (c *Conn) networkMapCallback(nm *netmap.NetworkMap) { + c.telemetryStore.updateNetworkMap(nm) + if c.telemetryStore.connectedIP != nil { + go func() { _, _, _, _ = c.Ping(context.Background(), *c.telemetryStore.connectedIP) }() + } +} + +func (c *Conn) sendTelemetryBackground(e *proto.TelemetryEvent) { + c.telemetryWg.Add(1) + go func() { + defer c.telemetryWg.Done() + c.telemetrySink.SendTelemetryEvent(e) + }() +} + // PeerDiagnostics is a checklist of human-readable conditions necessary to establish an encrypted // tunnel to a peer via a Conn type PeerDiagnostics struct { diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index 4187b6d81522d..9e1fba3df7585 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -100,18 +100,17 @@ func (b *TelemetryStore) updateRemoteNodeIDLocked(nm *netmap.NetworkMap) { } } -// Returns whether a new telemetry event should be sent -func (b *TelemetryStore) updateNetworkMap(nm *netmap.NetworkMap) bool { +func (b *TelemetryStore) updateNetworkMap(nm *netmap.NetworkMap) { b.mu.Lock() defer b.mu.Unlock() if nm == nil { - return false + return } b.updateDerpMapLocked(nm.DERPMap) b.updateRemoteNodeIDLocked(nm) - return b.updateByNodeLocked(nm.SelfNode) + b.updateByNodeLocked(nm.SelfNode) } // Given a DERPMap, anonymise all IPs and hostnames. From e2e4018b2e75867515e561f320ca4591fe855c36 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 9 Jul 2024 05:26:53 +0000 Subject: [PATCH 10/17] fixup --- tailnet/conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index 56e9b5d921743..19a9a34ec1125 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -774,7 +774,7 @@ func (c *Conn) newTelemetryEvent() *proto.TelemetryEvent { func (c *Conn) networkMapCallback(nm *netmap.NetworkMap) { c.telemetryStore.updateNetworkMap(nm) if c.telemetryStore.connectedIP != nil { - go func() { _, _, _, _ = c.Ping(context.Background(), *c.telemetryStore.connectedIP) }() + _, _, _, _ = c.Ping(context.Background(), *c.telemetryStore.connectedIP) } } From 0bebad47aec962cb62856dea8fd0c36c1c765116 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 9 Jul 2024 07:23:27 +0000 Subject: [PATCH 11/17] watch for conn changes --- tailnet/conn.go | 51 +++++++++++++++++++++++++++++++++++++------- tailnet/telemetry.go | 18 ++++++++++++++++ 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index 19a9a34ec1125..02a920965bc85 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -264,6 +264,7 @@ func NewConn(options *Options) (conn *Conn, err error) { nodeUp.setAddresses(options.Addresses) nodeUp.setBlockEndpoints(options.BlockEndpoints) + ctx, ctxCancel := context.WithCancel(context.Background()) server := &Conn{ id: uuid.New(), closed: make(chan struct{}), @@ -283,6 +284,8 @@ func NewConn(options *Options) (conn *Conn, err error) { telemetrySink: options.TelemetrySink, telemetryStore: telemetryStore, createdAt: time.Now(), + watchCtx: ctx, + watchCancel: ctxCancel, } defer func() { if err != nil { @@ -293,8 +296,17 @@ func NewConn(options *Options) (conn *Conn, err error) { server.wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) { server.telemetryStore.setNetInfo(ni) nodeUp.setNetInfo(ni) + if server.telemetryStore.connectedIP != nil { + _, _, _, _ = server.Ping(ctx, *server.telemetryStore.connectedIP) + } }) - server.wireguardEngine.AddNetworkMapCallback(server.networkMapCallback) + server.wireguardEngine.AddNetworkMapCallback(func(nm *netmap.NetworkMap) { + server.telemetryStore.updateNetworkMap(nm) + if server.telemetryStore.connectedIP != nil { + _, _, _, _ = server.Ping(ctx, *server.telemetryStore.connectedIP) + } + }) + go server.watchConnChange() } else { server.wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo) } @@ -361,6 +373,9 @@ type Conn struct { telemetryStore *TelemetryStore telemetryWg sync.WaitGroup + watchCtx context.Context + watchCancel func() + trafficStats *connstats.Statistics } @@ -542,6 +557,7 @@ func (c *Conn) Closed() <-chan struct{} { // Close shuts down the Wireguard connection. func (c *Conn) Close() error { c.logger.Info(context.Background(), "closing tailnet Conn") + c.watchCancel() c.telemetryWg.Wait() c.configMaps.close() c.nodeUpdater.close() @@ -771,13 +787,6 @@ func (c *Conn) newTelemetryEvent() *proto.TelemetryEvent { return event } -func (c *Conn) networkMapCallback(nm *netmap.NetworkMap) { - c.telemetryStore.updateNetworkMap(nm) - if c.telemetryStore.connectedIP != nil { - _, _, _, _ = c.Ping(context.Background(), *c.telemetryStore.connectedIP) - } -} - func (c *Conn) sendTelemetryBackground(e *proto.TelemetryEvent) { c.telemetryWg.Add(1) go func() { @@ -786,6 +795,32 @@ func (c *Conn) sendTelemetryBackground(e *proto.TelemetryEvent) { }() } +// Watch for changes in the connection type (P2P<->DERP) and send telemetry events. +func (c *Conn) watchConnChange() { + ticker := time.NewTicker(time.Millisecond * 50) + defer ticker.Stop() + for { + select { + case <-c.watchCtx.Done(): + return + case <-ticker.C: + } + status := c.Status() + peers := status.Peers() + if len(peers) > 1 { + // Not a CLI<->agent connection, stop watching + return + } else if len(peers) == 0 { + continue + } + peer := status.Peer[peers[0]] + // If the connection type has changed, send a telemetry event with the latest ping stats + if c.telemetryStore.checkConnType(peer.Relay) && c.telemetryStore.connectedIP != nil { + _, _, _, _ = c.Ping(c.watchCtx, *c.telemetryStore.connectedIP) + } + } +} + // PeerDiagnostics is a checklist of human-readable conditions necessary to establish an encrypted // tunnel to a peer via a Conn type PeerDiagnostics struct { diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index 9e1fba3df7585..8b72d89493f11 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -42,6 +42,7 @@ type TelemetryStore struct { connectedIP *netip.Addr // 0 if not connected nodeIDRemote uint64 + p2p bool } func newTelemetryStore() (*TelemetryStore, error) { @@ -84,6 +85,23 @@ func (b *TelemetryStore) markConnected(ip *netip.Addr, connCreatedAt time.Time, b.application = application } +// Return whether we've changed to/from a P2P connection +func (b *TelemetryStore) checkConnType(relay string) bool { + b.mu.Lock() + defer b.mu.Unlock() + + if b.p2p && relay == "" { + return false + } else if !b.p2p && relay == "" { + b.p2p = true + return true + } else if b.p2p && relay != "" { + b.p2p = false + return true + } + return false +} + func (b *TelemetryStore) updateRemoteNodeIDLocked(nm *netmap.NetworkMap) { if b.connectedIP == nil { return From 6a198494daf5454c840778ebe61f62d1960e9d26 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 9 Jul 2024 07:24:20 +0000 Subject: [PATCH 12/17] fixup --- tailnet/conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index 02a920965bc85..8b90c655442b5 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -727,7 +727,7 @@ func (c *Conn) MagicsockServeHTTPDebug(w http.ResponseWriter, r *http.Request) { c.magicConn.ServeHTTPDebug(w, r) } -// Currently called when connected to the Agent with the given IP using the given application. +// SendConnectedTelemetry should be called when connection to a peer with the given IP is established. func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { if c.telemetrySink == nil { return From 11f2a2b69de7b80a46f91b7637d0910340242930 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 9 Jul 2024 07:49:14 +0000 Subject: [PATCH 13/17] use latest p2p setup time --- tailnet/conn.go | 3 +-- tailnet/telemetry.go | 19 +++++++++++++------ tailnet/telemetry_internal_test.go | 3 +-- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index 8b90c655442b5..3d8eb1190a262 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -732,7 +732,7 @@ func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { if c.telemetrySink == nil { return } - c.telemetryStore.markConnected(&ip, c.createdAt, application) + c.telemetryStore.markConnected(&ip, application) e := c.newTelemetryEvent() e.Status = proto.TelemetryEvent_CONNECTED c.sendTelemetryBackground(e) @@ -768,7 +768,6 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) { if pr.Endpoint != "" { e.P2PLatency = latency e.P2PEndpoint = c.telemetryStore.toEndpoint(pr.Endpoint) - e.P2PSetup = durationpb.New(time.Since(c.createdAt)) } else { e.DerpLatency = latency } diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index 8b72d89493f11..b8df7cfd31a0c 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -43,6 +43,9 @@ type TelemetryStore struct { // 0 if not connected nodeIDRemote uint64 p2p bool + + p2pSetupTime time.Duration + lastDerpTime time.Time } func newTelemetryStore() (*TelemetryStore, error) { @@ -61,7 +64,7 @@ func (b *TelemetryStore) newEvent() *proto.TelemetryEvent { b.mu.Lock() defer b.mu.Unlock() - return &proto.TelemetryEvent{ + out := &proto.TelemetryEvent{ Time: timestamppb.Now(), DerpMap: DERPMapToProto(b.cleanDerpMap), LatestNetcheck: b.cleanNetCheck, @@ -70,17 +73,18 @@ func (b *TelemetryStore) newEvent() *proto.TelemetryEvent { HomeDerp: b.homeDerp, ConnectionSetup: b.connSetupTime, Application: b.application, - - // TODO(ethanndickson): - P2PSetup: &durationpb.Duration{}, } + if b.p2pSetupTime > 0 { + out.P2PSetup = durationpb.New(b.p2pSetupTime) + } + return out } -func (b *TelemetryStore) markConnected(ip *netip.Addr, connCreatedAt time.Time, application string) { +func (b *TelemetryStore) markConnected(ip *netip.Addr, application string) { b.mu.Lock() defer b.mu.Unlock() - b.connSetupTime = durationpb.New(time.Since(connCreatedAt)) + b.lastDerpTime = time.Now() b.connectedIP = ip b.application = application } @@ -94,9 +98,12 @@ func (b *TelemetryStore) checkConnType(relay string) bool { return false } else if !b.p2p && relay == "" { b.p2p = true + b.p2pSetupTime = time.Since(b.lastDerpTime) return true } else if b.p2p && relay != "" { b.p2p = false + b.lastDerpTime = time.Now() + b.p2pSetupTime = 0 return true } return false diff --git a/tailnet/telemetry_internal_test.go b/tailnet/telemetry_internal_test.go index fe3b24e2b7095..4e085acea8f05 100644 --- a/tailnet/telemetry_internal_test.go +++ b/tailnet/telemetry_internal_test.go @@ -4,7 +4,6 @@ import ( "fmt" "net/netip" "testing" - "time" "github.com/stretchr/testify/require" "tailscale.com/tailcfg" @@ -66,7 +65,7 @@ func TestTelemetryStore(t *testing.T) { telemetry, err := newTelemetryStore() require.NoError(t, err) - telemetry.markConnected(&remoteIP, time.Now(), application) + telemetry.markConnected(&remoteIP, application) telemetry.updateNetworkMap(nm) e := telemetry.newEvent() // DERPMapToProto already tested From fe7252a5bf78300ef9c5b7b0a506ba923d441120 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 9 Jul 2024 08:07:33 +0000 Subject: [PATCH 14/17] ensure latest derpmap --- tailnet/configmaps.go | 5 +++-- tailnet/conn.go | 6 ++++-- tailnet/telemetry.go | 10 ++++++++-- tailnet/telemetry_internal_test.go | 4 +--- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/tailnet/configmaps.go b/tailnet/configmaps.go index 3d817b17516b8..a6ef9f40028b1 100644 --- a/tailnet/configmaps.go +++ b/tailnet/configmaps.go @@ -284,15 +284,16 @@ func (c *configMaps) getBlockEndpoints() bool { // setDERPMap sets the DERP map, triggering a configuration of the engine if it has changed. // c.L MUST NOT be held. // Returns if the derpMap is dirty. -func (c *configMaps) setDERPMap(derpMap *tailcfg.DERPMap) { +func (c *configMaps) setDERPMap(derpMap *tailcfg.DERPMap) bool { c.L.Lock() defer c.L.Unlock() if CompareDERPMaps(c.derpMap, derpMap) { - return + return false } c.derpMap = derpMap c.derpMapDirty = true c.Broadcast() + return true } // derMapLocked returns the current DERPMap. c.L must be held diff --git a/tailnet/conn.go b/tailnet/conn.go index 3d8eb1190a262..43612ca0c1757 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -409,7 +409,9 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) { // SetDERPMap updates the DERPMap of a connection. func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) { - c.configMaps.setDERPMap(derpMap) + if c.configMaps.setDERPMap(derpMap) && c.telemetryStore != nil { + c.telemetryStore.updateDerpMap(derpMap) + } } func (c *Conn) SetDERPForceWebSockets(v bool) { @@ -814,7 +816,7 @@ func (c *Conn) watchConnChange() { } peer := status.Peer[peers[0]] // If the connection type has changed, send a telemetry event with the latest ping stats - if c.telemetryStore.checkConnType(peer.Relay) && c.telemetryStore.connectedIP != nil { + if c.telemetryStore.changedConntype(peer.Relay) && c.telemetryStore.connectedIP != nil { _, _, _, _ = c.Ping(c.watchCtx, *c.telemetryStore.connectedIP) } } diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index b8df7cfd31a0c..9a60634fa3e71 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -89,8 +89,7 @@ func (b *TelemetryStore) markConnected(ip *netip.Addr, application string) { b.application = application } -// Return whether we've changed to/from a P2P connection -func (b *TelemetryStore) checkConnType(relay string) bool { +func (b *TelemetryStore) changedConntype(relay string) bool { b.mu.Lock() defer b.mu.Unlock() @@ -141,6 +140,13 @@ func (b *TelemetryStore) updateNetworkMap(nm *netmap.NetworkMap) { // Given a DERPMap, anonymise all IPs and hostnames. // Keep track of seen hostnames/cert names to anonymize them from future logs. // b.mu must NOT be held. +func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) { + b.mu.Lock() + defer b.mu.Unlock() + + b.updateDerpMapLocked(cur) +} + func (b *TelemetryStore) updateDerpMapLocked(cur *tailcfg.DERPMap) { if cur == nil { return diff --git a/tailnet/telemetry_internal_test.go b/tailnet/telemetry_internal_test.go index 4e085acea8f05..9b3e4d88206a6 100644 --- a/tailnet/telemetry_internal_test.go +++ b/tailnet/telemetry_internal_test.go @@ -197,9 +197,7 @@ func TestTelemetryStore(t *testing.T) { }, }, } - telemetry.mu.Lock() - telemetry.updateDerpMapLocked(derpMap) - telemetry.mu.Unlock() + telemetry.updateDerpMap(derpMap) event := telemetry.newEvent() require.Len(t, event.DerpMap.Regions[999].Nodes, 1) From 4b76942aac5f1736b7b32fbcb65895761c7b541e Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 9 Jul 2024 13:35:25 +0000 Subject: [PATCH 15/17] nicer ping peer --- tailnet/conn.go | 12 ++++-------- tailnet/telemetry.go | 12 ++++++++++++ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index 43612ca0c1757..f183d79e78635 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -296,15 +296,11 @@ func NewConn(options *Options) (conn *Conn, err error) { server.wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) { server.telemetryStore.setNetInfo(ni) nodeUp.setNetInfo(ni) - if server.telemetryStore.connectedIP != nil { - _, _, _, _ = server.Ping(ctx, *server.telemetryStore.connectedIP) - } + server.telemetryStore.pingPeer(server) }) server.wireguardEngine.AddNetworkMapCallback(func(nm *netmap.NetworkMap) { server.telemetryStore.updateNetworkMap(nm) - if server.telemetryStore.connectedIP != nil { - _, _, _, _ = server.Ping(ctx, *server.telemetryStore.connectedIP) - } + server.telemetryStore.pingPeer(server) }) go server.watchConnChange() } else { @@ -816,8 +812,8 @@ func (c *Conn) watchConnChange() { } peer := status.Peer[peers[0]] // If the connection type has changed, send a telemetry event with the latest ping stats - if c.telemetryStore.changedConntype(peer.Relay) && c.telemetryStore.connectedIP != nil { - _, _, _, _ = c.Ping(c.watchCtx, *c.telemetryStore.connectedIP) + if c.telemetryStore.changedConntype(peer.Relay) { + c.telemetryStore.pingPeer(c) } } } diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index 9a60634fa3e71..9da7621c8762f 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -89,6 +89,18 @@ func (b *TelemetryStore) markConnected(ip *netip.Addr, application string) { b.application = application } +func (b *TelemetryStore) pingPeer(conn *Conn) { + b.mu.Lock() + defer b.mu.Unlock() + + if b.connectedIP == nil { + return + } + go func() { + _, _, _, _ = conn.Ping(conn.watchCtx, *b.connectedIP) + }() +} + func (b *TelemetryStore) changedConntype(relay string) bool { b.mu.Lock() defer b.mu.Unlock() From 6214e2a47860a46c3a027659f26dcdede58b9913 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 9 Jul 2024 13:42:22 +0000 Subject: [PATCH 16/17] fix race --- tailnet/telemetry.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index 9da7621c8762f..bb369e81dce43 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -96,8 +96,9 @@ func (b *TelemetryStore) pingPeer(conn *Conn) { if b.connectedIP == nil { return } + ip := *b.connectedIP go func() { - _, _, _, _ = conn.Ping(conn.watchCtx, *b.connectedIP) + _, _, _, _ = conn.Ping(conn.watchCtx, ip) }() } From 60e88f7582406f76a0efce56a31f147ddbbf0cea Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Wed, 10 Jul 2024 03:48:06 +0000 Subject: [PATCH 17/17] relay -> udp addr --- tailnet/conn.go | 2 +- tailnet/telemetry.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tailnet/conn.go b/tailnet/conn.go index f183d79e78635..5aefb3e404ecf 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -812,7 +812,7 @@ func (c *Conn) watchConnChange() { } peer := status.Peer[peers[0]] // If the connection type has changed, send a telemetry event with the latest ping stats - if c.telemetryStore.changedConntype(peer.Relay) { + if c.telemetryStore.changedConntype(peer.CurAddr) { c.telemetryStore.pingPeer(c) } } diff --git a/tailnet/telemetry.go b/tailnet/telemetry.go index bb369e81dce43..f98297be7cbf5 100644 --- a/tailnet/telemetry.go +++ b/tailnet/telemetry.go @@ -102,17 +102,17 @@ func (b *TelemetryStore) pingPeer(conn *Conn) { }() } -func (b *TelemetryStore) changedConntype(relay string) bool { +func (b *TelemetryStore) changedConntype(addr string) bool { b.mu.Lock() defer b.mu.Unlock() - if b.p2p && relay == "" { + if b.p2p && addr != "" { return false - } else if !b.p2p && relay == "" { + } else if !b.p2p && addr != "" { b.p2p = true b.p2pSetupTime = time.Since(b.lastDerpTime) return true - } else if b.p2p && relay != "" { + } else if b.p2p && addr == "" { b.p2p = false b.lastDerpTime = time.Now() b.p2pSetupTime = 0