Skip to content

Commit 5a7c1aa

Browse files
committed
feat: agent uses Tailnet v2 API for DERPMap updates
1 parent c91b1c0 commit 5a7c1aa

File tree

8 files changed

+106
-196
lines changed

8 files changed

+106
-196
lines changed

agent/agent.go

+34-37
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ type Options struct {
8989
type Client interface {
9090
Manifest(ctx context.Context) (agentsdk.Manifest, error)
9191
Listen(ctx context.Context) (drpc.Conn, error)
92-
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error)
9392
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
9493
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
9594
PostAppHealth(ctx context.Context, req agentsdk.PostAppHealthsRequest) error
@@ -822,10 +821,22 @@ func (a *agent) run(ctx context.Context) error {
822821
network.SetBlockEndpoints(manifest.DisableDirectConnections)
823822
}
824823

824+
// Listen returns the dRPC connection we use for both Coordinator and DERPMap updates
825+
conn, err := a.client.Listen(ctx)
826+
if err != nil {
827+
return err
828+
}
829+
defer func() {
830+
cErr := conn.Close()
831+
if cErr != nil {
832+
a.logger.Debug(ctx, "error closing drpc connection", slog.Error(err))
833+
}
834+
}()
835+
825836
eg, egCtx := errgroup.WithContext(ctx)
826837
eg.Go(func() error {
827838
a.logger.Debug(egCtx, "running tailnet connection coordinator")
828-
err := a.runCoordinator(egCtx, network)
839+
err := a.runCoordinator(egCtx, conn, network)
829840
if err != nil {
830841
return xerrors.Errorf("run coordinator: %w", err)
831842
}
@@ -834,7 +845,7 @@ func (a *agent) run(ctx context.Context) error {
834845

835846
eg.Go(func() error {
836847
a.logger.Debug(egCtx, "running derp map subscriber")
837-
err := a.runDERPMapSubscriber(egCtx, network)
848+
err := a.runDERPMapSubscriber(egCtx, conn, network)
838849
if err != nil {
839850
return xerrors.Errorf("run derp map subscriber: %w", err)
840851
}
@@ -1056,21 +1067,8 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t
10561067

10571068
// runCoordinator runs a coordinator and returns whether a reconnect
10581069
// should occur.
1059-
func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error {
1060-
ctx, cancel := context.WithCancel(ctx)
1061-
defer cancel()
1062-
1063-
conn, err := a.client.Listen(ctx)
1064-
if err != nil {
1065-
return err
1066-
}
1067-
defer func() {
1068-
cErr := conn.Close()
1069-
if cErr != nil {
1070-
a.logger.Debug(ctx, "error closing drpc connection", slog.Error(err))
1071-
}
1072-
}()
1073-
1070+
func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tailnet.Conn) error {
1071+
defer a.logger.Debug(ctx, "disconnected from coordination RPC")
10741072
tClient := tailnetproto.NewDRPCTailnetClient(conn)
10751073
coordinate, err := tClient.Coordinate(ctx)
10761074
if err != nil {
@@ -1082,7 +1080,7 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10821080
a.logger.Debug(ctx, "error closing Coordinate client", slog.Error(err))
10831081
}
10841082
}()
1085-
a.logger.Info(ctx, "connected to coordination endpoint")
1083+
a.logger.Info(ctx, "connected to coordination RPC")
10861084
coordination := tailnet.NewRemoteCoordination(a.logger, coordinate, network, uuid.Nil)
10871085
select {
10881086
case <-ctx.Done():
@@ -1093,30 +1091,29 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10931091
}
10941092

10951093
// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
1096-
func (a *agent) runDERPMapSubscriber(ctx context.Context, network *tailnet.Conn) error {
1094+
func (a *agent) runDERPMapSubscriber(ctx context.Context, conn drpc.Conn, network *tailnet.Conn) error {
1095+
defer a.logger.Debug(ctx, "disconnected from derp map RPC")
10971096
ctx, cancel := context.WithCancel(ctx)
10981097
defer cancel()
1099-
1100-
updates, closer, err := a.client.DERPMapUpdates(ctx)
1098+
tClient := tailnetproto.NewDRPCTailnetClient(conn)
1099+
stream, err := tClient.StreamDERPMaps(ctx, &tailnetproto.StreamDERPMapsRequest{})
11011100
if err != nil {
1102-
return err
1101+
return xerrors.Errorf("stream DERP Maps: %w", err)
11031102
}
1104-
defer closer.Close()
1105-
1106-
a.logger.Info(ctx, "connected to derp map endpoint")
1103+
defer func() {
1104+
cErr := stream.Close()
1105+
if cErr != nil {
1106+
a.logger.Debug(ctx, "error closing DERPMap stream", slog.Error(err))
1107+
}
1108+
}()
1109+
a.logger.Info(ctx, "connected to derp map RPC")
11071110
for {
1108-
select {
1109-
case <-ctx.Done():
1110-
return ctx.Err()
1111-
case update := <-updates:
1112-
if update.Err != nil {
1113-
return update.Err
1114-
}
1115-
if update.DERPMap != nil && !tailnet.CompareDERPMaps(network.DERPMap(), update.DERPMap) {
1116-
a.logger.Info(ctx, "updating derp map due to detected changes")
1117-
network.SetDERPMap(update.DERPMap)
1118-
}
1111+
dmp, err := stream.Recv()
1112+
if err != nil {
1113+
return xerrors.Errorf("recv DERPMap error: %w", err)
11191114
}
1115+
dm := tailnet.DERPMapFromProto(dmp)
1116+
network.SetDERPMap(dm)
11201117
}
11211118
}
11221119

agent/agent_test.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -1349,6 +1349,7 @@ func TestAgent_Lifecycle(t *testing.T) {
13491349
make(chan *agentsdk.Stats, 50),
13501350
tailnet.NewCoordinator(logger),
13511351
)
1352+
defer client.Close()
13521353

13531354
fs := afero.NewMemMapFs()
13541355
agent := agent.New(agent.Options{
@@ -1683,13 +1684,18 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16831684
statsCh,
16841685
coordinator,
16851686
)
1687+
t.Cleanup(func() {
1688+
t.Log("closing client")
1689+
client.Close()
1690+
})
16861691
uut := agent.New(agent.Options{
16871692
Client: client,
16881693
Filesystem: fs,
16891694
Logger: logger.Named("agent"),
16901695
ReconnectingPTYTimeout: time.Minute,
16911696
})
16921697
t.Cleanup(func() {
1698+
t.Log("closing agent")
16931699
_ = uut.Close()
16941700
})
16951701

@@ -1718,6 +1724,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17181724
if err != nil {
17191725
t.Logf("error closing in-memory coordination: %s", err.Error())
17201726
}
1727+
t.Logf("closed coordination %s", name)
17211728
})
17221729
// Force DERP.
17231730
conn.SetBlockEndpoints(true)
@@ -1753,11 +1760,9 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17531760
}
17541761

17551762
// Push a new DERP map to the agent.
1756-
err := client.PushDERPMapUpdate(agentsdk.DERPMapUpdate{
1757-
DERPMap: newDerpMap,
1758-
})
1763+
err := client.PushDERPMapUpdate(newDerpMap)
17591764
require.NoError(t, err)
1760-
t.Logf("client Pushed DERPMap update")
1765+
t.Logf("pushed DERPMap update to agent")
17611766

17621767
require.Eventually(t, func() bool {
17631768
conn := uut.TailnetConn()
@@ -1826,6 +1831,7 @@ func TestAgent_Reconnect(t *testing.T) {
18261831
statsCh,
18271832
coordinator,
18281833
)
1834+
defer client.Close()
18291835
initialized := atomic.Int32{}
18301836
closer := agent.New(agent.Options{
18311837
ExchangeToken: func(ctx context.Context) (string, error) {
@@ -1862,6 +1868,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
18621868
make(chan *agentsdk.Stats, 50),
18631869
coordinator,
18641870
)
1871+
defer client.Close()
18651872
filesystem := afero.NewMemMapFs()
18661873
closer := agent.New(agent.Options{
18671874
ExchangeToken: func(ctx context.Context) (string, error) {
@@ -2039,6 +2046,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
20392046
statsCh := make(chan *agentsdk.Stats, 50)
20402047
fs := afero.NewMemMapFs()
20412048
c := agenttest.NewClient(t, logger.Named("agent"), metadata.AgentID, metadata, statsCh, coordinator)
2049+
t.Cleanup(c.Close)
20422050

20432051
options := agent.Options{
20442052
Client: c,

agent/agenttest/client.go

+15-21
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ func NewClient(t testing.TB,
3939
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
4040
coordPtr.Store(&coordinator)
4141
mux := drpcmux.New()
42+
derpMapUpdates := make(chan *tailcfg.DERPMap)
4243
drpcService := &tailnet.DRPCService{
43-
CoordPtr: &coordPtr,
44-
Logger: logger,
45-
// TODO: handle DERPMap too!
46-
DerpMapUpdateFrequency: time.Hour,
47-
DerpMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
44+
CoordPtr: &coordPtr,
45+
Logger: logger,
46+
DerpMapUpdateFrequency: time.Microsecond,
47+
DerpMapFn: func() *tailcfg.DERPMap { return <-derpMapUpdates },
4848
}
4949
err := proto.DRPCRegisterTailnet(mux, drpcService)
5050
require.NoError(t, err)
@@ -64,7 +64,7 @@ func NewClient(t testing.TB,
6464
statsChan: statsChan,
6565
coordinator: coordinator,
6666
server: server,
67-
derpMapUpdates: make(chan agentsdk.DERPMapUpdate),
67+
derpMapUpdates: derpMapUpdates,
6868
}
6969
}
7070

@@ -85,23 +85,26 @@ type Client struct {
8585
lifecycleStates []codersdk.WorkspaceAgentLifecycle
8686
startup agentsdk.PostStartupRequest
8787
logs []agentsdk.Log
88-
derpMapUpdates chan agentsdk.DERPMapUpdate
88+
derpMapUpdates chan *tailcfg.DERPMap
89+
derpMapOnce sync.Once
90+
}
91+
92+
func (c *Client) Close() {
93+
c.derpMapOnce.Do(func() { close(c.derpMapUpdates) })
8994
}
9095

9196
func (c *Client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
9297
return c.manifest, nil
9398
}
9499

95-
func (c *Client) Listen(_ context.Context) (drpc.Conn, error) {
100+
func (c *Client) Listen(ctx context.Context) (drpc.Conn, error) {
96101
conn, lis := drpcsdk.MemTransportPipe()
97-
closed := make(chan struct{})
98102
c.LastWorkspaceAgent = func() {
99103
_ = conn.Close()
100104
_ = lis.Close()
101-
<-closed
102105
}
103106
c.t.Cleanup(c.LastWorkspaceAgent)
104-
serveCtx, cancel := context.WithCancel(context.Background())
107+
serveCtx, cancel := context.WithCancel(ctx)
105108
c.t.Cleanup(cancel)
106109
auth := tailnet.AgentTunnelAuth{}
107110
streamID := tailnet.StreamID{
@@ -112,7 +115,6 @@ func (c *Client) Listen(_ context.Context) (drpc.Conn, error) {
112115
serveCtx = tailnet.WithStreamID(serveCtx, streamID)
113116
go func() {
114117
_ = c.server.Serve(serveCtx, lis)
115-
close(closed)
116118
}()
117119
return conn, nil
118120
}
@@ -235,7 +237,7 @@ func (c *Client) GetServiceBanner(ctx context.Context) (codersdk.ServiceBannerCo
235237
return codersdk.ServiceBannerConfig{}, nil
236238
}
237239

238-
func (c *Client) PushDERPMapUpdate(update agentsdk.DERPMapUpdate) error {
240+
func (c *Client) PushDERPMapUpdate(update *tailcfg.DERPMap) error {
239241
timer := time.NewTimer(testutil.WaitShort)
240242
defer timer.Stop()
241243
select {
@@ -247,14 +249,6 @@ func (c *Client) PushDERPMapUpdate(update agentsdk.DERPMapUpdate) error {
247249
return nil
248250
}
249251

250-
func (c *Client) DERPMapUpdates(_ context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error) {
251-
closed := make(chan struct{})
252-
return c.derpMapUpdates, closeFunc(func() error {
253-
close(closed)
254-
return nil
255-
}), nil
256-
}
257-
258252
type closeFunc func() error
259253

260254
func (c closeFunc) Close() error {

coderd/tailnet_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ func setupAgent(t *testing.T, agentAddresses []netip.Prefix) (uuid.UUID, agent.A
178178
})
179179

180180
c := agenttest.NewClient(t, logger, manifest.AgentID, manifest, make(chan *agentsdk.Stats, 50), coord)
181+
t.Cleanup(c.Close)
181182

182183
options := agent.Options{
183184
Client: c,

coderd/wsconncache/wsconncache_test.go

+23-33
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,16 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
171171
_ = coordinator.Close()
172172
})
173173
manifest.AgentID = uuid.New()
174+
aC := &client{
175+
t: t,
176+
agentID: manifest.AgentID,
177+
manifest: manifest,
178+
coordinator: coordinator,
179+
derpMapUpdates: make(chan *tailcfg.DERPMap),
180+
}
181+
t.Cleanup(aC.close)
174182
closer := agent.New(agent.Options{
175-
Client: &client{
176-
t: t,
177-
agentID: manifest.AgentID,
178-
manifest: manifest,
179-
coordinator: coordinator,
180-
},
183+
Client: aC,
181184
Logger: logger.Named("agent"),
182185
ReconnectingPTYTimeout: ptyTimeout,
183186
Addresses: []netip.Prefix{netip.PrefixFrom(codersdk.WorkspaceAgentIP, 128)},
@@ -230,32 +233,20 @@ func setupAgent(t *testing.T, manifest agentsdk.Manifest, ptyTimeout time.Durati
230233
}
231234

232235
type client struct {
233-
t *testing.T
234-
agentID uuid.UUID
235-
manifest agentsdk.Manifest
236-
coordinator tailnet.Coordinator
237-
}
238-
239-
func (c *client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
240-
return c.manifest, nil
236+
t *testing.T
237+
agentID uuid.UUID
238+
manifest agentsdk.Manifest
239+
coordinator tailnet.Coordinator
240+
closeOnce sync.Once
241+
derpMapUpdates chan *tailcfg.DERPMap
241242
}
242243

243-
type closer struct {
244-
closeFunc func() error
244+
func (c *client) close() {
245+
c.closeOnce.Do(func() { close(c.derpMapUpdates) })
245246
}
246247

247-
func (c *closer) Close() error {
248-
return c.closeFunc()
249-
}
250-
251-
func (*client) DERPMapUpdates(_ context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error) {
252-
closed := make(chan struct{})
253-
return make(<-chan agentsdk.DERPMapUpdate), &closer{
254-
closeFunc: func() error {
255-
close(closed)
256-
return nil
257-
},
258-
}, nil
248+
func (c *client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
249+
return c.manifest, nil
259250
}
260251

261252
func (c *client) Listen(_ context.Context) (drpc.Conn, error) {
@@ -271,11 +262,10 @@ func (c *client) Listen(_ context.Context) (drpc.Conn, error) {
271262
coordPtr.Store(&c.coordinator)
272263
mux := drpcmux.New()
273264
drpcService := &tailnet.DRPCService{
274-
CoordPtr: &coordPtr,
275-
Logger: logger,
276-
// TODO: handle DERPMap too!
277-
DerpMapUpdateFrequency: time.Hour,
278-
DerpMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
265+
CoordPtr: &coordPtr,
266+
Logger: logger,
267+
DerpMapUpdateFrequency: time.Microsecond,
268+
DerpMapFn: func() *tailcfg.DERPMap { return <-c.derpMapUpdates },
279269
}
280270
err := proto.DRPCRegisterTailnet(mux, drpcService)
281271
if err != nil {

0 commit comments

Comments
 (0)