Skip to content

Commit cfb06e9

Browse files
committed
chore: refactor DERP setting loop
1 parent 886dcbe commit cfb06e9

File tree

5 files changed

+183
-22
lines changed

5 files changed

+183
-22
lines changed

codersdk/workspacesdk/connector.go

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"nhooyr.io/websocket"
1919
"storj.io/drpc"
2020
"storj.io/drpc/drpcerr"
21-
"tailscale.com/tailcfg"
2221

2322
"cdr.dev/slog"
2423
"github.com/coder/coder/v2/buildinfo"
@@ -37,7 +36,7 @@ var tailnetConnectorGracefulTimeout = time.Second
3736
// @typescript-ignore tailnetConn
3837
type tailnetConn interface {
3938
tailnet.Coordinatee
40-
SetDERPMap(derpMap *tailcfg.DERPMap)
39+
tailnet.DERPMapSetter
4140
}
4241

4342
// tailnetAPIConnector dials the tailnet API (v2+) and then uses the API with a tailnet.Conn to
@@ -65,7 +64,7 @@ type tailnetAPIConnector struct {
6564
coordinateURL string
6665
clock quartz.Clock
6766
dialOptions *websocket.DialOptions
68-
conn tailnetConn
67+
derpCtrl tailnet.DERPController
6968
coordCtrl tailnet.CoordinationController
7069
customDialFn func() (proto.DRPCTailnetClient, error)
7170

@@ -91,7 +90,6 @@ func newTailnetAPIConnector(ctx context.Context, logger slog.Logger, agentID uui
9190
coordinateURL: coordinateURL,
9291
clock: clock,
9392
dialOptions: dialOptions,
94-
conn: nil,
9593
connected: make(chan error, 1),
9694
closed: make(chan struct{}),
9795
}
@@ -112,7 +110,7 @@ func (tac *tailnetAPIConnector) manageGracefulTimeout() {
112110

113111
// Runs a tailnetAPIConnector using the provided connection
114112
func (tac *tailnetAPIConnector) runConnector(conn tailnetConn) {
115-
tac.conn = conn
113+
tac.derpCtrl = tailnet.NewBasicDERPController(tac.logger, conn)
116114
tac.coordCtrl = tailnet.NewSingleDestController(tac.logger, conn, tac.agentID)
117115
tac.gracefulCtx, tac.cancelGracefulCtx = context.WithCancel(context.Background())
118116
go tac.manageGracefulTimeout()
@@ -294,7 +292,9 @@ func (tac *tailnetAPIConnector) coordinate(client proto.DRPCTailnetClient) {
294292
}
295293

296294
func (tac *tailnetAPIConnector) derpMap(client proto.DRPCTailnetClient) error {
297-
s, err := client.StreamDERPMaps(tac.ctx, &proto.StreamDERPMapsRequest{})
295+
s := &tailnet.DERPFromDRPCWrapper{}
296+
var err error
297+
s.Client, err = client.StreamDERPMaps(tac.ctx, &proto.StreamDERPMapsRequest{})
298298
if err != nil {
299299
return xerrors.Errorf("failed to connect to StreamDERPMaps RPC: %w", err)
300300
}
@@ -304,21 +304,15 @@ func (tac *tailnetAPIConnector) derpMap(client proto.DRPCTailnetClient) error {
304304
tac.logger.Debug(tac.ctx, "error closing StreamDERPMaps RPC", slog.Error(cErr))
305305
}
306306
}()
307-
for {
308-
dmp, err := s.Recv()
309-
if err != nil {
310-
if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
311-
return nil
312-
}
313-
if !xerrors.Is(err, io.EOF) {
314-
tac.logger.Error(tac.ctx, "error receiving DERP Map", slog.Error(err))
315-
}
316-
return err
317-
}
318-
tac.logger.Debug(tac.ctx, "got new DERP Map", slog.F("derp_map", dmp))
319-
dm := tailnet.DERPMapFromProto(dmp)
320-
tac.conn.SetDERPMap(dm)
307+
cw := tac.derpCtrl.New(s)
308+
err = <-cw.Wait()
309+
if xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
310+
return nil
311+
}
312+
if err != nil && !xerrors.Is(err, io.EOF) {
313+
tac.logger.Error(tac.ctx, "error receiving DERP Map", slog.Error(err))
321314
}
315+
return err
322316
}
323317

324318
func (tac *tailnetAPIConnector) refreshToken(ctx context.Context, client proto.DRPCTailnetClient) {

codersdk/workspacesdk/connector_internal_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,6 @@ func TestTailnetAPIConnector_TelemetryUnimplemented(t *testing.T) {
440440
coordinateURL: "",
441441
clock: quartz.NewReal(),
442442
dialOptions: &websocket.DialOptions{},
443-
conn: nil,
444443
connected: make(chan error, 1),
445444
closed: make(chan struct{}),
446445
customDialFn: func() (proto.DRPCTailnetClient, error) {
@@ -481,7 +480,6 @@ func TestTailnetAPIConnector_TelemetryNotRecognised(t *testing.T) {
481480
coordinateURL: "",
482481
clock: quartz.NewReal(),
483482
dialOptions: &websocket.DialOptions{},
484-
conn: nil,
485483
connected: make(chan error, 1),
486484
closed: make(chan struct{}),
487485
customDialFn: func() (proto.DRPCTailnetClient, error) {

tailnet/controllers.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,3 +359,84 @@ func NewInMemoryCoordinatorClient(
359359
)
360360
return c
361361
}
362+
363+
type DERPMapSetter interface {
364+
SetDERPMap(derpMap *tailcfg.DERPMap)
365+
}
366+
367+
type basicDERPController struct {
368+
logger slog.Logger
369+
setter DERPMapSetter
370+
}
371+
372+
func (b *basicDERPController) New(client DERPClient) CloserWaiter {
373+
l := &derpSetLoop{
374+
logger: b.logger,
375+
setter: b.setter,
376+
client: client,
377+
errChan: make(chan error, 1),
378+
recvLoopDone: make(chan struct{}),
379+
}
380+
go l.recvLoop()
381+
return l
382+
}
383+
384+
func NewBasicDERPController(logger slog.Logger, setter DERPMapSetter) DERPController {
385+
return &basicDERPController{
386+
logger: logger,
387+
setter: setter,
388+
}
389+
}
390+
391+
type derpSetLoop struct {
392+
logger slog.Logger
393+
setter DERPMapSetter
394+
client DERPClient
395+
396+
sync.Mutex
397+
closed bool
398+
errChan chan error
399+
recvLoopDone chan struct{}
400+
}
401+
402+
func (l *derpSetLoop) Close(ctx context.Context) error {
403+
l.Lock()
404+
defer l.Unlock()
405+
if l.closed {
406+
select {
407+
case <-ctx.Done():
408+
return ctx.Err()
409+
case <-l.recvLoopDone:
410+
return nil
411+
}
412+
}
413+
l.closed = true
414+
cErr := l.client.Close()
415+
select {
416+
case <-ctx.Done():
417+
return ctx.Err()
418+
case <-l.recvLoopDone:
419+
return cErr
420+
}
421+
}
422+
423+
func (l *derpSetLoop) Wait() <-chan error {
424+
return l.errChan
425+
}
426+
427+
func (l *derpSetLoop) recvLoop() {
428+
defer close(l.recvLoopDone)
429+
for {
430+
dm, err := l.client.Recv()
431+
if err != nil {
432+
l.logger.Debug(context.Background(), "failed to receive DERP message", slog.Error(err))
433+
select {
434+
case l.errChan <- err:
435+
default:
436+
}
437+
return
438+
}
439+
l.logger.Debug(context.Background(), "got new DERP Map", slog.F("derp_map", dm))
440+
l.setter.SetDERPMap(dm)
441+
}
442+
}

tailnet/controllers_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/google/uuid"
1313
"github.com/stretchr/testify/require"
1414
"go.uber.org/mock/gomock"
15+
"golang.org/x/xerrors"
1516
"tailscale.com/tailcfg"
1617
"tailscale.com/types/key"
1718

@@ -281,3 +282,72 @@ func (f *fakeCoordinatee) SetNodeCallback(callback func(*tailnet.Node)) {
281282
defer f.Unlock()
282283
f.callback = callback
283284
}
285+
286+
func TestNewBasicDERPController_Mainline(t *testing.T) {
287+
t.Parallel()
288+
fs := make(chan *tailcfg.DERPMap)
289+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
290+
uut := tailnet.NewBasicDERPController(logger, fakeSetter(fs))
291+
fc := fakeDERPClient{
292+
ch: make(chan *tailcfg.DERPMap),
293+
}
294+
c := uut.New(fc)
295+
ctx := testutil.Context(t, testutil.WaitShort)
296+
expectDM := &tailcfg.DERPMap{}
297+
testutil.RequireSendCtx(ctx, t, fc.ch, expectDM)
298+
gotDM := testutil.RequireRecvCtx(ctx, t, fs)
299+
require.Equal(t, expectDM, gotDM)
300+
err := c.Close(ctx)
301+
require.NoError(t, err)
302+
err = testutil.RequireRecvCtx(ctx, t, c.Wait())
303+
require.ErrorIs(t, err, io.EOF)
304+
// ensure Close is idempotent
305+
err = c.Close(ctx)
306+
require.NoError(t, err)
307+
}
308+
309+
func TestNewBasicDERPController_RecvErr(t *testing.T) {
310+
t.Parallel()
311+
fs := make(chan *tailcfg.DERPMap)
312+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
313+
uut := tailnet.NewBasicDERPController(logger, fakeSetter(fs))
314+
expectedErr := xerrors.New("a bad thing happened")
315+
fc := fakeDERPClient{
316+
ch: make(chan *tailcfg.DERPMap),
317+
err: expectedErr,
318+
}
319+
c := uut.New(fc)
320+
ctx := testutil.Context(t, testutil.WaitShort)
321+
err := testutil.RequireRecvCtx(ctx, t, c.Wait())
322+
require.ErrorIs(t, err, expectedErr)
323+
// ensure Close is idempotent
324+
err = c.Close(ctx)
325+
require.NoError(t, err)
326+
}
327+
328+
type fakeSetter chan *tailcfg.DERPMap
329+
330+
func (s fakeSetter) SetDERPMap(derpMap *tailcfg.DERPMap) {
331+
s <- derpMap
332+
}
333+
334+
type fakeDERPClient struct {
335+
ch chan *tailcfg.DERPMap
336+
err error
337+
}
338+
339+
func (f fakeDERPClient) Close() error {
340+
close(f.ch)
341+
return nil
342+
}
343+
344+
func (f fakeDERPClient) Recv() (*tailcfg.DERPMap, error) {
345+
if f.err != nil {
346+
return nil, f.err
347+
}
348+
dm, ok := <-f.ch
349+
if ok {
350+
return dm, nil
351+
}
352+
return nil, io.EOF
353+
}

tailnet/convert.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,3 +298,21 @@ func WorkspaceStatusToProto(status codersdk.WorkspaceStatus) proto.Workspace_Sta
298298
return proto.Workspace_UNKNOWN
299299
}
300300
}
301+
302+
type DERPFromDRPCWrapper struct {
303+
Client proto.DRPCTailnet_StreamDERPMapsClient
304+
}
305+
306+
func (w *DERPFromDRPCWrapper) Close() error {
307+
return w.Client.Close()
308+
}
309+
310+
func (w *DERPFromDRPCWrapper) Recv() (*tailcfg.DERPMap, error) {
311+
p, err := w.Client.Recv()
312+
if err != nil {
313+
return nil, err
314+
}
315+
return DERPMapFromProto(p), nil
316+
}
317+
318+
var _ DERPClient = &DERPFromDRPCWrapper{}

0 commit comments

Comments
 (0)