Skip to content

Commit 52f1c2b

Browse files
committed
initial implementation
1 parent b47d54d commit 52f1c2b

File tree

4 files changed

+159
-16
lines changed

4 files changed

+159
-16
lines changed

tailnet/controllers.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,7 @@ func (t *tunnelUpdater) recvLoop() {
10491049
t.logger.Debug(context.Background(), "tunnel updater recvLoop started")
10501050
defer t.logger.Debug(context.Background(), "tunnel updater recvLoop done")
10511051
defer close(t.recvLoopDone)
1052+
freshState := true
10521053
for {
10531054
update, err := t.client.Recv()
10541055
if err != nil {
@@ -1061,8 +1062,10 @@ func (t *tunnelUpdater) recvLoop() {
10611062
}
10621063
t.logger.Debug(context.Background(), "got workspace update",
10631064
slog.F("workspace_update", update),
1065+
slog.F("fresh_state", freshState),
10641066
)
1065-
err = t.handleUpdate(update)
1067+
err = t.handleUpdate(update, freshState)
1068+
freshState = false
10661069
if err != nil {
10671070
t.logger.Critical(context.Background(), "failed to handle workspace Update", slog.Error(err))
10681071
cErr := t.client.Close()
@@ -1083,6 +1086,7 @@ type WorkspaceUpdate struct {
10831086
UpsertedAgents []*Agent
10841087
DeletedWorkspaces []*Workspace
10851088
DeletedAgents []*Agent
1089+
FreshState bool
10861090
}
10871091

10881092
func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
@@ -1091,6 +1095,7 @@ func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
10911095
UpsertedAgents: make([]*Agent, len(w.UpsertedAgents)),
10921096
DeletedWorkspaces: make([]*Workspace, len(w.DeletedWorkspaces)),
10931097
DeletedAgents: make([]*Agent, len(w.DeletedAgents)),
1098+
FreshState: w.FreshState,
10941099
}
10951100
for i, ws := range w.UpsertedWorkspaces {
10961101
clone.UpsertedWorkspaces[i] = &Workspace{
@@ -1115,7 +1120,7 @@ func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
11151120
return clone
11161121
}
11171122

1118-
func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
1123+
func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate, freshState bool) error {
11191124
t.Lock()
11201125
defer t.Unlock()
11211126

@@ -1124,6 +1129,7 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
11241129
UpsertedAgents: []*Agent{},
11251130
DeletedWorkspaces: []*Workspace{},
11261131
DeletedAgents: []*Agent{},
1132+
FreshState: freshState,
11271133
}
11281134

11291135
for _, uw := range update.UpsertedWorkspaces {

tailnet/controllers_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -1611,13 +1611,15 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
16111611
},
16121612
DeletedWorkspaces: []*tailnet.Workspace{},
16131613
DeletedAgents: []*tailnet.Agent{},
1614+
FreshState: true,
16141615
}
16151616

16161617
// And the callback
16171618
cbUpdate := testutil.TryReceive(ctx, t, fUH.ch)
16181619
require.Equal(t, currentState, cbUpdate)
16191620

1620-
// Current recvState should match
1621+
// Current recvState should match but shouldn't be a fresh state
1622+
currentState.FreshState = false
16211623
recvState, err := updateCtrl.CurrentState()
16221624
require.NoError(t, err)
16231625
slices.SortFunc(recvState.UpsertedWorkspaces, func(a, b *tailnet.Workspace) int {
@@ -1692,12 +1694,14 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
16921694
},
16931695
DeletedWorkspaces: []*tailnet.Workspace{},
16941696
DeletedAgents: []*tailnet.Agent{},
1697+
FreshState: true,
16951698
}
16961699

16971700
cbUpdate := testutil.TryReceive(ctx, t, fUH.ch)
16981701
require.Equal(t, initRecvUp, cbUpdate)
16991702

1700-
// Current state should match initial
1703+
// Current state should match initial but shouldn't be a fresh state
1704+
initRecvUp.FreshState = false
17011705
state, err := updateCtrl.CurrentState()
17021706
require.NoError(t, err)
17031707
require.Equal(t, initRecvUp, state)
@@ -1753,6 +1757,7 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
17531757
"w1.coder.": {ws1a1IP},
17541758
}},
17551759
},
1760+
FreshState: false,
17561761
}
17571762
require.Equal(t, sndRecvUpdate, cbUpdate)
17581763

@@ -1771,6 +1776,7 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
17711776
},
17721777
DeletedWorkspaces: []*tailnet.Workspace{},
17731778
DeletedAgents: []*tailnet.Agent{},
1779+
FreshState: false,
17741780
}, state)
17751781
}
17761782

