Skip to content

Commit ce65647

Browse files
committed
feat: add support for multiple tunnel destinations in tailnet
1 parent 8c00ebc commit ce65647

File tree

5 files changed

+530
-23
lines changed

5 files changed

+530
-23
lines changed

agent/agent_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1918,7 +1918,8 @@ func TestAgent_UpdatedDERP(t *testing.T) {
19181918
testCtx, testCtxCancel := context.WithCancel(context.Background())
19191919
t.Cleanup(testCtxCancel)
19201920
clientID := uuid.New()
1921-
ctrl := tailnet.NewSingleDestController(logger, conn, agentID)
1921+
ctrl := tailnet.NewTunnelSrcCoordController(logger, conn)
1922+
ctrl.AddDestination(agentID)
19221923
auth := tailnet.ClientCoordinateeAuth{AgentID: agentID}
19231924
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(logger, clientID, auth, coordinator))
19241925
t.Cleanup(func() {
@@ -2408,7 +2409,8 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
24082409
testCtx, testCtxCancel := context.WithCancel(context.Background())
24092410
t.Cleanup(testCtxCancel)
24102411
clientID := uuid.New()
2411-
ctrl := tailnet.NewSingleDestController(logger, conn, metadata.AgentID)
2412+
ctrl := tailnet.NewTunnelSrcCoordController(logger, conn)
2413+
ctrl.AddDestination(metadata.AgentID)
24122414
auth := tailnet.ClientCoordinateeAuth{AgentID: metadata.AgentID}
24132415
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(
24142416
logger, clientID, auth, coordinator))

codersdk/workspacesdk/workspacesdk.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,9 @@ func (c *Client) DialAgent(dialCtx context.Context, agentID uuid.UUID, options *
268268
_ = conn.Close()
269269
}
270270
}()
271-
controller.CoordCtrl = tailnet.NewSingleDestController(options.Logger, conn, agentID)
271+
coordCtrl := tailnet.NewTunnelSrcCoordController(options.Logger, conn)
272+
coordCtrl.AddDestination(agentID)
273+
controller.CoordCtrl = coordCtrl
272274
controller.DERPCtrl = tailnet.NewBasicDERPController(options.Logger, conn)
273275
controller.Run(ctx)
274276

tailnet/controllers.go

Lines changed: 141 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"maps"
78
"math"
89
"strings"
910
"sync"
@@ -239,15 +240,17 @@ func (c *BasicCoordination) respLoop() {
239240
defer func() {
240241
cErr := c.Client.Close()
241242
if cErr != nil {
242-
c.logger.Debug(context.Background(), "failed to close coordinate client after respLoop exit", slog.Error(cErr))
243+
c.logger.Debug(context.Background(),
244+
"failed to close coordinate client after respLoop exit", slog.Error(cErr))
243245
}
244246
c.coordinatee.SetAllPeersLost()
245247
close(c.respLoopDone)
246248
}()
247249
for {
248250
resp, err := c.Client.Recv()
249251
if err != nil {
250-
c.logger.Debug(context.Background(), "failed to read from protocol", slog.Error(err))
252+
c.logger.Debug(context.Background(),
253+
"failed to read from protocol", slog.Error(err))
251254
c.SendErr(xerrors.Errorf("read: %w", err))
252255
return
253256
}
@@ -278,7 +281,8 @@ func (c *BasicCoordination) respLoop() {
278281
ReadyForHandshake: rfh,
279282
})
280283
if err != nil {
281-
c.logger.Debug(context.Background(), "failed to send ready for handshake", slog.Error(err))
284+
c.logger.Debug(context.Background(),
285+
"failed to send ready for handshake", slog.Error(err))
282286
c.SendErr(xerrors.Errorf("send: %w", err))
283287
return
284288
}
@@ -287,37 +291,158 @@ func (c *BasicCoordination) respLoop() {
287291
}
288292
}
289293

