Skip to content

Commit 6a08a59

Browse files
committed
change derp map updates to be separate websocket
1 parent b405113 commit 6a08a59

File tree

7 files changed

+375
-59
lines changed

7 files changed

+375
-59
lines changed

agent/agent.go

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/spf13/afero"
2929
"go.uber.org/atomic"
3030
"golang.org/x/exp/slices"
31+
"golang.org/x/sync/errgroup"
3132
"golang.org/x/xerrors"
3233
"tailscale.com/net/speedtest"
3334
"tailscale.com/tailcfg"
@@ -71,6 +72,7 @@ type Options struct {
7172
type Client interface {
7273
Manifest(ctx context.Context) (agentsdk.Manifest, error)
7374
Listen(ctx context.Context) (net.Conn, error)
75+
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error)
7476
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
7577
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
7678
PostAppHealth(ctx context.Context, req agentsdk.PostAppHealthsRequest) error
@@ -615,12 +617,26 @@ func (a *agent) run(ctx context.Context) error {
615617
network.SetBlockEndpoints(manifest.DisableDirectConnections)
616618
}
617619

618-
a.logger.Debug(ctx, "running tailnet connection coordinator")
619-
err = a.runCoordinator(ctx, network)
620-
if err != nil {
621-
return xerrors.Errorf("run coordinator: %w", err)
622-
}
623-
return nil
620+
// Run the tailnet coordinator and the DERP map subscriber concurrently.
// errgroup cancels egCtx when either function returns a non-nil error, and
// Wait returns the first such error after both goroutines have finished.
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
	a.logger.Debug(egCtx, "running tailnet connection coordinator")
	// Keep the error local to this goroutine: both closures run at the same
	// time, so assigning to a shared outer err would be a data race.
	if err := a.runCoordinator(egCtx, network); err != nil {
		return xerrors.Errorf("run coordinator: %w", err)
	}
	return nil
})

eg.Go(func() error {
	a.logger.Debug(egCtx, "running derp map subscriber")
	if err := a.runDERPMapSubscriber(egCtx, network); err != nil {
		return xerrors.Errorf("run derp map subscriber: %w", err)
	}
	return nil
})

return eg.Wait()
624640
}
625641

626642
func (a *agent) trackConnGoroutine(fn func()) error {
@@ -829,6 +845,34 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
829845
}
830846
}
831847

848+
// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
849+
func (a *agent) runDERPMapSubscriber(ctx context.Context, network *tailnet.Conn) error {
850+
ctx, cancel := context.WithCancel(ctx)
851+
defer cancel()
852+
853+
updates, closer, err := a.client.DERPMapUpdates(ctx)
854+
if err != nil {
855+
return err
856+
}
857+
defer closer.Close()
858+
859+
a.logger.Info(ctx, "connected to derp map endpoint")
860+
for {
861+
select {
862+
case <-ctx.Done():
863+
return ctx.Err()
864+
case update := <-updates:
865+
if update.Err != nil {
866+
return update.Err
867+
}
868+
if update.DERPMap != nil && !tailnet.CompareDERPMaps(a.network.DERPMap(), update.DERPMap) {
869+
a.logger.Info(ctx, "updating derp map due to detected changes")
870+
network.SetDERPMap(update.DERPMap)
871+
}
872+
}
873+
}
874+
}
875+
832876
func (a *agent) runStartupScript(ctx context.Context, script string) error {
833877
return a.runScript(ctx, "startup", script)
834878
}

agent/agent_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1912,12 +1912,37 @@ type client struct {
19121912
lifecycleStates []codersdk.WorkspaceAgentLifecycle
19131913
startup agentsdk.PostStartupRequest
19141914
logs []agentsdk.StartupLog
1915+
1916+
derpMapUpdates chan agentsdk.DERPMapUpdate
19151917
}
19161918

19171919
func (c *client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
19181920
return c.manifest, nil
19191921
}
19201922

1923+
// closer wraps a plain function so that it can be handed out as a value with
// a Close method.
type closer struct {
	// closeFunc is run on every call to Close.
	closeFunc func() error
}