vpn/tunnel.go

+45-12
Original file line numberDiff line numberDiff line change
@@ -397,14 +397,57 @@ func (u *updater) sendUpdateResponse(req *request[*TunnelMessage, *ManagerMessag
397397
// createPeerUpdateLocked creates a PeerUpdate message from a workspace update, populating
398398
// the network status of the agents.
399399
func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUpdate {
400+
// this flag is true on the first update after a reconnect
401+
if update.FreshState {
402+
// ignoredWorkspaces is initially populated with the workspaces that are
403+
// in the current update. Later on we populate it with the deleted workspaces too
404+
// so that we don't send duplicate updates. Same applies to ignoredAgents.
405+
ignoredWorkspaces := make(map[uuid.UUID]struct{}, len(update.UpsertedWorkspaces))
406+
ignoredAgents := make(map[uuid.UUID]struct{}, len(update.UpsertedAgents))
407+
408+
for _, workspace := range update.UpsertedWorkspaces {
409+
ignoredWorkspaces[workspace.ID] = struct{}{}
410+
}
411+
for _, agent := range update.UpsertedAgents {
412+
ignoredAgents[agent.ID] = struct{}{}
413+
}
414+
for _, agent := range u.agents {
415+
if _, ok := ignoredAgents[agent.ID]; !ok {
416+
// delete any current agents that are not in the new update
417+
update.DeletedAgents = append(update.DeletedAgents, &tailnet.Agent{
418+
ID: agent.ID,
419+
Name: agent.Name,
420+
WorkspaceID: agent.WorkspaceID,
421+
})
422+
// if the workspace connected to an agent we're deleting,
423+
// is not present in the fresh state, add it to the deleted workspaces
424+
if _, ok := ignoredWorkspaces[agent.WorkspaceID]; !ok {
425+
update.DeletedWorkspaces = append(update.DeletedWorkspaces, &tailnet.Workspace{
426+
// other fields cannot be populated because the tunnel
427+
// only stores agents and corresponding workspaceIDs
428+
ID: agent.WorkspaceID,
429+
})
430+
ignoredWorkspaces[agent.WorkspaceID] = struct{}{}
431+
}
432+
}
433+
}
434+
}
435+
400436
out := &PeerUpdate{
401437
UpsertedWorkspaces: make([]*Workspace, len(update.UpsertedWorkspaces)),
402438
UpsertedAgents: make([]*Agent, len(update.UpsertedAgents)),
403439
DeletedWorkspaces: make([]*Workspace, len(update.DeletedWorkspaces)),
404440
DeletedAgents: make([]*Agent, len(update.DeletedAgents)),
405441
}
406442

407-
u.saveUpdateLocked(update)
443+
// save the workspace update to the tunnel's state, such that it can
444+
// be used to populate automated peer updates.
445+
for _, agent := range update.UpsertedAgents {
446+
u.agents[agent.ID] = agent.Clone()
447+
}
448+
for _, agent := range update.DeletedAgents {
449+
delete(u.agents, agent.ID)
450+
}
408451

409452
for i, ws := range update.UpsertedWorkspaces {
410453
out.UpsertedWorkspaces[i] = &Workspace{
@@ -413,6 +456,7 @@ func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUp
413456
Status: Workspace_Status(ws.Status),
414457
}
415458
}
459+
416460
upsertedAgents := u.convertAgentsLocked(update.UpsertedAgents)
417461
out.UpsertedAgents = upsertedAgents
418462
for i, ws := range update.DeletedWorkspaces {
@@ -472,17 +516,6 @@ func (u *updater) convertAgentsLocked(agents []*tailnet.Agent) []*Agent {
472516
return out
473517
}
474518

475-
// saveUpdateLocked saves the workspace update to the tunnel's state, such that it can
476-
// be used to populate automated peer updates.
477-
func (u *updater) saveUpdateLocked(update tailnet.WorkspaceUpdate) {
478-
for _, agent := range update.UpsertedAgents {
479-
u.agents[agent.ID] = agent.Clone()
480-
}
481-
for _, agent := range update.DeletedAgents {
482-
delete(u.agents, agent.ID)
483-
}
484-
}
485-
486519
// setConn sets the `conn` and returns false if there's already a connection set.
487520
func (u *updater) setConn(conn Conn) bool {
488521
u.mu.Lock()

vpn/tunnel_internal_test.go

+98
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,104 @@ func TestTunnel_sendAgentUpdate(t *testing.T) {
486486
require.Equal(t, hsTime, req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime())
487487
}
488488

489+
func TestTunnel_sendAgentUpdateReconnect(t *testing.T) {
490+
t.Parallel()
491+
492+
ctx := testutil.Context(t, testutil.WaitShort)
493+
494+
mClock := quartz.NewMock(t)
495+
496+
wID1 := uuid.UUID{1}
497+
aID1 := uuid.UUID{2}
498+
aID2 := uuid.UUID{3}
499+
hsTime := time.Now().Add(-time.Minute).UTC()
500+
501+
client := newFakeClient(ctx, t)
502+
conn := newFakeConn(tailnet.WorkspaceUpdate{}, hsTime)
503+
504+
tun, mgr := setupTunnel(t, ctx, client, mClock)
505+
errCh := make(chan error, 1)
506+
var resp *TunnelMessage
507+
go func() {
508+
r, err := mgr.unaryRPC(ctx, &ManagerMessage{
509+
Msg: &ManagerMessage_Start{
510+
Start: &StartRequest{
511+
TunnelFileDescriptor: 2,
512+
CoderUrl: "https://coder.example.com",
513+
ApiToken: "fakeToken",
514+
},
515+
},
516+
})
517+
resp = r
518+
errCh <- err
519+
}()
520+
testutil.RequireSend(ctx, t, client.ch, conn)
521+
err := testutil.TryReceive(ctx, t, errCh)
522+
require.NoError(t, err)
523+
_, ok := resp.Msg.(*TunnelMessage_Start)
524+
require.True(t, ok)
525+
526+
// Inform the tunnel of the initial state
527+
err = tun.Update(tailnet.WorkspaceUpdate{
528+
UpsertedWorkspaces: []*tailnet.Workspace{
529+
{
530+
ID: wID1, Name: "w1", Status: proto.Workspace_STARTING,
531+
},
532+
},
533+
UpsertedAgents: []*tailnet.Agent{
534+
{
535+
ID: aID1,
536+
Name: "agent1",
537+
WorkspaceID: wID1,
538+
Hosts: map[dnsname.FQDN][]netip.Addr{
539+
"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
540+
},
541+
},
542+
},
543+
})
544+
require.NoError(t, err)
545+
req := testutil.TryReceive(ctx, t, mgr.requests)
546+
require.Nil(t, req.msg.Rpc)
547+
require.NotNil(t, req.msg.GetPeerUpdate())
548+
require.Len(t, req.msg.GetPeerUpdate().UpsertedAgents, 1)
549+
require.Equal(t, aID1[:], req.msg.GetPeerUpdate().UpsertedAgents[0].Id)
550+
551+
// Upsert a new agent simulating a reconnect
552+
err = tun.Update(tailnet.WorkspaceUpdate{
553+
UpsertedWorkspaces: []*tailnet.Workspace{
554+
{
555+
ID: wID1, Name: "w1", Status: proto.Workspace_STARTING,
556+
},
557+
},
558+
UpsertedAgents: []*tailnet.Agent{
559+
{
560+
ID: aID2,
561+
Name: "agent2",
562+
WorkspaceID: wID1,
563+
Hosts: map[dnsname.FQDN][]netip.Addr{
564+
"agent2.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
565+
},
566+
},
567+
},
568+
FreshState: true,
569+
})
570+
require.NoError(t, err)
571+
572+
// The new update only contains the new agent
573+
mClock.AdvanceNext()
574+
req = testutil.TryReceive(ctx, t, mgr.requests)
575+
require.Nil(t, req.msg.Rpc)
576+
peerUpdate := req.msg.GetPeerUpdate()
577+
require.NotNil(t, peerUpdate)
578+
require.Len(t, peerUpdate.UpsertedAgents, 1)
579+
require.Len(t, peerUpdate.DeletedAgents, 1)
580+
581+
require.Equal(t, aID2[:], peerUpdate.UpsertedAgents[0].Id)
582+
require.Equal(t, hsTime, peerUpdate.UpsertedAgents[0].LastHandshake.AsTime())
583+
584+
require.Equal(t, aID1[:], peerUpdate.DeletedAgents[0].Id)
585+
}
586+
489587
//nolint:revive // t takes precedence
490588
func setupTunnel(t *testing.T, ctx context.Context, client *fakeClient, mClock quartz.Clock) (*Tunnel, *speaker[*ManagerMessage, *TunnelMessage, TunnelMessage]) {
491589
mp, tp := net.Pipe()

0 commit comments

Comments
 (0)