Skip to content

Commit a82e73a

Browse files
committed
feat: add support for multiple tunnel destinations in tailnet
1 parent ed2ba52 commit a82e73a

File tree

5 files changed

+507
-19
lines changed

5 files changed

+507
-19
lines changed

agent/agent_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1918,7 +1918,8 @@ func TestAgent_UpdatedDERP(t *testing.T) {
19181918
testCtx, testCtxCancel := context.WithCancel(context.Background())
19191919
t.Cleanup(testCtxCancel)
19201920
clientID := uuid.New()
1921-
ctrl := tailnet.NewSingleDestController(logger, conn, agentID)
1921+
ctrl := tailnet.NewTunnelSrcCoordController(logger, conn)
1922+
ctrl.AddDestination(agentID)
19221923
auth := tailnet.ClientCoordinateeAuth{AgentID: agentID}
19231924
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(logger, clientID, auth, coordinator))
19241925
t.Cleanup(func() {
@@ -2408,7 +2409,8 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
24082409
testCtx, testCtxCancel := context.WithCancel(context.Background())
24092410
t.Cleanup(testCtxCancel)
24102411
clientID := uuid.New()
2411-
ctrl := tailnet.NewSingleDestController(logger, conn, metadata.AgentID)
2412+
ctrl := tailnet.NewTunnelSrcCoordController(logger, conn)
2413+
ctrl.AddDestination(metadata.AgentID)
24122414
auth := tailnet.ClientCoordinateeAuth{AgentID: metadata.AgentID}
24132415
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(
24142416
logger, clientID, auth, coordinator))

codersdk/workspacesdk/workspacesdk.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,9 @@ func (c *Client) DialAgent(dialCtx context.Context, agentID uuid.UUID, options *
268268
_ = conn.Close()
269269
}
270270
}()
271-
controller.CoordCtrl = tailnet.NewSingleDestController(options.Logger, conn, agentID)
271+
coordCtrl := tailnet.NewTunnelSrcCoordController(options.Logger, conn)
272+
coordCtrl.AddDestination(agentID)
273+
controller.CoordCtrl = coordCtrl
272274
controller.DERPCtrl = tailnet.NewBasicDERPController(options.Logger, conn)
273275
controller.Run(ctx)
274276

tailnet/controllers.go

Lines changed: 118 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"maps"
78
"math"
89
"strings"
910
"sync"
@@ -287,34 +288,139 @@ func (c *BasicCoordination) respLoop() {
287288
}
288289
}
289290

