@@ -18,7 +18,6 @@ import (
18
18
"nhooyr.io/websocket"
19
19
"storj.io/drpc"
20
20
"storj.io/drpc/drpcerr"
21
- "tailscale.com/tailcfg"
22
21
23
22
"cdr.dev/slog"
24
23
"github.com/coder/coder/v2/buildinfo"
@@ -37,7 +36,7 @@ var tailnetConnectorGracefulTimeout = time.Second
37
36
// @typescript-ignore tailnetConn
38
37
type tailnetConn interface {
39
38
tailnet.Coordinatee
40
- SetDERPMap ( derpMap * tailcfg. DERPMap )
39
+ tailnet. DERPMapSetter
41
40
}
42
41
43
42
// tailnetAPIConnector dials the tailnet API (v2+) and then uses the API with a tailnet.Conn to
@@ -65,7 +64,7 @@ type tailnetAPIConnector struct {
65
64
coordinateURL string
66
65
clock quartz.Clock
67
66
dialOptions * websocket.DialOptions
68
- conn tailnetConn
67
+ derpCtrl tailnet. DERPController
69
68
coordCtrl tailnet.CoordinationController
70
69
customDialFn func () (proto.DRPCTailnetClient , error )
71
70
@@ -91,7 +90,6 @@ func newTailnetAPIConnector(ctx context.Context, logger slog.Logger, agentID uui
91
90
coordinateURL : coordinateURL ,
92
91
clock : clock ,
93
92
dialOptions : dialOptions ,
94
- conn : nil ,
95
93
connected : make (chan error , 1 ),
96
94
closed : make (chan struct {}),
97
95
}
@@ -112,7 +110,7 @@ func (tac *tailnetAPIConnector) manageGracefulTimeout() {
112
110
113
111
// Runs a tailnetAPIConnector using the provided connection
114
112
func (tac * tailnetAPIConnector ) runConnector (conn tailnetConn ) {
115
- tac .conn = conn
113
+ tac .derpCtrl = tailnet . NewBasicDERPController ( tac . logger , conn )
116
114
tac .coordCtrl = tailnet .NewSingleDestController (tac .logger , conn , tac .agentID )
117
115
tac .gracefulCtx , tac .cancelGracefulCtx = context .WithCancel (context .Background ())
118
116
go tac .manageGracefulTimeout ()
@@ -294,7 +292,9 @@ func (tac *tailnetAPIConnector) coordinate(client proto.DRPCTailnetClient) {
294
292
}
295
293
296
294
func (tac * tailnetAPIConnector ) derpMap (client proto.DRPCTailnetClient ) error {
297
- s , err := client .StreamDERPMaps (tac .ctx , & proto.StreamDERPMapsRequest {})
295
+ s := & tailnet.DERPFromDRPCWrapper {}
296
+ var err error
297
+ s .Client , err = client .StreamDERPMaps (tac .ctx , & proto.StreamDERPMapsRequest {})
298
298
if err != nil {
299
299
return xerrors .Errorf ("failed to connect to StreamDERPMaps RPC: %w" , err )
300
300
}
@@ -304,21 +304,15 @@ func (tac *tailnetAPIConnector) derpMap(client proto.DRPCTailnetClient) error {
304
304
tac .logger .Debug (tac .ctx , "error closing StreamDERPMaps RPC" , slog .Error (cErr ))
305
305
}
306
306
}()
307
- for {
308
- dmp , err := s .Recv ()
309
- if err != nil {
310
- if xerrors .Is (err , context .Canceled ) || xerrors .Is (err , context .DeadlineExceeded ) {
311
- return nil
312
- }
313
- if ! xerrors .Is (err , io .EOF ) {
314
- tac .logger .Error (tac .ctx , "error receiving DERP Map" , slog .Error (err ))
315
- }
316
- return err
317
- }
318
- tac .logger .Debug (tac .ctx , "got new DERP Map" , slog .F ("derp_map" , dmp ))
319
- dm := tailnet .DERPMapFromProto (dmp )
320
- tac .conn .SetDERPMap (dm )
307
+ cw := tac .derpCtrl .New (s )
308
+ err = <- cw .Wait ()
309
+ if xerrors .Is (err , context .Canceled ) || xerrors .Is (err , context .DeadlineExceeded ) {
310
+ return nil
311
+ }
312
+ if err != nil && ! xerrors .Is (err , io .EOF ) {
313
+ tac .logger .Error (tac .ctx , "error receiving DERP Map" , slog .Error (err ))
321
314
}
315
+ return err
322
316
}
323
317
324
318
func (tac * tailnetAPIConnector ) refreshToken (ctx context.Context , client proto.DRPCTailnetClient ) {
0 commit comments