Skip to content

Commit 0bebad4

Browse files
committed
watch for conn changes
1 parent e2e4018 commit 0bebad4

File tree

2 files changed

+61
-8
lines changed

2 files changed

+61
-8
lines changed

tailnet/conn.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ func NewConn(options *Options) (conn *Conn, err error) {
264264
nodeUp.setAddresses(options.Addresses)
265265
nodeUp.setBlockEndpoints(options.BlockEndpoints)
266266

267+
ctx, ctxCancel := context.WithCancel(context.Background())
267268
server := &Conn{
268269
id: uuid.New(),
269270
closed: make(chan struct{}),
@@ -283,6 +284,8 @@ func NewConn(options *Options) (conn *Conn, err error) {
283284
telemetrySink: options.TelemetrySink,
284285
telemetryStore: telemetryStore,
285286
createdAt: time.Now(),
287+
watchCtx: ctx,
288+
watchCancel: ctxCancel,
286289
}
287290
defer func() {
288291
if err != nil {
@@ -293,8 +296,17 @@ func NewConn(options *Options) (conn *Conn, err error) {
293296
server.wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) {
294297
server.telemetryStore.setNetInfo(ni)
295298
nodeUp.setNetInfo(ni)
299+
if server.telemetryStore.connectedIP != nil {
300+
_, _, _, _ = server.Ping(ctx, *server.telemetryStore.connectedIP)
301+
}
296302
})
297-
server.wireguardEngine.AddNetworkMapCallback(server.networkMapCallback)
303+
server.wireguardEngine.AddNetworkMapCallback(func(nm *netmap.NetworkMap) {
304+
server.telemetryStore.updateNetworkMap(nm)
305+
if server.telemetryStore.connectedIP != nil {
306+
_, _, _, _ = server.Ping(ctx, *server.telemetryStore.connectedIP)
307+
}
308+
})
309+
go server.watchConnChange()
298310
} else {
299311
server.wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo)
300312
}
@@ -361,6 +373,9 @@ type Conn struct {
361373
telemetryStore *TelemetryStore
362374
telemetryWg sync.WaitGroup
363375

376+
watchCtx context.Context
377+
watchCancel func()
378+
364379
trafficStats *connstats.Statistics
365380
}
366381

@@ -542,6 +557,7 @@ func (c *Conn) Closed() <-chan struct{} {
542557
// Close shuts down the Wireguard connection.
543558
func (c *Conn) Close() error {
544559
c.logger.Info(context.Background(), "closing tailnet Conn")
560+
c.watchCancel()
545561
c.telemetryWg.Wait()
546562
c.configMaps.close()
547563
c.nodeUpdater.close()
@@ -771,13 +787,6 @@ func (c *Conn) newTelemetryEvent() *proto.TelemetryEvent {
771787
return event
772788
}
773789

774-
func (c *Conn) networkMapCallback(nm *netmap.NetworkMap) {
775-
c.telemetryStore.updateNetworkMap(nm)
776-
if c.telemetryStore.connectedIP != nil {
777-
_, _, _, _ = c.Ping(context.Background(), *c.telemetryStore.connectedIP)
778-
}
779-
}
780-
781790
func (c *Conn) sendTelemetryBackground(e *proto.TelemetryEvent) {
782791
c.telemetryWg.Add(1)
783792
go func() {
@@ -786,6 +795,32 @@ func (c *Conn) sendTelemetryBackground(e *proto.TelemetryEvent) {
786795
}()
787796
}
788797

798+
// Watch for changes in the connection type (P2P<->DERP) and send telemetry events.
799+
func (c *Conn) watchConnChange() {
800+
ticker := time.NewTicker(time.Millisecond * 50)
801+
defer ticker.Stop()
802+
for {
803+
select {
804+
case <-c.watchCtx.Done():
805+
return
806+
case <-ticker.C:
807+
}
808+
status := c.Status()
809+
peers := status.Peers()
810+
if len(peers) > 1 {
811+
// Not a CLI<->agent connection, stop watching
812+
return
813+
} else if len(peers) == 0 {
814+
continue
815+
}
816+
peer := status.Peer[peers[0]]
817+
// If the connection type has changed, send a telemetry event with the latest ping stats
818+
if c.telemetryStore.checkConnType(peer.Relay) && c.telemetryStore.connectedIP != nil {
819+
_, _, _, _ = c.Ping(c.watchCtx, *c.telemetryStore.connectedIP)
820+
}
821+
}
822+
}
823+
789824
// PeerDiagnostics is a checklist of human-readable conditions necessary to establish an encrypted
790825
// tunnel to a peer via a Conn
791826
type PeerDiagnostics struct {

tailnet/telemetry.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type TelemetryStore struct {
4242
connectedIP *netip.Addr
4343
// 0 if not connected
4444
nodeIDRemote uint64
45+
p2p bool
4546
}
4647

4748
func newTelemetryStore() (*TelemetryStore, error) {
@@ -84,6 +85,23 @@ func (b *TelemetryStore) markConnected(ip *netip.Addr, connCreatedAt time.Time,
8485
b.application = application
8586
}
8687

88+
// Return whether we've changed to/from a P2P connection
89+
func (b *TelemetryStore) checkConnType(relay string) bool {
90+
b.mu.Lock()
91+
defer b.mu.Unlock()
92+
93+
if b.p2p && relay == "" {
94+
return false
95+
} else if !b.p2p && relay == "" {
96+
b.p2p = true
97+
return true
98+
} else if b.p2p && relay != "" {
99+
b.p2p = false
100+
return true
101+
}
102+
return false
103+
}
104+
87105
func (b *TelemetryStore) updateRemoteNodeIDLocked(nm *netmap.NetworkMap) {
88106
if b.connectedIP == nil {
89107
return

0 commit comments

Comments
 (0)