Commit 5f516ed

feat: improve coder connect tunnel handling on reconnect (#17598)
Closes coder/internal#563

The [Coder Connect tunnel](https://github.com/coder/coder/blob/main/vpn/tunnel.go) receives workspace state from the Coder server over a [dRPC stream](https://github.com/coder/coder/blob/114ba4593b2a82dfd41cdcb7fd6eb70d866e7b86/tailnet/controllers.go#L1029). On first connecting to this stream, the client receives the current state of the user's workspaces; subsequent messages are diffs on top of that state. However, if the client disconnects from this stream (for example, when the user's device is suspended) and later reconnects, the tunnel has no way to tell that the first message, which again contains the entire state, is not just another diff, so that full state is incorrectly applied as a diff. In practice:

- The tunnel connects and receives a workspace update containing all existing workspaces & agents.
- The tunnel loses its connection, but isn't completely stopped.
- All of the user's workspaces are restarted, producing a new set of agents.
- The tunnel regains its connection and receives a workspace update containing all existing workspaces & agents.
- This initial update is incorrectly applied as a diff, leaving the tunnel's state with both the old & new agents.

This PR introduces a fix in which the tunnelUpdater, when created, sends a FreshState flag with the WorkspaceUpdate type. The vpn tunnel handles this flag as follows:

- Preserve existing agents.
- Remove any agents currently in the tunnel that are not present in the WorkspaceUpdate.
- Remove unreferenced workspaces.
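As a rough sketch of the reconciliation rule this flag enables (illustrative only, not code from this PR; `applyUpdate` and its `state` map are hypothetical, while `tailnet.Snapshot` and the `WorkspaceUpdate` fields match the diff below):

```go
package sketch

import (
	"github.com/google/uuid"

	"github.com/coder/coder/v2/tailnet"
)

// applyUpdate is a hypothetical consumer of workspace updates. On a
// snapshot it first drops every known agent the server no longer
// reports, then applies upserts and deletes as usual, so a reconnect's
// initial full state can no longer leave stale agents behind.
func applyUpdate(state map[uuid.UUID]*tailnet.Agent, update tailnet.WorkspaceUpdate) {
	if update.Kind == tailnet.Snapshot {
		reported := make(map[uuid.UUID]struct{}, len(update.UpsertedAgents))
		for _, a := range update.UpsertedAgents {
			reported[a.ID] = struct{}{}
		}
		for id := range state {
			if _, ok := reported[id]; !ok {
				delete(state, id) // safe: Go permits delete during range
			}
		}
	}
	for _, a := range update.UpsertedAgents {
		state[a.ID] = a
	}
	for _, a := range update.DeletedAgents {
		delete(state, a.ID)
	}
}
```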
1 parent ebad5c3 commit 5f516ed

File tree

4 files changed, +498 −16 lines changed


tailnet/controllers.go (+30 −2)

@@ -897,6 +897,21 @@ type Workspace struct {
 	agents map[uuid.UUID]*Agent
 }
 
+func (w *Workspace) Clone() Workspace {
+	agents := make(map[uuid.UUID]*Agent, len(w.agents))
+	for k, v := range w.agents {
+		clone := v.Clone()
+		agents[k] = &clone
+	}
+	return Workspace{
+		ID:            w.ID,
+		Name:          w.Name,
+		Status:        w.Status,
+		ownerUsername: w.ownerUsername,
+		agents:        agents,
+	}
+}
+
 type DNSNameOptions struct {
 	Suffix string
 }
@@ -1049,6 +1064,7 @@ func (t *tunnelUpdater) recvLoop() {
 	t.logger.Debug(context.Background(), "tunnel updater recvLoop started")
 	defer t.logger.Debug(context.Background(), "tunnel updater recvLoop done")
 	defer close(t.recvLoopDone)
+	updateKind := Snapshot
 	for {
 		update, err := t.client.Recv()
 		if err != nil {
@@ -1061,8 +1077,10 @@ func (t *tunnelUpdater) recvLoop() {
 		}
 		t.logger.Debug(context.Background(), "got workspace update",
 			slog.F("workspace_update", update),
+			slog.F("update_kind", updateKind),
 		)
-		err = t.handleUpdate(update)
+		err = t.handleUpdate(update, updateKind)
+		updateKind = Diff
 		if err != nil {
 			t.logger.Critical(context.Background(), "failed to handle workspace Update", slog.Error(err))
 			cErr := t.client.Close()
@@ -1083,14 +1101,23 @@ type WorkspaceUpdate struct {
 	UpsertedAgents     []*Agent
 	DeletedWorkspaces  []*Workspace
 	DeletedAgents      []*Agent
+	Kind               UpdateKind
 }
 
+type UpdateKind int
+
+const (
+	Diff UpdateKind = iota
+	Snapshot
+)
+
 func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
 	clone := WorkspaceUpdate{
 		UpsertedWorkspaces: make([]*Workspace, len(w.UpsertedWorkspaces)),
 		UpsertedAgents:     make([]*Agent, len(w.UpsertedAgents)),
 		DeletedWorkspaces:  make([]*Workspace, len(w.DeletedWorkspaces)),
 		DeletedAgents:      make([]*Agent, len(w.DeletedAgents)),
+		Kind:               w.Kind,
 	}
 	for i, ws := range w.UpsertedWorkspaces {
 		clone.UpsertedWorkspaces[i] = &Workspace{
@@ -1115,7 +1142,7 @@ func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
 	return clone
 }
 
-func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
+func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate, updateKind UpdateKind) error {
 	t.Lock()
 	defer t.Unlock()
 
@@ -1124,6 +1151,7 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
 		UpsertedAgents:    []*Agent{},
 		DeletedWorkspaces: []*Workspace{},
 		DeletedAgents:     []*Agent{},
+		Kind:              updateKind,
 	}
 
 	for _, uw := range update.UpsertedWorkspaces {
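The `recvLoop` change above is a small state machine: the first update received after each (re)connect is tagged `Snapshot`, and every subsequent one `Diff`. A self-contained sketch of the same pattern (all names here are hypothetical, not from the diff):

```go
package main

import "fmt"

type UpdateKind int

const (
	Diff UpdateKind = iota
	Snapshot
)

// recvAll tags the first message from a freshly (re)connected stream as
// a Snapshot and everything after it as a Diff, mirroring recvLoop.
func recvAll(recv func() (string, bool), handle func(string, UpdateKind)) {
	kind := Snapshot
	for {
		msg, ok := recv()
		if !ok {
			return
		}
		handle(msg, kind)
		kind = Diff
	}
}

func main() {
	msgs := []string{"initial full state", "diff 1", "diff 2"}
	i := 0
	recv := func() (string, bool) {
		if i == len(msgs) {
			return "", false
		}
		i++
		return msgs[i-1], true
	}
	recvAll(recv, func(msg string, kind UpdateKind) {
		// prints Snapshot (1) for the first message, then Diff (0)
		fmt.Printf("%-18s kind=%d\n", msg, kind)
	})
}
```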

tailnet/controllers_test.go (+8 −1)

@@ -1611,6 +1611,7 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
 		},
 		DeletedWorkspaces: []*tailnet.Workspace{},
 		DeletedAgents:     []*tailnet.Agent{},
+		Kind:              tailnet.Snapshot,
 	}
 
 	// And the callback
@@ -1626,6 +1627,9 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
 	slices.SortFunc(recvState.UpsertedAgents, func(a, b *tailnet.Agent) int {
 		return strings.Compare(a.Name, b.Name)
 	})
+	// tunnel is still open, so it's a diff
+	currentState.Kind = tailnet.Diff
+
 	require.Equal(t, currentState, recvState)
 }
 
@@ -1692,14 +1696,17 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
 		},
 		DeletedWorkspaces: []*tailnet.Workspace{},
 		DeletedAgents:     []*tailnet.Agent{},
+		Kind:              tailnet.Snapshot,
 	}
 
 	cbUpdate := testutil.TryReceive(ctx, t, fUH.ch)
 	require.Equal(t, initRecvUp, cbUpdate)
 
-	// Current state should match initial
 	state, err := updateCtrl.CurrentState()
 	require.NoError(t, err)
+	// tunnel is still open, so it's a diff
+	initRecvUp.Kind = tailnet.Diff
+
 	require.Equal(t, initRecvUp, state)
 
 	// Send update that removes w1a1 and adds w1a2

vpn/tunnel.go (+64 −13)

@@ -88,6 +88,7 @@ func NewTunnel(
 			netLoopDone: make(chan struct{}),
 			uSendCh:     s.sendCh,
 			agents:      map[uuid.UUID]tailnet.Agent{},
+			workspaces:  map[uuid.UUID]tailnet.Workspace{},
 			clock:       quartz.NewReal(),
 		},
 	}
@@ -347,7 +348,9 @@ type updater struct {
 	uSendCh chan<- *TunnelMessage
 	// agents contains the agents that are currently connected to the tunnel.
 	agents map[uuid.UUID]tailnet.Agent
-	conn   Conn
+	// workspaces contains the workspaces to which agents are currently connected via the tunnel.
+	workspaces map[uuid.UUID]tailnet.Workspace
+	conn       Conn
 
 	clock quartz.Clock
 }
@@ -397,14 +400,32 @@ func (u *updater) sendUpdateResponse(req *request[*TunnelMessage, *ManagerMessage]) {
 // createPeerUpdateLocked creates a PeerUpdate message from a workspace update, populating
 // the network status of the agents.
 func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUpdate {
+	// if the update is a snapshot, we need to process the full state
+	if update.Kind == tailnet.Snapshot {
+		processSnapshotUpdate(&update, u.agents, u.workspaces)
+	}
+
 	out := &PeerUpdate{
 		UpsertedWorkspaces: make([]*Workspace, len(update.UpsertedWorkspaces)),
 		UpsertedAgents:     make([]*Agent, len(update.UpsertedAgents)),
 		DeletedWorkspaces:  make([]*Workspace, len(update.DeletedWorkspaces)),
 		DeletedAgents:      make([]*Agent, len(update.DeletedAgents)),
 	}
 
-	u.saveUpdateLocked(update)
+	// save the workspace update to the tunnel's state, such that it can
+	// be used to populate automated peer updates.
+	for _, agent := range update.UpsertedAgents {
+		u.agents[agent.ID] = agent.Clone()
+	}
+	for _, agent := range update.DeletedAgents {
+		delete(u.agents, agent.ID)
+	}
+	for _, workspace := range update.UpsertedWorkspaces {
+		u.workspaces[workspace.ID] = workspace.Clone()
+	}
+	for _, workspace := range update.DeletedWorkspaces {
+		delete(u.workspaces, workspace.ID)
+	}
 
 	for i, ws := range update.UpsertedWorkspaces {
 		out.UpsertedWorkspaces[i] = &Workspace{
@@ -413,6 +434,7 @@ func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUpdate {
 			Status: Workspace_Status(ws.Status),
 		}
 	}
+
 	upsertedAgents := u.convertAgentsLocked(update.UpsertedAgents)
 	out.UpsertedAgents = upsertedAgents
 	for i, ws := range update.DeletedWorkspaces {
@@ -472,17 +494,6 @@ func (u *updater) convertAgentsLocked(agents []*tailnet.Agent) []*Agent {
 	return out
 }
 
-// saveUpdateLocked saves the workspace update to the tunnel's state, such that it can
-// be used to populate automated peer updates.
-func (u *updater) saveUpdateLocked(update tailnet.WorkspaceUpdate) {
-	for _, agent := range update.UpsertedAgents {
-		u.agents[agent.ID] = agent.Clone()
-	}
-	for _, agent := range update.DeletedAgents {
-		delete(u.agents, agent.ID)
-	}
-}
-
 // setConn sets the `conn` and returns false if there's already a connection set.
 func (u *updater) setConn(conn Conn) bool {
 	u.mu.Lock()
@@ -552,6 +563,46 @@ func (u *updater) netStatusLoop() {
 	}
 }
 
+// processSnapshotUpdate handles the logic when a full state update is received.
+// When the tunnel is live, we only receive diffs, but the first packet on any given
+// reconnect to the tailnet API is a full state.
+// Without this logic we weren't processing deletes for any workspaces or agents deleted
+// while the client was disconnected while the computer was asleep.
+func processSnapshotUpdate(update *tailnet.WorkspaceUpdate, agents map[uuid.UUID]tailnet.Agent, workspaces map[uuid.UUID]tailnet.Workspace) {
+	// ignoredWorkspaces is initially populated with the workspaces that are
+	// in the current update. Later on we populate it with the deleted workspaces too
+	// so that we don't send duplicate updates. Same applies to ignoredAgents.
+	ignoredWorkspaces := make(map[uuid.UUID]struct{}, len(update.UpsertedWorkspaces))
+	ignoredAgents := make(map[uuid.UUID]struct{}, len(update.UpsertedAgents))
+
+	for _, workspace := range update.UpsertedWorkspaces {
+		ignoredWorkspaces[workspace.ID] = struct{}{}
+	}
+	for _, agent := range update.UpsertedAgents {
+		ignoredAgents[agent.ID] = struct{}{}
+	}
+	for _, agent := range agents {
+		if _, present := ignoredAgents[agent.ID]; !present {
+			// delete any current agents that are not in the new update
+			update.DeletedAgents = append(update.DeletedAgents, &tailnet.Agent{
+				ID:          agent.ID,
+				Name:        agent.Name,
+				WorkspaceID: agent.WorkspaceID,
+			})
+		}
+	}
+	for _, workspace := range workspaces {
+		if _, present := ignoredWorkspaces[workspace.ID]; !present {
+			update.DeletedWorkspaces = append(update.DeletedWorkspaces, &tailnet.Workspace{
+				ID:     workspace.ID,
+				Name:   workspace.Name,
+				Status: workspace.Status,
+			})
+			ignoredWorkspaces[workspace.ID] = struct{}{}
+		}
+	}
+}
+
 // hostsToIPStrings returns a slice of all unique IP addresses in the values
 // of the given map.
 func hostsToIPStrings(hosts map[dnsname.FQDN][]netip.Addr) []string {
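A minimal sketch of how this behavior could be exercised from a test in package `vpn` (a hypothetical test, not part of this commit; `processSnapshotUpdate` is unexported, so the test must live in the same package, and the field names come from the diff above):

```go
package vpn

import (
	"testing"

	"github.com/google/uuid"
	"github.com/stretchr/testify/require"

	"github.com/coder/coder/v2/tailnet"
)

// Hypothetical test: an agent the tunnel already knows about but that is
// absent from a snapshot update must be surfaced as a delete.
func TestProcessSnapshotUpdate_DeletesStaleAgents(t *testing.T) {
	t.Parallel()

	staleID := uuid.New()
	wsID := uuid.New()

	agents := map[uuid.UUID]tailnet.Agent{
		staleID: {ID: staleID, Name: "stale", WorkspaceID: wsID},
	}
	workspaces := map[uuid.UUID]tailnet.Workspace{}

	// An empty snapshot reports no agents, so the known agent is stale.
	update := &tailnet.WorkspaceUpdate{Kind: tailnet.Snapshot}
	processSnapshotUpdate(update, agents, workspaces)

	require.Len(t, update.DeletedAgents, 1)
	require.Equal(t, staleID, update.DeletedAgents[0].ID)
}
```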
