Skip to content

Commit f83df79

Browse files
committed
tests
1 parent 45b5234 commit f83df79

File tree

5 files changed

+265
-7
lines changed

5 files changed

+265
-7
lines changed

codersdk/workspacesdk/connector.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type tailnetAPIConnector struct {
6363
coordinateURL string
6464
dialOptions *websocket.DialOptions
6565
conn tailnetConn
66+
customDialFn func() (proto.DRPCTailnetClient, error)
6667

6768
clientMu sync.RWMutex
6869
client proto.DRPCTailnetClient
@@ -71,9 +72,9 @@ type tailnetAPIConnector struct {
7172
isFirst bool
7273
closed chan struct{}
7374

74-
// Set to true if we get a response from the server that it doesn't support
75+
// Only set to true if we get a response from the server that it doesn't support
7576
// network telemetry.
76-
telemetryDisabled atomic.Bool
77+
telemetryUnavailable atomic.Bool
7778
}
7879

7980
// Create a new tailnetAPIConnector without running it
@@ -133,6 +134,9 @@ var permanentErrorStatuses = []int{
133134
}
134135

135136
func (tac *tailnetAPIConnector) dial() (proto.DRPCTailnetClient, error) {
137+
if tac.customDialFn != nil {
138+
return tac.customDialFn()
139+
}
136140
tac.logger.Debug(tac.ctx, "dialing Coder tailnet v2+ API")
137141
// nolint:bodyclose
138142
ws, res, err := websocket.Dial(tac.ctx, tac.coordinateURL, tac.dialOptions)
@@ -277,7 +281,7 @@ func (tac *tailnetAPIConnector) SendTelemetryEvent(event *proto.TelemetryEvent)
277281
// We hold the lock for the entire telemetry request, but this would only block
278282
// a coordinate retry, and closing the connection.
279283
defer tac.clientMu.RUnlock()
280-
if tac.client == nil || tac.telemetryDisabled.Load() {
284+
if tac.client == nil || tac.telemetryUnavailable.Load() {
281285
return
282286
}
283287
ctx, cancel := context.WithTimeout(tac.ctx, 5*time.Second)
@@ -287,6 +291,6 @@ func (tac *tailnetAPIConnector) SendTelemetryEvent(event *proto.TelemetryEvent)
287291
})
288292
if drpcerr.Code(err) == drpcerr.Unimplemented || drpc.ProtocolError.Has(err) && strings.Contains(err.Error(), "unknown rpc: ") {
289293
tac.logger.Debug(tac.ctx, "attempted to send telemetry to a server that doesn't support it", slog.Error(err))
290-
tac.telemetryDisabled.Store(true)
294+
tac.telemetryUnavailable.Store(true)
291295
}
292296
}

