Skip to content

Commit 112075d

Browse files
committed
chore: refactor sending telemetry
1 parent cfb06e9 commit 112075d

File tree

4 files changed

+254
-186
lines changed

4 files changed

+254
-186
lines changed

codersdk/workspacesdk/connector.go

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,12 @@ import (
88
"net/http"
99
"net/url"
1010
"slices"
11-
"strings"
1211
"sync"
13-
"sync/atomic"
1412
"time"
1513

1614
"github.com/google/uuid"
1715
"golang.org/x/xerrors"
1816
"nhooyr.io/websocket"
19-
"storj.io/drpc"
20-
"storj.io/drpc/drpcerr"
2117

2218
"cdr.dev/slog"
2319
"github.com/coder/coder/v2/buildinfo"
@@ -66,19 +62,12 @@ type tailnetAPIConnector struct {
6662
dialOptions *websocket.DialOptions
6763
derpCtrl tailnet.DERPController
6864
coordCtrl tailnet.CoordinationController
69-
customDialFn func() (proto.DRPCTailnetClient, error)
70-
71-
clientMu sync.RWMutex
72-
client proto.DRPCTailnetClient
65+
telCtrl *tailnet.BasicTelemetryController
7366

7467
connected chan error
7568
resumeToken *proto.RefreshResumeTokenResponse
7669
isFirst bool
7770
closed chan struct{}
78-
79-
// Only set to true if we get a response from the server that it doesn't support
80-
// network telemetry.
81-
telemetryUnavailable atomic.Bool
8271
}
8372

8473
// Create a new tailnetAPIConnector without running it
@@ -92,6 +81,7 @@ func newTailnetAPIConnector(ctx context.Context, logger slog.Logger, agentID uui
9281
dialOptions: dialOptions,
9382
connected: make(chan error, 1),
9483
closed: make(chan struct{}),
84+
telCtrl: tailnet.NewBasicTelemetryController(logger),
9585
}
9686
}
9787

@@ -124,9 +114,6 @@ func (tac *tailnetAPIConnector) runConnector(conn tailnetConn) {
124114
if err != nil {
125115
continue
126116
}
127-
tac.clientMu.Lock()
128-
tac.client = tailnetClient
129-
tac.clientMu.Unlock()
130117
tac.logger.Debug(tac.ctx, "obtained tailnet API v2+ client")
131118
tac.runConnectorOnce(tailnetClient)
132119
tac.logger.Debug(tac.ctx, "tailnet API v2+ connection lost")
@@ -141,9 +128,6 @@ var permanentErrorStatuses = []int{
141128
}
142129

143130
func (tac *tailnetAPIConnector) dial() (proto.DRPCTailnetClient, error) {
144-
if tac.customDialFn != nil {
145-
return tac.customDialFn()
146-
}
147131
tac.logger.Debug(tac.ctx, "dialing Coder tailnet v2+ API")
148132

149133
u, err := url.Parse(tac.coordinateURL)
@@ -228,6 +212,8 @@ func (tac *tailnetAPIConnector) runConnectorOnce(client proto.DRPCTailnetClient)
228212
}
229213
}()
230214

215+
tac.telCtrl.New(client) // synchronous, doesn't need a goroutine
216+
231217
refreshTokenCtx, refreshTokenCancel := context.WithCancel(tac.ctx)
232218
wg := sync.WaitGroup{}
233219
wg.Add(3)
@@ -245,10 +231,7 @@ func (tac *tailnetAPIConnector) runConnectorOnce(client proto.DRPCTailnetClient)
245231
// we do NOT want to gracefully disconnect on the coordinate() routine. So, we'll just
246232
// close the underlying connection. This will trigger a retry of the control plane in
247233
// run().
248-
tac.clientMu.Lock()
249234
client.DRPCConn().Close()
250-
tac.client = nil
251-
tac.clientMu.Unlock()
252235
// Note that derpMap() logs its own errors, we don't bother here.
253236
}
254237
}()
@@ -351,20 +334,5 @@ func (tac *tailnetAPIConnector) refreshToken(ctx context.Context, client proto.D
351334
}
352335

353336
func (tac *tailnetAPIConnector) SendTelemetryEvent(event *proto.TelemetryEvent) {
354-
tac.clientMu.RLock()
355-
// We hold the lock for the entire telemetry request, but this would only block
356-
// a coordinate retry, and closing the connection.
357-
defer tac.clientMu.RUnlock()
358-
if tac.client == nil || tac.telemetryUnavailable.Load() {
359-
return
360-
}
361-
ctx, cancel := context.WithTimeout(tac.ctx, 5*time.Second)
362-
defer cancel()
363-
_, err := tac.client.PostTelemetry(ctx, &proto.TelemetryRequest{
364-
Events: []*proto.TelemetryEvent{event},
365-
})
366-
if drpcerr.Code(err) == drpcerr.Unimplemented || drpc.ProtocolError.Has(err) && strings.Contains(err.Error(), "unknown rpc: ") {
367-
tac.logger.Debug(tac.ctx, "attempted to send telemetry to a server that doesn't support it", slog.Error(err))
368-
tac.telemetryUnavailable.Store(true)
369-
}
337+
tac.telCtrl.SendTelemetryEvent(event)
370338
}

