Skip to content

Commit dc3d39b

Browse files
authored
fix: agent disconnects from coordinator (#7430)
* work around websocket deadline bug Signed-off-by: Spike Curtis <spike@coder.com> * Use test context to hold websocket open Signed-off-by: Spike Curtis <spike@coder.com> * Fix race creating test websocket Signed-off-by: Spike Curtis <spike@coder.com> * set write deadline to time.Time zero Signed-off-by: Spike Curtis <spike@coder.com> --------- Signed-off-by: Spike Curtis <spike@coder.com>
1 parent 5ffa6da commit dc3d39b

File tree

2 files changed

+67
-7
lines changed

2 files changed

+67
-7
lines changed

tailnet/coordinator.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,10 @@ func (t *TrackedConn) Close() error {
206206
return t.conn.Close()
207207
}
208208

209+
// WriteTimeout is the amount of time we wait to write a node update to a connection before we declare it hung.
210+
// It is exported so that tests can use it.
211+
const WriteTimeout = time.Second * 5
212+
209213
// SendUpdates reads node updates and writes them to the connection. Ends when writes hit an error or context is
210214
// canceled.
211215
func (t *TrackedConn) SendUpdates() {
@@ -223,7 +227,7 @@ func (t *TrackedConn) SendUpdates() {
223227

224228
// Set a deadline so that hung connections don't put back pressure on the system.
225229
// Node updates are tiny, so even the dinkiest connection can handle them if it's not hung.
226-
err = t.conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
230+
err = t.conn.SetWriteDeadline(time.Now().Add(WriteTimeout))
227231
if err != nil {
228232
// often, this is just because the connection is closed/broken, so only log at debug.
229233
t.logger.Debug(t.ctx, "unable to set write deadline", slog.Error(err))
@@ -238,6 +242,19 @@ func (t *TrackedConn) SendUpdates() {
238242
return
239243
}
240244
t.logger.Debug(t.ctx, "wrote nodes", slog.F("nodes", nodes))
245+
246+
// nhooyr.io/websocket has a bugged implementation of deadlines on a websocket net.Conn. What they are
247+
// *supposed* to do is set a deadline for any subsequent writes to complete, otherwise the call to Write()
248+
// fails. What nhooyr.io/websocket does is set a timer, after which it expires the websocket write context.
249+
// If this timer fires, then the next write will fail *even if we set a new write deadline*. So, after
250+
// our successful write, it is important that we reset the deadline before it fires.
251+
err = t.conn.SetWriteDeadline(time.Time{})
252+
if err != nil {
253+
// often, this is just because the connection is closed/broken, so only log at debug.
254+
t.logger.Debug(t.ctx, "unable to extend write deadline", slog.Error(err))
255+
_ = t.Close()
256+
return
257+
}
241258
}
242259
}
243260
}

tailnet/coordinator_test.go

+49-6
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package tailnet_test
22

33
import (
4+
"context"
45
"encoding/json"
56
"net"
7+
"net/http"
8+
"net/http/httptest"
69
"testing"
710
"time"
811

12+
"nhooyr.io/websocket"
13+
914
"cdr.dev/slog"
1015
"cdr.dev/slog/sloggers/slogtest"
1116

@@ -74,7 +79,10 @@ func TestCoordinator(t *testing.T) {
7479
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
7580
coordinator := tailnet.NewCoordinator(logger)
7681

77-
agentWS, agentServerWS := net.Pipe()
82+
// in this test we use real websockets to test use of deadlines
83+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
84+
defer cancel()
85+
agentWS, agentServerWS := websocketConn(ctx, t)
7886
defer agentWS.Close()
7987
agentNodeChan := make(chan []*tailnet.Node)
8088
sendAgentNode, agentErrChan := tailnet.ServeCoordinator(agentWS, func(nodes []*tailnet.Node) error {
@@ -93,7 +101,7 @@ func TestCoordinator(t *testing.T) {
93101
return coordinator.Node(agentID) != nil
94102
}, testutil.WaitShort, testutil.IntervalFast)
95103

96-
clientWS, clientServerWS := net.Pipe()
104+
clientWS, clientServerWS := websocketConn(ctx, t)
97105
defer clientWS.Close()
98106
defer clientServerWS.Close()
99107
clientNodeChan := make(chan []*tailnet.Node)
@@ -108,16 +116,28 @@ func TestCoordinator(t *testing.T) {
108116
assert.NoError(t, err)
109117
close(closeClientChan)
110118
}()
111-
agentNodes := <-clientNodeChan
112-
require.Len(t, agentNodes, 1)
119+
select {
120+
case agentNodes := <-clientNodeChan:
121+
require.Len(t, agentNodes, 1)
122+
case <-ctx.Done():
123+
t.Fatal("timed out")
124+
}
113125
sendClientNode(&tailnet.Node{})
114126
clientNodes := <-agentNodeChan
115127
require.Len(t, clientNodes, 1)
116128

129+
// wait longer than the internal wait timeout.
130+
// this tests for regression of https://github.com/coder/coder/issues/7428
131+
time.Sleep(tailnet.WriteTimeout * 3 / 2)
132+
117133
// Ensure an update to the agent node reaches the client!
118134
sendAgentNode(&tailnet.Node{})
119-
agentNodes = <-clientNodeChan
120-
require.Len(t, agentNodes, 1)
135+
select {
136+
case agentNodes := <-clientNodeChan:
137+
require.Len(t, agentNodes, 1)
138+
case <-ctx.Done():
139+
t.Fatal("timed out")
140+
}
121141

122142
// Close the agent WebSocket so a new one can connect.
123143
err := agentWS.Close()
@@ -334,3 +354,26 @@ func TestCoordinator_AgentUpdateWhileClientConnects(t *testing.T) {
334354
require.Len(t, cNodes, 1)
335355
require.Equal(t, 1, cNodes[0].PreferredDERP)
336356
}
357+
358+
func websocketConn(ctx context.Context, t *testing.T) (client net.Conn, server net.Conn) {
359+
t.Helper()
360+
sc := make(chan net.Conn, 1)
361+
s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
362+
wss, err := websocket.Accept(rw, r, nil)
363+
require.NoError(t, err)
364+
conn := websocket.NetConn(r.Context(), wss, websocket.MessageBinary)
365+
sc <- conn
366+
close(sc) // there can be only one
367+
368+
// hold open until context canceled
369+
<-ctx.Done()
370+
}))
371+
t.Cleanup(s.Close)
372+
// nolint: bodyclose
373+
wsc, _, err := websocket.Dial(ctx, s.URL, nil)
374+
require.NoError(t, err)
375+
client = websocket.NetConn(ctx, wsc, websocket.MessageBinary)
376+
server, ok := <-sc
377+
require.True(t, ok)
378+
return client, server
379+
}

0 commit comments

Comments
 (0)