Skip to content

Commit 20acb7b

Browse files
spikecurtispull[bot]
authored andcommitted
feat: add support for multiple tunnel destinations in tailnet (#15409)
Closes #14729 Expands the Coordination controller used by the CLI client to allow multiple tunnel destinations (agents). Our current client uses just one, but this unifies the logic so that when we add Coder VPN, 1 is just a special case of "many."
1 parent 323b310 commit 20acb7b

File tree

5 files changed

+530
-23
lines changed

5 files changed

+530
-23
lines changed

agent/agent_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1918,7 +1918,8 @@ func TestAgent_UpdatedDERP(t *testing.T) {
19181918
testCtx, testCtxCancel := context.WithCancel(context.Background())
19191919
t.Cleanup(testCtxCancel)
19201920
clientID := uuid.New()
1921-
ctrl := tailnet.NewSingleDestController(logger, conn, agentID)
1921+
ctrl := tailnet.NewTunnelSrcCoordController(logger, conn)
1922+
ctrl.AddDestination(agentID)
19221923
auth := tailnet.ClientCoordinateeAuth{AgentID: agentID}
19231924
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(logger, clientID, auth, coordinator))
19241925
t.Cleanup(func() {
@@ -2408,7 +2409,8 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
24082409
testCtx, testCtxCancel := context.WithCancel(context.Background())
24092410
t.Cleanup(testCtxCancel)
24102411
clientID := uuid.New()
2411-
ctrl := tailnet.NewSingleDestController(logger, conn, metadata.AgentID)
2412+
ctrl := tailnet.NewTunnelSrcCoordController(logger, conn)
2413+
ctrl.AddDestination(metadata.AgentID)
24122414
auth := tailnet.ClientCoordinateeAuth{AgentID: metadata.AgentID}
24132415
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(
24142416
logger, clientID, auth, coordinator))

codersdk/workspacesdk/workspacesdk.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,9 @@ func (c *Client) DialAgent(dialCtx context.Context, agentID uuid.UUID, options *
268268
_ = conn.Close()
269269
}
270270
}()
271-
controller.CoordCtrl = tailnet.NewSingleDestController(options.Logger, conn, agentID)
271+
coordCtrl := tailnet.NewTunnelSrcCoordController(options.Logger, conn)
272+
coordCtrl.AddDestination(agentID)
273+
controller.CoordCtrl = coordCtrl
272274
controller.DERPCtrl = tailnet.NewBasicDERPController(options.Logger, conn)
273275
controller.Run(ctx)
274276

tailnet/controllers.go

Lines changed: 141 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"maps"
78
"math"
89
"strings"
910
"sync"
@@ -239,15 +240,17 @@ func (c *BasicCoordination) respLoop() {
239240
defer func() {
240241
cErr := c.Client.Close()
241242
if cErr != nil {
242-
c.logger.Debug(context.Background(), "failed to close coordinate client after respLoop exit", slog.Error(cErr))
243+
c.logger.Debug(context.Background(),
244+
"failed to close coordinate client after respLoop exit", slog.Error(cErr))
243245
}
244246
c.coordinatee.SetAllPeersLost()
245247
close(c.respLoopDone)
246248
}()
247249
for {
248250
resp, err := c.Client.Recv()
249251
if err != nil {
250-
c.logger.Debug(context.Background(), "failed to read from protocol", slog.Error(err))
252+
c.logger.Debug(context.Background(),
253+
"failed to read from protocol", slog.Error(err))
251254
c.SendErr(xerrors.Errorf("read: %w", err))
252255
return
253256
}
@@ -278,7 +281,8 @@ func (c *BasicCoordination) respLoop() {
278281
ReadyForHandshake: rfh,
279282
})
280283
if err != nil {
281-
c.logger.Debug(context.Background(), "failed to send ready for handshake", slog.Error(err))
284+
c.logger.Debug(context.Background(),
285+
"failed to send ready for handshake", slog.Error(err))
282286
c.SendErr(xerrors.Errorf("send: %w", err))
283287
return
284288
}
@@ -287,37 +291,158 @@ func (c *BasicCoordination) respLoop() {
287291
}
288292
}
289293

290-
type singleDestController struct {
294+
type TunnelSrcCoordController struct {
291295
*BasicCoordinationController
292-
dest uuid.UUID
296+
297+
mu sync.Mutex
298+
dests map[uuid.UUID]struct{}
299+
coordination *BasicCoordination
293300
}
294301

295-
// NewSingleDestController creates a CoordinationController for Coder clients that connect to a
296-
// single tunnel destination, e.g. `coder ssh`, which connects to a single workspace Agent.
297-
func NewSingleDestController(logger slog.Logger, coordinatee Coordinatee, dest uuid.UUID) CoordinationController {
298-
coordinatee.SetTunnelDestination(dest)
299-
return &singleDestController{
302+
// NewTunnelSrcCoordController creates a CoordinationController for peers that are exclusively
303+
// tunnel sources (that is, they create tunnel --- Coder clients not workspaces).
304+
func NewTunnelSrcCoordController(
305+
logger slog.Logger, coordinatee Coordinatee,
306+
) *TunnelSrcCoordController {
307+
return &TunnelSrcCoordController{
300308
BasicCoordinationController: &BasicCoordinationController{
301309
Logger: logger,
302310
Coordinatee: coordinatee,
303311
SendAcks: false,
304312
},
305-
dest: dest,
313+
dests: make(map[uuid.UUID]struct{}),
306314
}
307315
}
308316

309-
func (c *singleDestController) New(client CoordinatorClient) CloserWaiter {
317+
func (c *TunnelSrcCoordController) New(client CoordinatorClient) CloserWaiter {
318+
c.mu.Lock()
319+
defer c.mu.Unlock()
310320
b := c.BasicCoordinationController.NewCoordination(client)
311-
err := client.Send(&proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: c.dest[:]}})
312-
if err != nil {
313-
b.SendErr(err)
321+
c.coordination = b
322+
// resync destinations on reconnect
323+
for dest := range c.dests {
324+
err := client.Send(&proto.CoordinateRequest{
325+
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
326+
})
327+
if err != nil {
328+
b.SendErr(err)
329+
c.coordination = nil
330+
cErr := client.Close()
331+
if cErr != nil {
332+
c.Logger.Debug(
333+
context.Background(),
334+
"failed to close coordinator client after add tunnel failure",
335+
slog.Error(cErr),
336+
)
337+
}
338+
break
339+
}
314340
}
315341
return b
316342
}
317343

344+
func (c *TunnelSrcCoordController) AddDestination(dest uuid.UUID) {
345+
c.mu.Lock()
346+
defer c.mu.Unlock()
347+
c.Coordinatee.SetTunnelDestination(dest) // this prepares us for an ack
348+
c.dests[dest] = struct{}{}
349+
if c.coordination == nil {
350+
return
351+
}
352+
err := c.coordination.Client.Send(
353+
&proto.CoordinateRequest{
354+
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
355+
})
356+
if err != nil {
357+
c.coordination.SendErr(err)
358+
cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect
359+
if cErr != nil {
360+
c.Logger.Debug(context.Background(),
361+
"failed to close coordinator client after add tunnel failure",
362+
slog.Error(cErr))
363+
}
364+
c.coordination = nil
365+
}
366+
}
367+
368+
func (c *TunnelSrcCoordController) RemoveDestination(dest uuid.UUID) {
369+
c.mu.Lock()
370+
defer c.mu.Unlock()
371+
delete(c.dests, dest)
372+
if c.coordination == nil {
373+
return
374+
}
375+
err := c.coordination.Client.Send(
376+
&proto.CoordinateRequest{
377+
RemoveTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
378+
})
379+
if err != nil {
380+
c.coordination.SendErr(err)
381+
cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect
382+
if cErr != nil {
383+
c.Logger.Debug(context.Background(),
384+
"failed to close coordinator client after remove tunnel failure",
385+
slog.Error(cErr))
386+
}
387+
c.coordination = nil
388+
}
389+
}
390+
391+
func (c *TunnelSrcCoordController) SyncDestinations(destinations []uuid.UUID) {
392+
c.mu.Lock()
393+
defer c.mu.Unlock()
394+
toAdd := make(map[uuid.UUID]struct{})
395+
toRemove := maps.Clone(c.dests)
396+
all := make(map[uuid.UUID]struct{})
397+
for _, dest := range destinations {
398+
all[dest] = struct{}{}
399+
delete(toRemove, dest)
400+
if _, ok := c.dests[dest]; !ok {
401+
toAdd[dest] = struct{}{}
402+
}
403+
}
404+
c.dests = all
405+
if c.coordination == nil {
406+
return
407+
}
408+
var err error
409+
defer func() {
410+
if err != nil {
411+
c.coordination.SendErr(err)
412+
cErr := c.coordination.Client.Close() // don't gracefully disconnect
413+
if cErr != nil {
414+
c.Logger.Debug(context.Background(),
415+
"failed to close coordinator client during sync destinations",
416+
slog.Error(cErr))
417+
}
418+
c.coordination = nil
419+
}
420+
}()
421+
for dest := range toAdd {
422+
err = c.coordination.Client.Send(
423+
&proto.CoordinateRequest{
424+
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
425+
})
426+
if err != nil {
427+
return
428+
}
429+
}
430+
for dest := range toRemove {
431+
err = c.coordination.Client.Send(
432+
&proto.CoordinateRequest{
433+
RemoveTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
434+
})
435+
if err != nil {
436+
return
437+
}
438+
}
439+
}
440+
318441
// NewAgentCoordinationController creates a CoordinationController for Coder Agents, which never
319442
// create tunnels and always send ReadyToHandshake acknowledgements.
320-
func NewAgentCoordinationController(logger slog.Logger, coordinatee Coordinatee) CoordinationController {
443+
func NewAgentCoordinationController(
444+
logger slog.Logger, coordinatee Coordinatee,
445+
) CoordinationController {
321446
return &BasicCoordinationController{
322447
Logger: logger,
323448
Coordinatee: coordinatee,

0 commit comments

Comments
 (0)