Skip to content

fix: agent disconnects from coordinator #7430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion tailnet/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ func (t *TrackedConn) Close() error {
return t.conn.Close()
}

// WriteTimeout is the amount of time we wait to write a node update to a connection before we declare it hung.
// It is exported so that tests can use it.
const WriteTimeout = time.Second * 5

// SendUpdates reads node updates and writes them to the connection. Ends when writes hit an error or context is
// canceled.
func (t *TrackedConn) SendUpdates() {
Expand All @@ -223,7 +227,7 @@ func (t *TrackedConn) SendUpdates() {

// Set a deadline so that hung connections don't put back pressure on the system.
// Node updates are tiny, so even the dinkiest connection can handle them if it's not hung.
err = t.conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
err = t.conn.SetWriteDeadline(time.Now().Add(WriteTimeout))
if err != nil {
// often, this is just because the connection is closed/broken, so only log at debug.
t.logger.Debug(t.ctx, "unable to set write deadline", slog.Error(err))
Expand All @@ -238,6 +242,19 @@ func (t *TrackedConn) SendUpdates() {
return
}
t.logger.Debug(t.ctx, "wrote nodes", slog.F("nodes", nodes))

// nhooyr.io/websocket has a bugged implementation of deadlines on a websocket net.Conn. What they are
// *supposed* to do is set a deadline for any subsequent writes to complete, otherwise the call to Write()
// fails. What nhooyr.io/websocket does is set a timer, after which it expires the websocket write context.
// If this timer fires, then the next write will fail *even if we set a new write deadline*. So, after
// our successful write, it is important that we reset the deadline before it fires.
err = t.conn.SetWriteDeadline(time.Time{})
if err != nil {
// often, this is just because the connection is closed/broken, so only log at debug.
t.logger.Debug(t.ctx, "unable to extend write deadline", slog.Error(err))
_ = t.Close()
return
}
}
}
}
Expand Down
55 changes: 49 additions & 6 deletions tailnet/coordinator_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package tailnet_test

import (
"context"
"encoding/json"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"

"nhooyr.io/websocket"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"

Expand Down Expand Up @@ -74,7 +79,10 @@ func TestCoordinator(t *testing.T) {
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
coordinator := tailnet.NewCoordinator(logger)

agentWS, agentServerWS := net.Pipe()
// in this test we use real websockets to test use of deadlines
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
defer cancel()
agentWS, agentServerWS := websocketConn(ctx, t)
defer agentWS.Close()
agentNodeChan := make(chan []*tailnet.Node)
sendAgentNode, agentErrChan := tailnet.ServeCoordinator(agentWS, func(nodes []*tailnet.Node) error {
Expand All @@ -93,7 +101,7 @@ func TestCoordinator(t *testing.T) {
return coordinator.Node(agentID) != nil
}, testutil.WaitShort, testutil.IntervalFast)

clientWS, clientServerWS := net.Pipe()
clientWS, clientServerWS := websocketConn(ctx, t)
defer clientWS.Close()
defer clientServerWS.Close()
clientNodeChan := make(chan []*tailnet.Node)
Expand All @@ -108,16 +116,28 @@ func TestCoordinator(t *testing.T) {
assert.NoError(t, err)
close(closeClientChan)
}()
agentNodes := <-clientNodeChan
require.Len(t, agentNodes, 1)
select {
case agentNodes := <-clientNodeChan:
require.Len(t, agentNodes, 1)
case <-ctx.Done():
t.Fatal("timed out")
}
sendClientNode(&tailnet.Node{})
clientNodes := <-agentNodeChan
require.Len(t, clientNodes, 1)

// wait longer than the internal wait timeout.
// this tests for regression of https://github.com/coder/coder/issues/7428
time.Sleep(tailnet.WriteTimeout * 3 / 2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should shorten the write timeout for tests so that we don't add a long delay? Even though we're running tests concurrently, if we have a lot of these it'll end up impacting test times.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, I could.

I could make the write timeout an option, and plumb it through. However, when we run these tests in parallel, it can slow them down quite a bit, so I wouldn't want to set the timer too short, lest we get a flaky test under load. Maybe 1 second? 500 ms?

The current sleep is 7.5s, and I'd be able to shave it down to maybe 750ms, which doesn't really feel worth the trouble to me.

Copy link
Member

@mafredri mafredri May 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid plumbing, how about something like:

var WriteTimeout = func() time.Duration {
	if inTest() {
		return 1 * time.Second
	}
	return 5 * time.Second
}()

?

Maybe that'd be testutil.InTest() or just an inline check in this package. I think that'd be fine for now, at some point we may want a better way to know all places where we may be modifying behavior for tests.

Perhaps not ideal since this would affect all tests. I'll leave this up to you.


// Ensure an update to the agent node reaches the client!
sendAgentNode(&tailnet.Node{})
agentNodes = <-clientNodeChan
require.Len(t, agentNodes, 1)
select {
case agentNodes := <-clientNodeChan:
require.Len(t, agentNodes, 1)
case <-ctx.Done():
t.Fatal("timed out")
}

// Close the agent WebSocket so a new one can connect.
err := agentWS.Close()
Expand Down Expand Up @@ -334,3 +354,26 @@ func TestCoordinator_AgentUpdateWhileClientConnects(t *testing.T) {
require.Len(t, cNodes, 1)
require.Equal(t, 1, cNodes[0].PreferredDERP)
}

// websocketConn starts an httptest server that accepts exactly one websocket
// connection and returns both ends wrapped as net.Conn. The server-side
// handler stays alive (keeping the connection open) until ctx is canceled.
func websocketConn(ctx context.Context, t *testing.T) (client net.Conn, server net.Conn) {
	t.Helper()
	serverConns := make(chan net.Conn, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ws, err := websocket.Accept(w, r, nil)
		require.NoError(t, err)
		serverConns <- websocket.NetConn(r.Context(), ws, websocket.MessageBinary)
		close(serverConns) // there can be only one

		// keep the handler running until the test context ends
		<-ctx.Done()
	}))
	t.Cleanup(srv.Close)
	// nolint: bodyclose
	dialed, _, err := websocket.Dial(ctx, srv.URL, nil)
	require.NoError(t, err)
	client = websocket.NetConn(ctx, dialed, websocket.MessageBinary)
	var ok bool
	server, ok = <-serverConns
	require.True(t, ok)
	return client, server
}