codersdk/workspacesdk/connector_internal_test.go

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import (
1313
"github.com/hashicorp/yamux"
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
16+
"golang.org/x/xerrors"
1617
"nhooyr.io/websocket"
18+
"storj.io/drpc"
19+
"storj.io/drpc/drpcerr"
1720
"tailscale.com/tailcfg"
1821

1922
"cdr.dev/slog"
@@ -139,6 +142,140 @@ func TestTailnetAPIConnector_UplevelVersion(t *testing.T) {
139142
require.NotEmpty(t, sdkErr.Helper)
140143
}
141144

145+
func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
146+
t.Parallel()
147+
ctx := testutil.Context(t, testutil.WaitShort)
148+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
149+
agentID := uuid.UUID{0x55}
150+
clientID := uuid.UUID{0x66}
151+
fCoord := tailnettest.NewFakeCoordinator()
152+
var coord tailnet.Coordinator = fCoord
153+
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
154+
coordPtr.Store(&coord)
155+
derpMapCh := make(chan *tailcfg.DERPMap)
156+
defer close(derpMapCh)
157+
eventCh := make(chan []*proto.TelemetryEvent, 1)
158+
svc, err := tailnet.NewClientService(tailnet.ClientServiceOptions{
159+
Logger: logger,
160+
CoordPtr: &coordPtr,
161+
DERPMapUpdateFrequency: time.Millisecond,
162+
DERPMapFn: func() *tailcfg.DERPMap { return <-derpMapCh },
163+
NetworkTelemetryHandler: func(batch []*proto.TelemetryEvent) {
164+
eventCh <- batch
165+
},
166+
})
167+
require.NoError(t, err)
168+
169+
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
170+
sws, err := websocket.Accept(w, r, nil)
171+
if !assert.NoError(t, err) {
172+
return
173+
}
174+
ctx, nc := codersdk.WebsocketNetConn(r.Context(), sws, websocket.MessageBinary)
175+
err = svc.ServeConnV2(ctx, nc, tailnet.StreamID{
176+
Name: "client",
177+
ID: clientID,
178+
Auth: tailnet.ClientCoordinateeAuth{AgentID: agentID},
179+
})
180+
assert.NoError(t, err)
181+
}))
182+
183+
fConn := newFakeTailnetConn()
184+
185+
uut := newTailnetAPIConnector(ctx, logger, agentID, svr.URL, &websocket.DialOptions{})
186+
uut.runConnector(fConn)
187+
require.Eventually(t, func() bool {
188+
uut.clientMu.Lock()
189+
defer uut.clientMu.Unlock()
190+
return uut.client != nil
191+
}, testutil.WaitShort, testutil.IntervalFast)
192+
193+
uut.SendTelemetryEvent(&proto.TelemetryEvent{
194+
Id: []byte("test event"),
195+
})
196+
197+
testEvents := testutil.RequireRecvCtx(ctx, t, eventCh)
198+
199+
require.Len(t, testEvents, 1)
200+
require.Equal(t, []byte("test event"), testEvents[0].Id)
201+
}
202+
203+
// Server doesn't support telemetry / server unimplemented telemetry
204+
205+
func TestTailnetAPIConnector_TelemetryUnimplemented(t *testing.T) {
206+
t.Parallel()
207+
ctx := testutil.Context(t, testutil.WaitShort)
208+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
209+
agentID := uuid.UUID{0x55}
210+
fConn := newFakeTailnetConn()
211+
212+
fakeDRPCClient := newFakeDRPCClient()
213+
uut := &tailnetAPIConnector{
214+
ctx: ctx,
215+
logger: logger,
216+
agentID: agentID,
217+
coordinateURL: "",
218+
dialOptions: &websocket.DialOptions{},
219+
conn: nil,
220+
connected: make(chan error, 1),
221+
closed: make(chan struct{}),
222+
customDialFn: func() (proto.DRPCTailnetClient, error) {
223+
return fakeDRPCClient, nil
224+
},
225+
}
226+
uut.runConnector(fConn)
227+
require.Eventually(t, func() bool {
228+
uut.clientMu.Lock()
229+
defer uut.clientMu.Unlock()
230+
return uut.client != nil
231+
}, testutil.WaitShort, testutil.IntervalFast)
232+
233+
fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), 0)
234+
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
235+
require.False(t, uut.telemetryUnavailable.Load())
236+
237+
fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented)
238+
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
239+
require.True(t, uut.telemetryUnavailable.Load())
240+
}
241+
242+
func TestTailnetAPIConnector_TelemetryNotRecognised(t *testing.T) {
243+
t.Parallel()
244+
ctx := testutil.Context(t, testutil.WaitShort)
245+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
246+
agentID := uuid.UUID{0x55}
247+
fConn := newFakeTailnetConn()
248+
249+
fakeDRPCClient := newFakeDRPCClient()
250+
uut := &tailnetAPIConnector{
251+
ctx: ctx,
252+
logger: logger,
253+
agentID: agentID,
254+
coordinateURL: "",
255+
dialOptions: &websocket.DialOptions{},
256+
conn: nil,
257+
connected: make(chan error, 1),
258+
closed: make(chan struct{}),
259+
customDialFn: func() (proto.DRPCTailnetClient, error) {
260+
return fakeDRPCClient, nil
261+
},
262+
}
263+
uut.runConnector(fConn)
264+
require.Eventually(t, func() bool {
265+
uut.clientMu.Lock()
266+
defer uut.clientMu.Unlock()
267+
return uut.client != nil
268+
}, testutil.WaitShort, testutil.IntervalFast)
269+
270+
fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("Protocol Error")
271+
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
272+
require.False(t, uut.telemetryUnavailable.Load())
273+
274+
fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
275+
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
276+
require.True(t, uut.telemetryUnavailable.Load())
277+
}
278+
142279
type fakeTailnetConn struct{}
143280