codersdk/workspacesdk/connector_internal_test.go

Lines changed: 10 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,8 @@ import (
1313
"github.com/hashicorp/yamux"
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
16-
"golang.org/x/xerrors"
17-
"google.golang.org/protobuf/types/known/durationpb"
18-
"google.golang.org/protobuf/types/known/timestamppb"
1916
"nhooyr.io/websocket"
2017
"storj.io/drpc"
21-
"storj.io/drpc/drpcerr"
2218
"tailscale.com/tailcfg"
2319

2420
"cdr.dev/slog"
@@ -385,7 +381,12 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
385381
DERPMapUpdateFrequency: time.Millisecond,
386382
DERPMapFn: func() *tailcfg.DERPMap { return <-derpMapCh },
387383
NetworkTelemetryHandler: func(batch []*proto.TelemetryEvent) {
388-
testutil.RequireSendCtx(ctx, t, eventCh, batch)
384+
select {
385+
case <-ctx.Done():
386+
t.Error("timeout sending telemetry event")
387+
case eventCh <- batch:
388+
t.Log("sent telemetry batch")
389+
}
389390
},
390391
ResumeTokenProvider: tailnet.NewInsecureTestResumeTokenProvider(),
391392
})
@@ -409,11 +410,10 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
409410

410411
uut := newTailnetAPIConnector(ctx, logger, agentID, svr.URL, quartz.NewReal(), &websocket.DialOptions{})
411412
uut.runConnector(fConn)
412-
require.Eventually(t, func() bool {
413-
uut.clientMu.Lock()
414-
defer uut.clientMu.Unlock()
415-
return uut.client != nil
416-
}, testutil.WaitShort, testutil.IntervalFast)
413+
// Coordinate calls happen _after_ telemetry is connected up, so we use this
414+
// to ensure telemetry is connected before sending our event
415+
cc := testutil.RequireRecvCtx(ctx, t, fCoord.CoordinateCalls)
416+
defer close(cc.Resps)
417417