// Close invokes the wrapped closeFunc and passes its result through.
func (cl *closer) Close() error {
	fn := cl.closeFunc
	return fn()
}
1930+
1931+
func (c *client) DERPMapUpdates(_ context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error) {
1932+
updates := c.derpMapUpdates
1933+
if updates == nil {
1934+
updates = make(chan agentsdk.DERPMapUpdate)
1935+
}
1936+
1937+
closed := make(chan struct{})
1938+
return updates, &closer{
1939+
closeFunc: func() error {
1940+
close(closed)
1941+
return nil
1942+
},
1943+
}, nil
1944+
}
1945+
19211946
func (c *client) Listen(_ context.Context) (net.Conn, error) {
19221947
clientConn, serverConn := net.Pipe()
19231948
closed := make(chan struct{})

coderd/coderd.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,10 @@ func New(options *Options) *API {
486486
r.Use(apiKeyMiddleware)
487487
r.Get("/regions", api.regions)
488488
})
489+
r.Route("/derp-map", func(r chi.Router) {
490+
r.Use(apiKeyMiddleware)
491+
r.Get("/", api.derpMapUpdates)
492+
})
489493
r.Route("/deployment", func(r chi.Router) {
490494
r.Use(apiKeyMiddleware)
491495
r.Get("/config", api.deploymentValues)

coderd/workspaceagents.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,6 +731,8 @@ func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Req
731731

732732
func (api *API) dialWorkspaceAgentTailnet(agentID uuid.UUID) (*codersdk.WorkspaceAgentConn, error) {
733733
clientConn, serverConn := net.Pipe()
734+
735+
derpMap := api.DERPMap()
734736
conn, err := tailnet.NewConn(&tailnet.Options{
735737
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
736738
DERPMap: api.DERPMap(),
@@ -761,6 +763,28 @@ func (api *API) dialWorkspaceAgentTailnet(agentID uuid.UUID) (*codersdk.Workspac
761763
return conn.UpdateNodes(nodes, true)
762764
})
763765
conn.SetNodeCallback(sendNodes)
766+
// Poll the API's DERP map every five seconds and push it into the connection
// whenever it changes. The goroutine exits once ctx is done.
go func() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	lastDERPMap := derpMap
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		latest := api.DERPMap()
		// Other callers in this change use !CompareDERPMaps to mean "the map
		// changed", so only update the connection when the comparison fails.
		if lastDERPMap == nil || !tailnet.CompareDERPMaps(lastDERPMap, latest) {
			conn.SetDERPMap(latest)
			lastDERPMap = latest
		}
	}
}()
787+
764788
agentConn := &codersdk.WorkspaceAgentConn{
765789
Conn: conn,
766790
CloseFunc: func() {
@@ -782,6 +806,9 @@ func (api *API) dialWorkspaceAgentTailnet(agentID uuid.UUID) (*codersdk.Workspac
782806
}()
783807
if !agentConn.AwaitReachable(ctx) {
784808
_ = agentConn.Close()
809+
_ = serverConn.Close()
810+
_ = clientConn.Close()
811+
cancel()
785812
return nil, xerrors.Errorf("agent not reachable")
786813
}
787814
return agentConn, nil
@@ -824,6 +851,55 @@ func (api *API) workspaceAgentConnectionGeneric(rw http.ResponseWriter, r *http.
824851
})
825852
}
826853

854+
// @Summary Get DERP map updates
855+
// @ID get-derp-map-updates
856+
// @Security CoderSessionToken
857+
// @Tags Agents
858+
// @Success 101
859+
// @Router /derpmap [get]
860+
func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) {
861+
ctx := r.Context()
862+
863+
api.WebsocketWaitMutex.Lock()
864+
api.WebsocketWaitGroup.Add(1)
865+
api.WebsocketWaitMutex.Unlock()
866+
defer api.WebsocketWaitGroup.Done()
867+
868+
ws, err := websocket.Accept(rw, r, nil)
869+
if err != nil {
870+
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
871+
Message: "Failed to accept websocket.",
872+
Detail: err.Error(),
873+
})
874+
return
875+
}
876+
nconn := websocket.NetConn(ctx, ws, websocket.MessageBinary)
877+
878+
ticker := time.NewTicker(5 * time.Second)
879+
defer ticker.Stop()
880+
881+
var lastDERPMap *tailcfg.DERPMap
882+
for {
883+
derpMap := api.DERPMap()
884+
if lastDERPMap == nil || !tailnet.CompareDERPMaps(lastDERPMap, derpMap) {
885+
err := json.NewEncoder(nconn).Encode(derpMap)
886+
if err != nil {
887+
_ = ws.Close(websocket.StatusInternalError, err.Error())
888+
return
889+
}
890+
lastDERPMap = derpMap
891+
}
892+
893+
select {
894+
case <-ctx.Done():
895+
return
896+
case <-api.ctx.Done():
897+
return
898+
case <-ticker.C:
899+
}
900+
}
901+
}
902+
827903
// @Summary Coordinate workspace agent via Tailnet
828904
// @Description It accepts a WebSocket connection to an agent that listens to
829905
// @Description incoming connections and publishes node updates.

coderd/wsconncache/wsconncache_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,24 @@ func (c *client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
219219
return c.manifest, nil
220220
}
221221

222+
// closer wraps a plain function so that it can be handed out as a value with
// a Close method.
type closer struct {
	// closeFunc is run on every call to Close.
	closeFunc func() error
}

// Close invokes the wrapped closeFunc and passes its result through.
func (cl *closer) Close() error {
	fn := cl.closeFunc
	return fn()
}
229+
230+
func (c *client) DERPMapUpdates(_ context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error) {
231+
closed := make(chan struct{})
232+
return make(<-chan agentsdk.DERPMapUpdate), &closer{
233+
closeFunc: func() error {
234+
close(closed)
235+
return nil
236+
},
237+
}, nil
238+
}
239+
222240
func (c *client) Listen(_ context.Context) (net.Conn, error) {
223241
clientConn, serverConn := net.Pipe()
224242
closed := make(chan struct{})

0 commit comments

Comments
 (0)