Skip to content

Commit 09f375b

Browse files
committed
derpmap telemetry clean
1 parent 0dd5134 commit 09f375b

File tree

4 files changed

+148
-41
lines changed

4 files changed

+148
-41
lines changed

tailnet/configmaps.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,15 +283,17 @@ func (c *configMaps) getBlockEndpoints() bool {
283283

284284
// setDERPMap sets the DERP map, triggering a configuration of the engine if it has changed.
285285
// c.L MUST NOT be held.
286-
func (c *configMaps) setDERPMap(derpMap *tailcfg.DERPMap) {
286+
// Returns if the derpMap is dirty.
287+
func (c *configMaps) setDERPMap(derpMap *tailcfg.DERPMap) bool {
287288
c.L.Lock()
288289
defer c.L.Unlock()
289290
if CompareDERPMaps(c.derpMap, derpMap) {
290-
return
291+
return false
291292
}
292293
c.derpMap = derpMap
293294
c.derpMapDirty = true
294295
c.Broadcast()
296+
return true
295297
}
296298

297299
// derMapLocked returns the current DERPMap. c.L must be held

tailnet/conn.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,11 @@ func NewConn(options *Options) (conn *Conn, err error) {
138138

139139
var (
140140
logger = newMultiLogger(options.Logger)
141-
telemetryLogSink *bufferLogSink
141+
telemetryLogSink *TelemetryStore
142142
)
143143
if options.TelemetrySink != nil {
144144
var err error
145-
telemetryLogSink, err = newBufferLogSink()
145+
telemetryLogSink, err = newTelemetryStore()
146146
if err != nil {
147147
return nil, xerrors.Errorf("create telemetry log sink: %w", err)
148148
}
@@ -352,7 +352,7 @@ type Conn struct {
352352

353353
telemetrySink TelemetrySink
354354
// telemetryLogs will be nil if telemetrySink is nil.
355-
telemetryLogs *bufferLogSink
355+
telemetryLogs *TelemetryStore
356356
telemetryWg sync.WaitGroup
357357

358358
trafficStats *connstats.Statistics
@@ -388,7 +388,9 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) {
388388

389389
// SetDERPMap updates the DERPMap of a connection.
390390
func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) {
391-
c.configMaps.setDERPMap(derpMap)
391+
if c.configMaps.setDERPMap(derpMap) && c.telemetryLogs != nil {
392+
c.telemetryLogs.updateDerpMap(derpMap)
393+
}
392394
}
393395

394396
func (c *Conn) SetDERPForceWebSockets(v bool) {
@@ -512,6 +514,8 @@ func (c *Conn) AwaitReachable(ctx context.Context, ip netip.Addr) bool {
512514
for {
513515
select {
514516
case <-completedCtx.Done():
517+
// TODO(ethanndickson): For now, I'm interpreting 'connected' as when the
518+
// agent is reachable.
515519
_ = c.connectedTelemetryEvent()
516520
return true
517521
case <-t.C:
@@ -719,6 +723,7 @@ func (c *Conn) connectedTelemetryEvent() error {
719723
return nil
720724
}
721725

726+
// The returned telemetry event will not have it's status set.
722727
func (c *Conn) newTelemetryEvent() (*proto.TelemetryEvent, error) {
723728
id, err := c.id.MarshalBinary()
724729
if err != nil {
@@ -728,22 +733,22 @@ func (c *Conn) newTelemetryEvent() (*proto.TelemetryEvent, error) {
728733
node := c.nodeUpdater.nodeLocked()
729734
c.nodeUpdater.L.Unlock()
730735

731-
logs, ips := c.telemetryLogs.getLogs()
736+
logs, ips, dm := c.telemetryLogs.getStore()
732737
return &proto.TelemetryEvent{
733738
Id: id,
734739
Time: timestamppb.Now(),
735740
ClientType: c.clientType,
736741
NodeIdSelf: uint64(node.ID),
737742
Logs: logs,
738743
LogIpHashes: ips,
744+
DerpMap: DERPMapToProto(dm),
739745

740746
// TODO:
741747
Application: "",
742748
NodeIdRemote: 0,
743749
P2PEndpoint: &proto.TelemetryEvent_P2PEndpoint{},
744750
ThroughputMbits: &wrapperspb.FloatValue{},
745751
HomeDerp: "",
746-
DerpMap: &proto.DERPMap{},
747752
LatestNetcheck: &proto.Netcheck{},
748753
ConnectionAge: &durationpb.Duration{},
749754
ConnectionSetup: &durationpb.Duration{},

tailnet/logger_internal_test.go

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,27 @@ import (
66
"testing"
77

88
"github.com/stretchr/testify/require"
9+
"tailscale.com/tailcfg"
910

1011
"cdr.dev/slog"
1112
"github.com/coder/coder/v2/tailnet/proto"
1213
)
1314

14-
func TestBufferLogSink(t *testing.T) {
15+
func TestTelemetryStore(t *testing.T) {
1516
t.Parallel()
1617

1718
t.Run("NoIP", func(t *testing.T) {
1819
t.Parallel()
1920
ctx := context.Background()
20-
sink, err := newBufferLogSink()
21+
sink, err := newTelemetryStore()
2122
require.NoError(t, err)
2223
logger := slog.Make(sink).Leveled(slog.LevelDebug)
2324

2425
logger.Debug(ctx, "line1")
2526
logger.Debug(ctx, "line2 fe80")
2627
logger.Debug(ctx, "line3 xxxx::x")
2728

28-
logs, hashes := sink.getLogs()
29+
logs, hashes, _ := sink.getStore()
2930
require.Len(t, logs, 3)
3031
require.Len(t, hashes, 0)
3132
require.Contains(t, logs[0], "line1")
@@ -103,7 +104,7 @@ func TestBufferLogSink(t *testing.T) {
103104
t.Run(c.name, func(t *testing.T) {
104105
t.Parallel()
105106
ctx := context.Background()
106-
sink, err := newBufferLogSink()
107+
sink, err := newTelemetryStore()
107108
require.NoError(t, err)
108109
logger := slog.Make(sink).Leveled(slog.LevelDebug)
109110

@@ -116,15 +117,15 @@ func TestBufferLogSink(t *testing.T) {
116117
logger.Debug(ctx, fmt.Sprintf("line2: %s/24", c.ip))
117118
logger.Debug(ctx, fmt.Sprintf("line3: %s foo (%s)", ipWithPort, c.ip))
118119

119-
logs, hashes := sink.getLogs()
120+
logs, ips, _ := sink.getStore()
120121
require.Len(t, logs, 3)
121-
require.Len(t, hashes, 1)
122+
require.Len(t, ips, 1)
122123
for _, log := range logs {
123124
t.Log(log)
124125
}
125126

126127
// This only runs once since we only processed a single IP.
127-
for expectedHash, ipFields := range hashes {
128+
for expectedHash, ipFields := range ips {
128129
hashedIPWithPort := expectedHash + ":8080"
129130
if c.expectedVersion == 6 {
130131
hashedIPWithPort = fmt.Sprintf("[%s]:8080", expectedHash)
@@ -141,4 +142,55 @@ func TestBufferLogSink(t *testing.T) {
141142
})
142143
}
143144
})
145+
146+
t.Run("DerpMapClean", func(t *testing.T) {
147+
t.Parallel()
148+
ctx := context.Background()
149+
telemetry, err := newTelemetryStore()
150+
require.NoError(t, err)
151+
logger := slog.Make(telemetry).Leveled(slog.LevelDebug)
152+
153+
derpMap := &tailcfg.DERPMap{
154+
Regions: make(map[int]*tailcfg.DERPRegion),
155+
}
156+
// Add a region and node that uses every single field.
157+
derpMap.Regions[999] = &tailcfg.DERPRegion{
158+
RegionID: 999,
159+
EmbeddedRelay: true,
160+
RegionCode: "zzz",
161+
RegionName: "Cool Region",
162+
Avoid: true,
163+
164+
Nodes: []*tailcfg.DERPNode{
165+
{
166+
Name: "zzz1",
167+
RegionID: 999,
168+
HostName: "coolderp.com",
169+
CertName: "coolderpcert",
170+
IPv4: "1.2.3.4",
171+
IPv6: "2001:db8::1",
172+
STUNTestIP: "5.6.7.8",
173+
},
174+
},
175+
}
176+
telemetry.updateDerpMap(derpMap)
177+
178+
logger.Debug(ctx, "line1 coolderp.com qwerty")
179+
logger.Debug(ctx, "line2 1.2.3.4 asdf")
180+
logger.Debug(ctx, "line3 2001:db8::1 foo")
181+
182+
logs, ips, dm := telemetry.getStore()
183+
require.Len(t, logs, 3)
184+
require.Len(t, ips, 3)
185+
require.Len(t, dm.Regions[999].Nodes, 1)
186+
node := dm.Regions[999].Nodes[0]
187+
require.NotContains(t, node.HostName, "coolderp.com")
188+
require.NotContains(t, node.IPv4, "1.2.3.4")
189+
require.NotContains(t, node.IPv6, "2001:db8::1")
190+
require.NotContains(t, node.STUNTestIP, "5.6.7.8")
191+
require.Contains(t, logs[0], node.HostName)
192+
require.Contains(t, ips, node.STUNTestIP)
193+
require.Contains(t, ips, node.IPv6)
194+
require.Contains(t, ips, node.IPv4)
195+
})
144196
}

tailnet/logger.go renamed to tailnet/telemetry.go

Lines changed: 74 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sync"
1212

1313
"golang.org/x/xerrors"
14+
"tailscale.com/tailcfg"
1415

1516
"cdr.dev/slog"
1617
"cdr.dev/slog/sloggers/sloghuman"
@@ -87,45 +88,81 @@ func (m multiLogger) With(fields ...slog.Field) multiLogger {
8788

8889
// A logger sink that extracts (anonymized) IP addresses from logs for building
8990
// network telemetry events
90-
type bufferLogSink struct {
91+
type TelemetryStore struct {
92+
// Always self-referential
9193
sink slog.Sink
9294
mu sync.Mutex
9395
// TODO: Store only useful logs
94-
logs []string
95-
// We use the same salt so the same IP hashes to the same value.
96+
logs []string
9697
hashSalt string
97-
// A cache to avoid hashing the same IP multiple times.
98-
ipToHash map[string]string
98+
// A cache to avoid hashing the same IP or hostname multiple times.
99+
hashCache map[string]string
99100
hashedIPs map[string]*proto.IPFields
101+
102+
cleanDerpMap *tailcfg.DERPMap
103+
derpMapFilter *regexp.Regexp
100104
}
101105

102-
var _ slog.Sink = &bufferLogSink{}
106+
var _ slog.Sink = &TelemetryStore{}
103107

104-
var _ io.Writer = &bufferLogSink{}
108+
var _ io.Writer = &TelemetryStore{}
105109

106-
func newBufferLogSink() (*bufferLogSink, error) {
110+
func newTelemetryStore() (*TelemetryStore, error) {
107111
hashSalt, err := cryptorand.String(16)
108112
if err != nil {
109113
return nil, err
110114
}
111-
out := &bufferLogSink{
112-
logs: []string{},
113-
hashSalt: hashSalt,
114-
ipToHash: make(map[string]string),
115-
hashedIPs: make(map[string]*proto.IPFields),
115+
out := &TelemetryStore{
116+
logs: []string{},
117+
hashSalt: hashSalt,
118+
hashCache: make(map[string]string),
119+
hashedIPs: make(map[string]*proto.IPFields),
120+
derpMapFilter: regexp.MustCompile(`^$`),
116121
}
117122
out.sink = sloghuman.Sink(out)
118123
return out, nil
119124
}
120125

121-
func (b *bufferLogSink) getLogs() ([]string, map[string]*proto.IPFields) {
126+
func (b *TelemetryStore) getStore() ([]string, map[string]*proto.IPFields, *tailcfg.DERPMap) {
122127
b.mu.Lock()
123128
defer b.mu.Unlock()
124-
return append([]string{}, b.logs...), b.hashedIPs
129+
return append([]string{}, b.logs...), b.hashedIPs, b.cleanDerpMap.Clone()
130+
}
131+
132+
// Given a DERPMap, anonymise all IPs and hostnames.
133+
// Keep track of seen hostnames/cert names to anonymize them from future logs.
134+
// b.mu must NOT be held.
135+
func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) {
136+
b.mu.Lock()
137+
defer b.mu.Unlock()
138+
var names []string
139+
cleanMap := cur.Clone()
140+
for _, r := range cleanMap.Regions {
141+
for _, n := range r.Nodes {
142+
escapedName := regexp.QuoteMeta(n.HostName)
143+
escapedCertName := regexp.QuoteMeta(n.CertName)
144+
names = append(names, escapedName, escapedCertName)
145+
146+
ipv4, _ := b.processIPLocked(n.IPv4)
147+
n.IPv4 = ipv4
148+
ipv6, _ := b.processIPLocked(n.IPv6)
149+
n.IPv6 = ipv6
150+
stunIP, _ := b.processIPLocked(n.STUNTestIP)
151+
n.STUNTestIP = stunIP
152+
hn := b.hashAddr(n.HostName)
153+
n.HostName = hn
154+
cn := b.hashAddr(n.CertName)
155+
n.CertName = cn
156+
}
157+
}
158+
if len(names) != 0 {
159+
b.derpMapFilter = regexp.MustCompile((strings.Join(names, "|")))
160+
}
161+
b.cleanDerpMap = cleanMap
125162
}
126163

127164
// Write implements io.Writer.
128-
func (b *bufferLogSink) Write(p []byte) (n int, err error) {
165+
func (b *TelemetryStore) Write(p []byte) (n int, err error) {
129166
b.mu.Lock()
130167
defer b.mu.Unlock()
131168

@@ -138,37 +175,39 @@ func (b *bufferLogSink) Write(p []byte) (n int, err error) {
138175
if len(logLineAfterLevel) == 2 {
139176
logLineAfterLevel = logLineSplit[1]
140177
}
178+
// Anonymize IP addresses
141179
for _, match := range ipv4And6Regex.FindAllString(logLineAfterLevel, -1) {
142180
hash, err := b.processIPLocked(match)
143181
if err == nil {
144182
logLine = strings.ReplaceAll(logLine, match, hash)
145183
}
146184
}
185+
// Anonymize derp map host names
186+
for _, match := range b.derpMapFilter.FindAllString(logLineAfterLevel, -1) {
187+
hash := b.hashAddr(match)
188+
logLine = strings.ReplaceAll(logLine, match, hash)
189+
}
147190

148191
b.logs = append(b.logs, logLine)
149192
return len(p), nil
150193
}
151194

152195
// LogEntry implements slog.Sink.
153-
func (b *bufferLogSink) LogEntry(ctx context.Context, e slog.SinkEntry) {
196+
func (b *TelemetryStore) LogEntry(ctx context.Context, e slog.SinkEntry) {
154197
// This will call (*bufferLogSink).Write
155198
b.sink.LogEntry(ctx, e)
156199
}
157200

158201
// Sync implements slog.Sink.
159-
func (b *bufferLogSink) Sync() {
202+
func (b *TelemetryStore) Sync() {
160203
b.sink.Sync()
161204
}
162205

163206
// processIPLocked will look up the IP in the cache, or hash and salt it and add
164207
// to the cache. It will also add it to hashedIPs.
165208
//
166209
// b.mu must be held.
167-
func (b *bufferLogSink) processIPLocked(ip string) (string, error) {
168-
if hashStr, ok := b.ipToHash[ip]; ok {
169-
return hashStr, nil
170-
}
171-
210+
func (b *TelemetryStore) processIPLocked(ip string) (string, error) {
172211
addr, err := netip.ParseAddr(ip)
173212
if err != nil {
174213
return "", xerrors.Errorf("failed to parse IP %q: %w", ip, err)
@@ -190,12 +229,21 @@ func (b *bufferLogSink) processIPLocked(ip string) (string, error) {
190229
class = proto.IPFields_PRIVATE
191230
}
192231

193-
hash := sha256.Sum256([]byte(b.hashSalt + ip))
194-
hashStr := hex.EncodeToString(hash[:])
195-
b.ipToHash[ip] = hashStr
232+
hashStr := b.hashAddr(ip)
196233
b.hashedIPs[hashStr] = &proto.IPFields{
197234
Version: version,
198235
Class: class,
199236
}
200237
return hashStr, nil
201238
}
239+
240+
func (b *TelemetryStore) hashAddr(addr string) string {
241+
if hashStr, ok := b.hashCache[addr]; ok {
242+
return hashStr
243+
}
244+
245+
hash := sha256.Sum256([]byte(b.hashSalt + addr))
246+
hashStr := hex.EncodeToString(hash[:])
247+
b.hashCache[addr] = hashStr
248+
return hashStr
249+
}

0 commit comments

Comments
 (0)