Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
netcheck stub
  • Loading branch information
ethanndickson committed Jul 2, 2024
commit 09d92490905c5790d6104e46f0852885f977de6a
46 changes: 24 additions & 22 deletions tailnet/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,16 @@ func NewConn(options *Options) (conn *Conn, err error) {
}

var (
logger = newMultiLogger(options.Logger)
telemetryLogSink *TelemetryStore
logger = newMultiLogger(options.Logger)
telemetryStore *TelemetryStore
)
if options.TelemetrySink != nil {
var err error
telemetryLogSink, err = newTelemetryStore()
telemetryStore, err = newTelemetryStore()
if err != nil {
return nil, xerrors.Errorf("create telemetry log sink: %w", err)
}
logger = logger.appendLogger(slog.Make(telemetryLogSink).Leveled(slog.LevelDebug))
logger = logger.appendLogger(slog.Make(telemetryStore).Leveled(slog.LevelDebug))
}

nodePrivateKey := key.NewNode()
Expand Down Expand Up @@ -270,9 +270,13 @@ func NewConn(options *Options) (conn *Conn, err error) {
wireguardEngine.SetStatusCallback(nodeUp.setStatus)
wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo)
magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket)
if options.TelemetrySink != nil {
magicConn.SetNetInfoCallback(telemetryStore.setNetInfo)
}