290-
type singleDestController struct {
291+
type TunnelSrcCoordController struct {
291292
*BasicCoordinationController
292-
dest uuid.UUID
293+
294+
mu sync.Mutex
295+
dests map[uuid.UUID]struct{}
296+
coordination *BasicCoordination
293297
}
294298

295-
// NewSingleDestController creates a CoordinationController for Coder clients that connect to a
296-
// single tunnel destination, e.g. `coder ssh`, which connects to a single workspace Agent.
297-
func NewSingleDestController(logger slog.Logger, coordinatee Coordinatee, dest uuid.UUID) CoordinationController {
298-
coordinatee.SetTunnelDestination(dest)
299-
return &singleDestController{
299+
// NewTunnelSrcCoordController creates a CoordinationController for peers that are exclusively tunnel
300+
// sources (that is, they create tunnel --- Coder clients not workspaces).
301+
func NewTunnelSrcCoordController(logger slog.Logger, coordinatee Coordinatee) *TunnelSrcCoordController {
302+
return &TunnelSrcCoordController{
300303
BasicCoordinationController: &BasicCoordinationController{
301304
Logger: logger,
302305
Coordinatee: coordinatee,
303306
SendAcks: false,
304307
},
305-
dest: dest,
308+
dests: make(map[uuid.UUID]struct{}),
306309
}
307310
}
308311

309-
func (c *singleDestController) New(client CoordinatorClient) CloserWaiter {
312+
func (c *TunnelSrcCoordController) New(client CoordinatorClient) CloserWaiter {
313+
c.mu.Lock()
314+
defer c.mu.Unlock()
310315
b := c.BasicCoordinationController.NewCoordination(client)
311-
err := client.Send(&proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: c.dest[:]}})
312-
if err != nil {
313-
b.SendErr(err)
316+
c.coordination = b
317+
// resync destinations on reconnect
318+
for dest := range c.dests {
319+
err := client.Send(&proto.CoordinateRequest{
320+
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: dest[:]},
321+
})
322+
if err != nil {
323+
b.SendErr(err)
324+
c.coordination = nil
325+
cErr := client.Close()
326+
if cErr != nil {
327+
c.Logger.Debug(context.Background(), "failed to close coordinator client after add tunnel failure", slog.Error(cErr))
328+
}
329+
break
330+
}
314331
}
315332
return b
316333
}
317334

335+
func (c *TunnelSrcCoordController) AddDestination(dest uuid.UUID) {
336+
c.mu.Lock()
337+
defer c.mu.Unlock()
338+
c.Coordinatee.SetTunnelDestination(dest) // this prepares us for an ack
339+
c.dests[dest] = struct{}{}
340+
if c.coordination == nil {
341+
return
342+
}
343+
err := c.coordination.Client.Send(
344+
&proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: dest[:]}})
345+
if err != nil {
346+
c.coordination.SendErr(err)
347+
cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect
348+
if cErr != nil {
349+
c.Logger.Debug(context.Background(),
350+
"failed to close coordinator client after add tunnel failure",
351+
slog.Error(cErr))
352+
}
353+
c.coordination = nil
354+
}
355+
}
356+
357+
func (c *TunnelSrcCoordController) RemoveDestination(dest uuid.UUID) {
358+
c.mu.Lock()
359+
defer c.mu.Unlock()
360+
delete(c.dests, dest)
361+
if c.coordination == nil {
362+
return
363+
}
364+
err := c.coordination.Client.Send(
365+
&proto.CoordinateRequest{RemoveTunnel: &proto.CoordinateRequest_Tunnel{Id: dest[:]}})
366+
if err != nil {
367+
c.coordination.SendErr(err)
368+
cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect
369+
if cErr != nil {
370+
c.Logger.Debug(context.Background(),
371+
"failed to close coordinator client after remove tunnel failure",
372+
slog.Error(cErr))
373+
}
374+
c.coordination = nil
375+
}
376+
}
377+
378+
func (c *TunnelSrcCoordController) SyncDestinations(destinations []uuid.UUID) {
379+
c.mu.Lock()
380+
defer c.mu.Unlock()
381+
toAdd := make(map[uuid.UUID]struct{})
382+
toRemove := maps.Clone(c.dests)
383+
all := make(map[uuid.UUID]struct{})
384+
for _, dest := range destinations {
385+
all[dest] = struct{}{}
386+
delete(toRemove, dest)
387+
if _, ok := c.dests[dest]; !ok {
388+
toAdd[dest] = struct{}{}
389+
}
390+
}
391+
c.dests = all
392+
if c.coordination == nil {
393+
return
394+
}
395+
var err error
396+
defer func() {
397+
if err != nil {
398+
c.coordination.SendErr(err)
399+
cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect
400+
if cErr != nil {
401+
c.Logger.Debug(context.Background(),
402+
"failed to close coordinator client during sync destinations",
403+
slog.Error(cErr))
404+
}
405+
c.coordination = nil
406+
}
407+
}()
408+
for dest := range toAdd {
409+
err = c.coordination.Client.Send(
410+
&proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: dest[:]}})
411+
if err != nil {
412+
return
413+
}
414+
}
415+
for dest := range toRemove {
416+
err = c.coordination.Client.Send(
417+
&proto.CoordinateRequest{RemoveTunnel: &proto.CoordinateRequest_Tunnel{Id: dest[:]}})
418+
if err != nil {
419+
return
420+
}
421+
}
422+
}
423+
318424
// NewAgentCoordinationController creates a CoordinationController for Coder Agents, which never
319425
// create tunnels and always send ReadyToHandshake acknowledgements.
320426
func NewAgentCoordinationController(logger slog.Logger, coordinatee Coordinatee) CoordinationController {

0 commit comments

Comments
 (0)