chore: cherry pick for release 2.22 #17842

Merged: 11 commits, May 15, 2025
Changes from 1 commit
feat: improve coder connect tunnel handling on reconnect (#17598)
Closes coder/internal#563

The [Coder Connect
tunnel](https://github.com/coder/coder/blob/main/vpn/tunnel.go) receives
workspace state from the Coder server over a [dRPC
stream](https://github.com/coder/coder/blob/114ba4593b2a82dfd41cdcb7fd6eb70d866e7b86/tailnet/controllers.go#L1029).
When the tunnel first connects to this stream, it receives the current state
of the user's workspaces; every subsequent message is a diff applied on top of
that state.

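For orientation, here is a minimal, hypothetical sketch of that contract (not the actual controller code — the real receive loop is tunnelUpdater.recvLoop in the diff below): the first message after a (re)connect carries the full state, and everything after it is a diff.

```go
package sketch

// workspaceUpdate is an illustrative stand-in for the real update message.
type workspaceUpdate struct {
	upsertedAgents []string
	deletedAgents  []string
}

// consume sketches the stream contract described above: the first message
// received after (re)connecting carries the full workspace state, and every
// subsequent message is a diff on top of that state.
func consume(recv func() (workspaceUpdate, error), apply func(u workspaceUpdate, fullState bool) error) error {
	fullState := true
	for {
		u, err := recv()
		if err != nil {
			// The stream is gone; a later reconnect starts over with a full state.
			return err
		}
		if err := apply(u, fullState); err != nil {
			return err
		}
		fullState = false
	}
}
```
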
However, if the client disconnects from this stream (for example, when the
user's device is suspended) and then reconnects later, the tunnel has no way
to distinguish the message containing the entire initial state from an
ordinary diff, so that full state is incorrectly applied as a diff.

In practice:
- Tunnel connects, receives a workspace update containing all the
existing workspaces & agents.
- Tunnel loses connection, but isn't completely stopped.
- All the user's workspaces are restarted, producing a new set of
agents.
- Tunnel regains connection, and receives a workspace update containing
all the existing workspaces & agents.
- This initial update is incorrectly applied as a diff, leaving the Tunnel's state containing both the old & new agents (a minimal sketch of this failure follows the list).

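In code, the failure mode looks roughly like this (hypothetical simplified types; the tunnel's real state is the agents map in vpn/tunnel.go shown below). Applying a full snapshot with diff semantics only upserts, so nothing ever removes the agents that disappeared while the client was disconnected:

```go
package sketch

// applyAsDiff mimics diff semantics: upsert whatever the update names and
// delete only what the update explicitly deletes.
func applyAsDiff(state map[string]bool, upserted, deleted []string) {
	for _, id := range upserted {
		state[id] = true
	}
	for _, id := range deleted {
		delete(state, id)
	}
}

// After a reconnect the server resends the full state as upserts with no
// deletions, so a stale agent from before the disconnect survives:
//
//	state := map[string]bool{"old-agent": true}
//	applyAsDiff(state, []string{"new-agent"}, nil)
//	// state now contains both "old-agent" and "new-agent".
```
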
This PR introduces a solution in which tunnelUpdater marks the first
WorkspaceUpdate it delivers after being created as a full snapshot rather than
a diff (the new Kind field in the diff below: Snapshot vs. Diff). The vpn
tunnel handles such a snapshot in the following fashion (see the sketch after
this list):
- Preserve existing Agents
- Remove current Agents in the tunnel that are not present in the WorkspaceUpdate
- Remove unreferenced Workspaces

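The reconciliation itself lives in processSnapshotUpdate in vpn/tunnel.go (full diff below). In outline, with hypothetical simplified types, it converts a snapshot into an equivalent diff by turning every currently tracked agent or workspace that is absent from the snapshot into an explicit deletion:

```go
package sketch

// reconcileSnapshot outlines the new snapshot handling: anything the tunnel
// currently tracks but the snapshot no longer mentions becomes an explicit
// deletion, after which the snapshot can be applied with the ordinary diff
// machinery.
func reconcileSnapshot(current map[string]bool, snapshot []string) (deleted []string) {
	inSnapshot := make(map[string]bool, len(snapshot))
	for _, id := range snapshot {
		inSnapshot[id] = true
	}
	for id := range current {
		if !inSnapshot[id] {
			deleted = append(deleted, id)
		}
	}
	return deleted
}
```
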
(cherry picked from commit 5f516ed)
ibetitsmike authored and stirby committed May 14, 2025
commit 0d4b15ea081a157b23699e86fe8e5ee636438a8e
32 changes: 30 additions & 2 deletions tailnet/controllers.go
@@ -897,6 +897,21 @@ type Workspace struct {
agents map[uuid.UUID]*Agent
}

func (w *Workspace) Clone() Workspace {
agents := make(map[uuid.UUID]*Agent, len(w.agents))
for k, v := range w.agents {
clone := v.Clone()
agents[k] = &clone
}
return Workspace{
ID: w.ID,
Name: w.Name,
Status: w.Status,
ownerUsername: w.ownerUsername,
agents: agents,
}
}

type DNSNameOptions struct {
Suffix string
}
@@ -1049,6 +1064,7 @@ func (t *tunnelUpdater) recvLoop() {
t.logger.Debug(context.Background(), "tunnel updater recvLoop started")
defer t.logger.Debug(context.Background(), "tunnel updater recvLoop done")
defer close(t.recvLoopDone)
updateKind := Snapshot
for {
update, err := t.client.Recv()
if err != nil {
@@ -1061,8 +1077,10 @@
}
t.logger.Debug(context.Background(), "got workspace update",
slog.F("workspace_update", update),
slog.F("update_kind", updateKind),
)
err = t.handleUpdate(update)
err = t.handleUpdate(update, updateKind)
updateKind = Diff
if err != nil {
t.logger.Critical(context.Background(), "failed to handle workspace Update", slog.Error(err))
cErr := t.client.Close()
@@ -1083,14 +1101,23 @@ type WorkspaceUpdate struct {
UpsertedAgents []*Agent
DeletedWorkspaces []*Workspace
DeletedAgents []*Agent
Kind UpdateKind
}

type UpdateKind int

const (
Diff UpdateKind = iota
Snapshot
)

func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
clone := WorkspaceUpdate{
UpsertedWorkspaces: make([]*Workspace, len(w.UpsertedWorkspaces)),
UpsertedAgents: make([]*Agent, len(w.UpsertedAgents)),
DeletedWorkspaces: make([]*Workspace, len(w.DeletedWorkspaces)),
DeletedAgents: make([]*Agent, len(w.DeletedAgents)),
Kind: w.Kind,
}
for i, ws := range w.UpsertedWorkspaces {
clone.UpsertedWorkspaces[i] = &Workspace{
@@ -1115,7 +1142,7 @@ func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
return clone
}

func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate, updateKind UpdateKind) error {
t.Lock()
defer t.Unlock()

@@ -1124,6 +1151,7 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
UpsertedAgents: []*Agent{},
DeletedWorkspaces: []*Workspace{},
DeletedAgents: []*Agent{},
Kind: updateKind,
}

for _, uw := range update.UpsertedWorkspaces {
9 changes: 8 additions & 1 deletion tailnet/controllers_test.go
@@ -1611,6 +1611,7 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
Kind: tailnet.Snapshot,
}

// And the callback
@@ -1626,6 +1627,9 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
slices.SortFunc(recvState.UpsertedAgents, func(a, b *tailnet.Agent) int {
return strings.Compare(a.Name, b.Name)
})
// tunnel is still open, so it's a diff
currentState.Kind = tailnet.Diff

require.Equal(t, currentState, recvState)
}

@@ -1692,14 +1696,17 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
Kind: tailnet.Snapshot,
}

cbUpdate := testutil.TryReceive(ctx, t, fUH.ch)
require.Equal(t, initRecvUp, cbUpdate)

// Current state should match initial
state, err := updateCtrl.CurrentState()
require.NoError(t, err)
// tunnel is still open, so it's a diff
initRecvUp.Kind = tailnet.Diff

require.Equal(t, initRecvUp, state)

// Send update that removes w1a1 and adds w1a2
77 changes: 64 additions & 13 deletions vpn/tunnel.go
@@ -88,6 +88,7 @@ func NewTunnel(
netLoopDone: make(chan struct{}),
uSendCh: s.sendCh,
agents: map[uuid.UUID]tailnet.Agent{},
workspaces: map[uuid.UUID]tailnet.Workspace{},
clock: quartz.NewReal(),
},
}
@@ -347,7 +348,9 @@ type updater struct {
uSendCh chan<- *TunnelMessage
// agents contains the agents that are currently connected to the tunnel.
agents map[uuid.UUID]tailnet.Agent
conn Conn
// workspaces contains the workspaces to which agents are currently connected via the tunnel.
workspaces map[uuid.UUID]tailnet.Workspace
conn Conn

clock quartz.Clock
}
@@ -397,14 +400,32 @@ func (u *updater) sendUpdateResponse(req *request[*TunnelMessage, *ManagerMessag
// createPeerUpdateLocked creates a PeerUpdate message from a workspace update, populating
// the network status of the agents.
func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUpdate {
// if the update is a snapshot, we need to process the full state
if update.Kind == tailnet.Snapshot {
processSnapshotUpdate(&update, u.agents, u.workspaces)
}

out := &PeerUpdate{
UpsertedWorkspaces: make([]*Workspace, len(update.UpsertedWorkspaces)),
UpsertedAgents: make([]*Agent, len(update.UpsertedAgents)),
DeletedWorkspaces: make([]*Workspace, len(update.DeletedWorkspaces)),
DeletedAgents: make([]*Agent, len(update.DeletedAgents)),
}

u.saveUpdateLocked(update)
// save the workspace update to the tunnel's state, such that it can
// be used to populate automated peer updates.
for _, agent := range update.UpsertedAgents {
u.agents[agent.ID] = agent.Clone()
}
for _, agent := range update.DeletedAgents {
delete(u.agents, agent.ID)
}
for _, workspace := range update.UpsertedWorkspaces {
u.workspaces[workspace.ID] = workspace.Clone()
}
for _, workspace := range update.DeletedWorkspaces {
delete(u.workspaces, workspace.ID)
}

for i, ws := range update.UpsertedWorkspaces {
out.UpsertedWorkspaces[i] = &Workspace{
@@ -413,6 +434,7 @@ func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUp
Status: Workspace_Status(ws.Status),
}
}

upsertedAgents := u.convertAgentsLocked(update.UpsertedAgents)
out.UpsertedAgents = upsertedAgents
for i, ws := range update.DeletedWorkspaces {
@@ -472,17 +494,6 @@ func (u *updater) convertAgentsLocked(agents []*tailnet.Agent) []*Agent {
return out
}

// saveUpdateLocked saves the workspace update to the tunnel's state, such that it can
// be used to populate automated peer updates.
func (u *updater) saveUpdateLocked(update tailnet.WorkspaceUpdate) {
for _, agent := range update.UpsertedAgents {
u.agents[agent.ID] = agent.Clone()
}
for _, agent := range update.DeletedAgents {
delete(u.agents, agent.ID)
}
}

// setConn sets the `conn` and returns false if there's already a connection set.
func (u *updater) setConn(conn Conn) bool {
u.mu.Lock()
@@ -552,6 +563,46 @@ func (u *updater) netStatusLoop() {
}
}

// processSnapshotUpdate handles the logic when a full state update is received.
// When the tunnel is live, we only receive diffs, but the first packet on any given
// reconnect to the tailnet API is a full state.
// Without this logic we weren't processing deletes for any workspaces or agents deleted
// while the client was disconnected while the computer was asleep.
func processSnapshotUpdate(update *tailnet.WorkspaceUpdate, agents map[uuid.UUID]tailnet.Agent, workspaces map[uuid.UUID]tailnet.Workspace) {
// ignoredWorkspaces is initially populated with the workspaces that are
// in the current update. Later on we populate it with the deleted workspaces too
// so that we don't send duplicate updates. Same applies to ignoredAgents.
ignoredWorkspaces := make(map[uuid.UUID]struct{}, len(update.UpsertedWorkspaces))
ignoredAgents := make(map[uuid.UUID]struct{}, len(update.UpsertedAgents))

for _, workspace := range update.UpsertedWorkspaces {
ignoredWorkspaces[workspace.ID] = struct{}{}
}
for _, agent := range update.UpsertedAgents {
ignoredAgents[agent.ID] = struct{}{}
}
for _, agent := range agents {
if _, present := ignoredAgents[agent.ID]; !present {
// delete any current agents that are not in the new update
update.DeletedAgents = append(update.DeletedAgents, &tailnet.Agent{
ID: agent.ID,
Name: agent.Name,
WorkspaceID: agent.WorkspaceID,
})
}
}
for _, workspace := range workspaces {
if _, present := ignoredWorkspaces[workspace.ID]; !present {
update.DeletedWorkspaces = append(update.DeletedWorkspaces, &tailnet.Workspace{
ID: workspace.ID,
Name: workspace.Name,
Status: workspace.Status,
})
ignoredWorkspaces[workspace.ID] = struct{}{}
}
}
}

// hostsToIPStrings returns a slice of all unique IP addresses in the values
// of the given map.
func hostsToIPStrings(hosts map[dnsname.FQDN][]netip.Addr) []string {