server := &Conn{
id: uuid.New(),
nodeID: nodeID,
closed: make(chan struct{}),
logger: logger,
magicConn: magicConn,
Expand All @@ -288,7 +292,7 @@ func NewConn(options *Options) (conn *Conn, err error) {
configMaps: cfgMaps,
nodeUpdater: nodeUp,
telemetrySink: options.TelemetrySink,
telemetryLogs: telemetryLogSink,
telemeteryStore: telemetryStore,
}
defer func() {
if err != nil {
Expand Down Expand Up @@ -334,6 +338,7 @@ func IPFromUUID(uid uuid.UUID) netip.Addr {
type Conn struct {
// ID must be unique to this connection
id uuid.UUID
nodeID tailcfg.NodeID
mutex sync.Mutex
closed chan struct{}
logger multiLogger
Expand All @@ -351,9 +356,9 @@ type Conn struct {
clientType proto.TelemetryEvent_ClientType

telemetrySink TelemetrySink
// telemetryLogs will be nil if telemetrySink is nil.
telemetryLogs *TelemetryStore
telemetryWg sync.WaitGroup
// telemeteryStore will be nil if telemetrySink is nil.
telemeteryStore *TelemetryStore
telemetryWg sync.WaitGroup

trafficStats *connstats.Statistics
}
Expand Down Expand Up @@ -388,8 +393,8 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) {

// SetDERPMap updates the DERPMap of a connection.
func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) {
if c.configMaps.setDERPMap(derpMap) && c.telemetryLogs != nil {
c.telemetryLogs.updateDerpMap(derpMap)
if c.configMaps.setDERPMap(derpMap) && c.telemeteryStore != nil {
c.telemeteryStore.updateDerpMap(derpMap)
}
}

Expand Down Expand Up @@ -729,27 +734,24 @@ func (c *Conn) newTelemetryEvent() (*proto.TelemetryEvent, error) {
if err != nil {
return nil, xerrors.Errorf("marshal uuid to bytes: %w", err)
}
c.nodeUpdater.L.Lock()
node := c.nodeUpdater.nodeLocked()
c.nodeUpdater.L.Unlock()

logs, ips, dm := c.telemetryLogs.getStore()
logs, ips, dm, ni := c.telemeteryStore.getStore()
return &proto.TelemetryEvent{
Id: id,
Time: timestamppb.Now(),
ClientType: c.clientType,
NodeIdSelf: uint64(node.ID),
Logs: logs,
LogIpHashes: ips,
DerpMap: DERPMapToProto(dm),
Id: id,
Time: timestamppb.Now(),
ClientType: c.clientType,
NodeIdSelf: uint64(c.nodeID),
Logs: logs,
LogIpHashes: ips,
DerpMap: DERPMapToProto(dm),
LatestNetcheck: NetInfoToProto(ni),

// TODO:
Application: "",
NodeIdRemote: 0,
P2PEndpoint: &proto.TelemetryEvent_P2PEndpoint{},
ThroughputMbits: &wrapperspb.FloatValue{},
HomeDerp: "",
LatestNetcheck: &proto.Netcheck{},
ConnectionAge: &durationpb.Duration{},
ConnectionSetup: &durationpb.Duration{},
P2PSetup: &durationpb.Duration{},
Expand Down
10 changes: 10 additions & 0 deletions tailnet/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,13 @@ func DERPNodeFromProto(node *proto.DERPMap_Region_Node) *tailcfg.DERPNode {
CanPort80: node.CanPort_80,
}
}

func NetInfoToProto(netInfo *tailcfg.NetInfo) *proto.Netcheck {
if netInfo == nil {
return nil
}

return &proto.Netcheck{
// TODO:
}
}
6 changes: 3 additions & 3 deletions tailnet/logger_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestTelemetryStore(t *testing.T) {
logger.Debug(ctx, "line2 fe80")
logger.Debug(ctx, "line3 xxxx::x")

logs, hashes, _ := sink.getStore()
logs, hashes, _, _ := sink.getStore()
require.Len(t, logs, 3)
require.Len(t, hashes, 0)
require.Contains(t, logs[0], "line1")
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestTelemetryStore(t *testing.T) {
logger.Debug(ctx, fmt.Sprintf("line2: %s/24", c.ip))
logger.Debug(ctx, fmt.Sprintf("line3: %s foo (%s)", ipWithPort, c.ip))

logs, ips, _ := sink.getStore()
logs, ips, _, _ := sink.getStore()
require.Len(t, logs, 3)
require.Len(t, ips, 1)
for _, log := range logs {
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestTelemetryStore(t *testing.T) {
logger.Debug(ctx, "line2 1.2.3.4 asdf")
logger.Debug(ctx, "line3 2001:db8::1 foo")

logs, ips, dm := telemetry.getStore()
logs, ips, dm, _ := telemetry.getStore()
require.Len(t, logs, 3)
require.Len(t, ips, 3)
require.Len(t, dm.Regions[999].Nodes, 1)
Expand Down
14 changes: 12 additions & 2 deletions tailnet/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type TelemetryStore struct {

cleanDerpMap *tailcfg.DERPMap
derpMapFilter *regexp.Regexp
netInfo *tailcfg.NetInfo
}

var _ slog.Sink = &TelemetryStore{}
Expand All @@ -123,10 +124,12 @@ func newTelemetryStore() (*TelemetryStore, error) {
return out, nil
}

func (b *TelemetryStore) getStore() ([]string, map[string]*proto.IPFields, *tailcfg.DERPMap) {
// getStore returns a deep copy of all current telemetry state.
// TODO: Should this return a populated event instead?
func (b *TelemetryStore) getStore() ([]string, map[string]*proto.IPFields, *tailcfg.DERPMap, *tailcfg.NetInfo) {
b.mu.Lock()
defer b.mu.Unlock()
return append([]string{}, b.logs...), b.hashedIPs, b.cleanDerpMap.Clone()
return append([]string{}, b.logs...), b.hashedIPs, b.cleanDerpMap.Clone(), b.netInfo.Clone()
}

// Given a DERPMap, anonymise all IPs and hostnames.
Expand Down Expand Up @@ -161,6 +164,13 @@ func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) {
b.cleanDerpMap = cleanMap
}

func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo) {
b.mu.Lock()
defer b.mu.Unlock()
// TODO: Scrub PII from NetInfo
b.netInfo = ni
}

// Write implements io.Writer.
func (b *TelemetryStore) Write(p []byte) (n int, err error) {
b.mu.Lock()
Expand Down