418418
uut.SendTelemetryEvent(&proto.TelemetryEvent{
419419
Id: []byte("test event"),
@@ -425,86 +425,6 @@ func TestTailnetAPIConnector_TelemetrySuccess(t *testing.T) {
425425
require.Equal(t, []byte("test event"), testEvents[0].Id)
426426
}
427427

428-
func TestTailnetAPIConnector_TelemetryUnimplemented(t *testing.T) {
429-
t.Parallel()
430-
ctx := testutil.Context(t, testutil.WaitShort)
431-
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
432-
agentID := uuid.UUID{0x55}
433-
fConn := newFakeTailnetConn()
434-
435-
fakeDRPCClient := newFakeDRPCClient()
436-
uut := &tailnetAPIConnector{
437-
ctx: ctx,
438-
logger: logger,
439-
agentID: agentID,
440-
coordinateURL: "",
441-
clock: quartz.NewReal(),
442-
dialOptions: &websocket.DialOptions{},
443-
connected: make(chan error, 1),
444-
closed: make(chan struct{}),
445-
customDialFn: func() (proto.DRPCTailnetClient, error) {
446-
return fakeDRPCClient, nil
447-
},
448-
}
449-
uut.runConnector(fConn)
450-
require.Eventually(t, func() bool {
451-
uut.clientMu.Lock()
452-
defer uut.clientMu.Unlock()
453-
return uut.client != nil
454-
}, testutil.WaitShort, testutil.IntervalFast)
455-
456-
fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), 0)
457-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
458-
require.False(t, uut.telemetryUnavailable.Load())
459-
require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
460-
461-
fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented)
462-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
463-
require.True(t, uut.telemetryUnavailable.Load())
464-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
465-
require.Equal(t, int64(2), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
466-
}
467-
468-
func TestTailnetAPIConnector_TelemetryNotRecognised(t *testing.T) {
469-
t.Parallel()
470-
ctx := testutil.Context(t, testutil.WaitShort)
471-
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
472-
agentID := uuid.UUID{0x55}
473-
fConn := newFakeTailnetConn()
474-
475-
fakeDRPCClient := newFakeDRPCClient()
476-
uut := &tailnetAPIConnector{
477-
ctx: ctx,
478-
logger: logger,
479-
agentID: agentID,
480-
coordinateURL: "",
481-
clock: quartz.NewReal(),
482-
dialOptions: &websocket.DialOptions{},
483-
connected: make(chan error, 1),
484-
closed: make(chan struct{}),
485-
customDialFn: func() (proto.DRPCTailnetClient, error) {
486-
return fakeDRPCClient, nil
487-
},
488-
}
489-
uut.runConnector(fConn)
490-
require.Eventually(t, func() bool {
491-
uut.clientMu.Lock()
492-
defer uut.clientMu.Unlock()
493-
return uut.client != nil
494-
}, testutil.WaitShort, testutil.IntervalFast)
495-
496-
fakeDRPCClient.telemetryError = drpc.ProtocolError.New("Protocol Error")
497-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
498-
require.False(t, uut.telemetryUnavailable.Load())
499-
require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
500-
501-
fakeDRPCClient.telemetryError = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
502-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
503-
require.True(t, uut.telemetryUnavailable.Load())
504-
uut.SendTelemetryEvent(&proto.TelemetryEvent{})
505-
require.Equal(t, int64(2), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
506-
}
507-
508428
type fakeTailnetConn struct{}
509429

510430
func (*fakeTailnetConn) UpdatePeers([]*proto.CoordinateResponse_PeerUpdate) error {
@@ -524,65 +444,6 @@ func newFakeTailnetConn() *fakeTailnetConn {
524444
return &fakeTailnetConn{}
525445
}
526446

527-
type fakeDRPCClient struct {
528-
postTelemetryCalls int64
529-
refreshTokenFn func(context.Context, *proto.RefreshResumeTokenRequest) (*proto.RefreshResumeTokenResponse, error)
530-
telemetryError error
531-
fakeDRPPCMapStream
532-
}
533-
534-
var _ proto.DRPCTailnetClient = &fakeDRPCClient{}
535-
536-
func newFakeDRPCClient() *fakeDRPCClient {
537-
return &fakeDRPCClient{
538-
postTelemetryCalls: 0,
539-
fakeDRPPCMapStream: fakeDRPPCMapStream{
540-
fakeDRPCStream: fakeDRPCStream{
541-
ch: make(chan struct{}),
542-
},
543-
},
544-
}
545-
}
546-
547-
// Coordinate implements proto.DRPCTailnetClient.
548-
func (f *fakeDRPCClient) Coordinate(_ context.Context) (proto.DRPCTailnet_CoordinateClient, error) {
549-
return &f.fakeDRPCStream, nil
550-
}
551-
552-
// DRPCConn implements proto.DRPCTailnetClient.
553-
func (*fakeDRPCClient) DRPCConn() drpc.Conn {
554-
return &fakeDRPCConn{}
555-
}
556-
557-
// PostTelemetry implements proto.DRPCTailnetClient.
558-
func (f *fakeDRPCClient) PostTelemetry(_ context.Context, _ *proto.TelemetryRequest) (*proto.TelemetryResponse, error) {
559-
atomic.AddInt64(&f.postTelemetryCalls, 1)
560-
return nil, f.telemetryError
561-
}
562-
563-
// StreamDERPMaps implements proto.DRPCTailnetClient.
564-
func (f *fakeDRPCClient) StreamDERPMaps(_ context.Context, _ *proto.StreamDERPMapsRequest) (proto.DRPCTailnet_StreamDERPMapsClient, error) {
565-
return &f.fakeDRPPCMapStream, nil
566-
}
567-
568-
// RefreshResumeToken implements proto.DRPCTailnetClient.
569-
func (f *fakeDRPCClient) RefreshResumeToken(_ context.Context, _ *proto.RefreshResumeTokenRequest) (*proto.RefreshResumeTokenResponse, error) {
570-
if f.refreshTokenFn != nil {
571-
return f.refreshTokenFn(context.Background(), nil)
572-
}
573-
574-
return &proto.RefreshResumeTokenResponse{
575-
Token: "test",
576-
RefreshIn: durationpb.New(30 * time.Minute),
577-
ExpiresAt: timestamppb.New(time.Now().Add(time.Hour)),
578-
}, nil
579-
}
580-
581-
// WorkspaceUpdates implements proto.DRPCTailnetClient.
582-
func (*fakeDRPCClient) WorkspaceUpdates(context.Context, *proto.WorkspaceUpdatesRequest) (proto.DRPCTailnet_WorkspaceUpdatesClient, error) {
583-
panic("unimplemented")
584-
}
585-
586447
type fakeDRPCConn struct{}
587448

588449
var _ drpc.Conn = &fakeDRPCConn{}

0 commit comments

Comments
 (0)