144281
func (*fakeTailnetConn) UpdatePeers([]*proto.CoordinateResponse_PeerUpdate) error {
@@ -157,3 +294,120 @@ func (*fakeTailnetConn) SetTunnelDestination(uuid.UUID) {}
157294
func newFakeTailnetConn() *fakeTailnetConn {
158295
return &fakeTailnetConn{}
159296
}
297+
298+
type fakeDRPCClient struct {
299+
telemeteryErorr error
300+
fakeDRPPCMapStream
301+
}
302+
303+
var _ proto.DRPCTailnetClient = &fakeDRPCClient{}
304+
305+
func newFakeDRPCClient() *fakeDRPCClient {
306+
return &fakeDRPCClient{
307+
fakeDRPPCMapStream: fakeDRPPCMapStream{
308+
fakeDRPCStream: fakeDRPCStream{
309+
ch: make(chan struct{}),
310+
},
311+
},
312+
}
313+
}
314+
315+
// Coordinate implements proto.DRPCTailnetClient.
316+
func (f *fakeDRPCClient) Coordinate(_ context.Context) (proto.DRPCTailnet_CoordinateClient, error) {
317+
return &f.fakeDRPCStream, nil
318+
}
319+
320+
// DRPCConn implements proto.DRPCTailnetClient.
321+
func (*fakeDRPCClient) DRPCConn() drpc.Conn {
322+
return &fakeDRPCConn{}
323+
}
324+
325+
// PostTelemetry implements proto.DRPCTailnetClient.
326+
func (f *fakeDRPCClient) PostTelemetry(_ context.Context, in *proto.TelemetryRequest) (*proto.TelemetryResponse, error) {
327+
return nil, f.telemeteryErorr
328+
}
329+
330+
// StreamDERPMaps implements proto.DRPCTailnetClient.
331+
func (f *fakeDRPCClient) StreamDERPMaps(_ context.Context, _ *proto.StreamDERPMapsRequest) (proto.DRPCTailnet_StreamDERPMapsClient, error) {
332+
return &f.fakeDRPPCMapStream, nil
333+
}
334+
335+
type fakeDRPCConn struct{}
336+
337+
var _ drpc.Conn = &fakeDRPCConn{}
338+
339+
// Close implements drpc.Conn.
340+
func (*fakeDRPCConn) Close() error {
341+
return nil
342+
}
343+
344+
// Closed implements drpc.Conn.
345+
func (*fakeDRPCConn) Closed() <-chan struct{} {
346+
return nil
347+
}
348+
349+
// Invoke implements drpc.Conn.
350+
func (*fakeDRPCConn) Invoke(_ context.Context, _ string, _ drpc.Encoding, _ drpc.Message, _ drpc.Message) error {
351+
return nil
352+
}
353+
354+
// NewStream implements drpc.Conn.
355+
func (*fakeDRPCConn) NewStream(_ context.Context, _ string, _ drpc.Encoding) (drpc.Stream, error) {
356+
return nil, nil
357+
}
358+
359+
type fakeDRPCStream struct {
360+
ch chan struct{}
361+
}
362+
363+
var _ proto.DRPCTailnet_CoordinateClient = &fakeDRPCStream{}
364+
365+
// Close implements proto.DRPCTailnet_CoordinateClient.
366+
func (f *fakeDRPCStream) Close() error {
367+
close(f.ch)
368+
return nil
369+
}
370+
371+
// CloseSend implements proto.DRPCTailnet_CoordinateClient.
372+
func (*fakeDRPCStream) CloseSend() error {
373+
return nil
374+
}
375+
376+
// Context implements proto.DRPCTailnet_CoordinateClient.
377+
func (*fakeDRPCStream) Context() context.Context {
378+
return nil
379+
}
380+
381+
// MsgRecv implements proto.DRPCTailnet_CoordinateClient.
382+
func (*fakeDRPCStream) MsgRecv(_ drpc.Message, _ drpc.Encoding) error {
383+
return nil
384+
}
385+
386+
// MsgSend implements proto.DRPCTailnet_CoordinateClient.
387+
func (*fakeDRPCStream) MsgSend(_ drpc.Message, _ drpc.Encoding) error {
388+
return nil
389+
}
390+
391+
// Recv implements proto.DRPCTailnet_CoordinateClient.
392+
func (f *fakeDRPCStream) Recv() (*proto.CoordinateResponse, error) {
393+
<-f.ch
394+
return &proto.CoordinateResponse{}, nil
395+
}
396+
397+
// Send implements proto.DRPCTailnet_CoordinateClient.
398+
func (f *fakeDRPCStream) Send(*proto.CoordinateRequest) error {
399+
<-f.ch
400+
return nil
401+
}
402+
403+
type fakeDRPPCMapStream struct {
404+
fakeDRPCStream
405+
}
406+
407+
var _ proto.DRPCTailnet_StreamDERPMapsClient = &fakeDRPPCMapStream{}
408+
409+
// Recv implements proto.DRPCTailnet_StreamDERPMapsClient.
410+
func (f *fakeDRPPCMapStream) Recv() (*proto.DERPMap, error) {
411+
<-f.fakeDRPCStream.ch
412+
return &proto.DERPMap{}, nil
413+
}

tailnet/conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func NewConn(options *Options) (conn *Conn, err error) {
144144
var err error
145145
telemetryStore, err = newTelemetryStore()
146146
if err != nil {
147-
return nil, xerrors.Errorf("create telemetry log sink: %w", err)
147+
return nil, xerrors.Errorf("create telemetry log store: %w", err)
148148
}
149149
logger = logger.appendLogger(slog.Make(telemetryStore).Leveled(slog.LevelDebug))
150150
}

tailnet/telemetry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ func (m multiLogger) With(fields ...slog.Field) multiLogger {
8686
return multiLogger{loggers: loggers}
8787
}
8888

89-
// A logger sink that extracts (anonymized) IP addresses from logs for building
90-
// network telemetry events
89+
// Responsible for storing and anonymizing networking telemetry state.
90+
// Implements slog.Sink and io.Writer to store logs from `tailscale`.
9191
type TelemetryStore struct {
9292
// Always self-referential
9393
sink slog.Sink
File renamed without changes.

0 commit comments

Comments
 (0)