290-
type singleDestController struct {
294+
type TunnelSrcCoordController struct {
291295
*BasicCoordinationController
292-
dest uuid.UUID
296+
297+
mu sync.Mutex
298+
dests map[uuid.UUID]struct{}
299+
coordination *BasicCoordination
293300
}
294301

295-
// NewSingleDestController creates a CoordinationController for Coder clients that connect to a
296-
// single tunnel destination, e.g. `coder ssh`, which connects to a single workspace Agent.
297-
func NewSingleDestController(logger slog.Logger, coordinatee Coordinatee, dest uuid.UUID) CoordinationController {
298-
coordinatee.SetTunnelDestination(dest)
299-
return &singleDestController{
302+
// NewTunnelSrcCoordController creates a CoordinationController for peers that are exclusively
303+
// tunnel sources (that is, they create tunnel --- Coder clients not workspaces).
304+
func NewTunnelSrcCoordController(
305+
logger slog.Logger, coordinatee Coordinatee,
306+
) *TunnelSrcCoordController {
307+
return &TunnelSrcCoordController{
300308
BasicCoordinationController: &BasicCoordinationController{
301309
Logger: logger,
302310
Coordinatee: coordinatee,
303311
SendAcks: false,
304312
},
305-
dest: dest,
313+
dests: make(map[uuid.UUID]struct{}),
306314
}
307315
}
308316

309-
func (c *singleDestController) New(client CoordinatorClient) CloserWaiter {
317+
func (c *TunnelSrcCoordController) New(client CoordinatorClient) CloserWaiter {
318+
c.mu.Lock()
319+
defer c.mu.Unlock()
310320
b := c.BasicCoordinationController.NewCoordination(client)
311-
err := client.Send(&proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: c.dest[:]}})
312-
if err != nil {
313-
b.SendErr(err)
321+
c.coordination = b
322+
// resync destinations on reconnect
323+
for dest := range c.dests {
324+
err := client.Send(&proto.CoordinateRequest{
325+
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
326+
})
327+
if err != nil {
328+
b.SendErr(err)
329+
c.coordination = nil
330+
cErr := client.Close()
331+
if cErr != nil {
332+
c.Logger.Debug(
333+
context.Background(),
334+
"failed to close coordinator client after add tunnel failure",
335+
slog.Error(cErr),
336+
)
337+
}
338+
break
339+
}
314340
}
315341
return b
316342
}
317343

344+
func (c *TunnelSrcCoordController) AddDestination(dest uuid.UUID) {
345+
c.mu.Lock()
346+
defer c.mu.Unlock()
347+
c.Coordinatee.SetTunnelDestination(dest) // this prepares us for an ack
348+
c.dests[dest] = struct{}{}
349+
if c.coordination == nil {
350+
return
351+
}
352+
err := c.coordination.Client.Send(
353+
&proto.CoordinateRequest{
354+
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
355+
})
356+
if err != nil {
357+
c.coordination.SendErr(err)
358+
cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect
359+
if cErr != nil {
360+
c.Logger.Debug(context.Background(),
361+
"failed to close coordinator client after add tunnel failure",
362+
slog.Error(cErr))
363+
}
364+
c.coordination = nil
365+
}
366+
}
367+
368+
func (c *TunnelSrcCoordController) RemoveDestination(dest uuid.UUID) {
369+
c.mu.Lock()
370+
defer c.mu.Unlock()
371+
delete(c.dests, dest)
372+
if c.coordination == nil {
373+
return
374+
}
375+
err := c.coordination.Client.Send(
376+
&proto.CoordinateRequest{
377+
RemoveTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
378+
})
379+
if err != nil {
380+
c.coordination.SendErr(err)
381+
cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect
382+
if cErr != nil {
383+
c.Logger.Debug(context.Background(),
384+
"failed to close coordinator client after remove tunnel failure",
385+
slog.Error(cErr))
386+
}
387+
c.coordination = nil
388+
}
389+
}
390+
391+
func (c *TunnelSrcCoordController) SyncDestinations(destinations []uuid.UUID) {
392+
c.mu.Lock()
393+
defer c.mu.Unlock()
394+
toAdd := make(map[uuid.UUID]struct{})
395+
toRemove := maps.Clone(c.dests)
396+
all := make(map[uuid.UUID]struct{})
397+
for _, dest := range destinations {
398+
all[dest] = struct{}{}
399+
delete(toRemove, dest)
400+
if _, ok := c.dests[dest]; !ok {
401+
toAdd[dest] = struct{}{}
402+
}
403+
}
404+
c.dests = all
405+
if c.coordination == nil {
406+
return
407+
}
408+
var err error
409+
defer func() {
410+
if err != nil {
411+
c.coordination.SendErr(err)
412+
cErr := c.coordination.Client.Close() // don't gracefully disconnect
413+
if cErr != nil {
414+
c.Logger.Debug(context.Background(),
415+
"failed to close coordinator client during sync destinations",
416+
slog.Error(cErr))
417+
}
418+
c.coordination = nil
419+
}
420+
}()
421+
for dest := range toAdd {
422+
err = c.coordination.Client.Send(
423+
&proto.CoordinateRequest{
424+
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
425+
})
426+
if err != nil {
427+
return
428+
}
429+
}
430+
for dest := range toRemove {
431+
err = c.coordination.Client.Send(
432+
&proto.CoordinateRequest{
433+
RemoveTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
434+
})
435+
if err != nil {
436+
return
437+
}
438+
}
439+
}
440+
318441
// NewAgentCoordinationController creates a CoordinationController for Coder Agents, which never
319442
// create tunnels and always send ReadyToHandshake acknowledgements.
320-
func NewAgentCoordinationController(logger slog.Logger, coordinatee Coordinatee) CoordinationController {
443+
func NewAgentCoordinationController(
444+
logger slog.Logger, coordinatee Coordinatee,
445+
) CoordinationController {
321446
return &BasicCoordinationController{
322447
Logger: logger,
323448
Coordinatee: coordinatee,

0 commit comments

Comments
 (0)