Skip to content

Commit 5e8f97d

Browse files
authored
chore: add DERP websocket integration tests (#13168)
- `DERPForceWebSockets`: Test that DERP over WebSocket (as well as `DERPForceWebSockets`) works. This does not test the actual DERP failure detection code and automatic fallback. - `DERPFallbackWebSockets`: Test that falling back to DERP over WebSocket works. Also: - Rearranges some test code and refactors `TestTopology.StartServer` to be `TestTopology.ServerOptions` and take a struct instead of a function. Closes #13045
1 parent b56c9c4 commit 5e8f97d

File tree

3 files changed

+160
-58
lines changed

3 files changed

+160
-58
lines changed

tailnet/test/integration/integration.go

Lines changed: 75 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/netip"
1212
"net/url"
1313
"strconv"
14+
"strings"
1415
"sync/atomic"
1516
"testing"
1617
"time"
@@ -39,8 +40,21 @@ var (
3940
Client2ID = uuid.MustParse("00000000-0000-0000-0000-000000000002")
4041
)
4142

42-
// StartServerBasic creates a coordinator and DERP server.
43-
func StartServerBasic(t *testing.T, logger slog.Logger, listenAddr string) {
43+
type ServerOptions struct {
44+
// FailUpgradeDERP will make the DERP server fail to handle the initial DERP
45+
// upgrade in a way that causes the client to fall back to
46+
// DERP-over-WebSocket automatically.
47+
// Incompatible with DERPWebsocketOnly.
48+
FailUpgradeDERP bool
49+
// DERPWebsocketOnly will make the DERP server only accept WebSocket
50+
// connections. If a DERP request is received that is not using WebSocket
51+
// fallback, the test will fail.
52+
// Incompatible with FailUpgradeDERP.
53+
DERPWebsocketOnly bool
54+
}
55+
56+
//nolint:revive
57+
func (o ServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux {
4458
coord := tailnet.NewCoordinator(logger)
4559
var coordPtr atomic.Pointer[tailnet.Coordinator]
4660
coordPtr.Store(&coord)
@@ -69,15 +83,38 @@ func StartServerBasic(t *testing.T, logger slog.Logger, listenAddr string) {
6983
tracing.StatusWriterMiddleware,
7084
httpmw.Logger(logger),
7185
)
86+
7287
r.Route("/derp", func(r chi.Router) {
7388
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
7489
logger.Info(r.Context(), "start derp request", slog.F("path", r.URL.Path), slog.F("remote_ip", r.RemoteAddr))
90+
91+
upgrade := strings.ToLower(r.Header.Get("Upgrade"))
92+
if upgrade != "derp" && upgrade != "websocket" {
93+
http.Error(w, "invalid DERP upgrade header", http.StatusBadRequest)
94+
t.Errorf("invalid DERP upgrade header: %s", upgrade)
95+
return
96+
}
97+
98+
if o.FailUpgradeDERP && upgrade == "derp" {
99+
// 4xx status codes will cause the client to fall back to
100+
// DERP-over-WebSocket.
101+
http.Error(w, "test derp upgrade failure", http.StatusBadRequest)
102+
return
103+
}
104+
if o.DERPWebsocketOnly && upgrade != "websocket" {
105+
logger.Error(r.Context(), "non-websocket DERP request received", slog.F("path", r.URL.Path), slog.F("remote_ip", r.RemoteAddr))
106+
http.Error(w, "non-websocket DERP request received", http.StatusBadRequest)
107+
t.Error("non-websocket DERP request received")
108+
return
109+
}
110+
75111
derpHandler.ServeHTTP(w, r)
76112
})
77113
r.Get("/latency-check", func(w http.ResponseWriter, r *http.Request) {
78114
w.WriteHeader(http.StatusOK)
79115
})
80116
})
117+
81118
r.Get("/api/v2/workspaceagents/{id}/coordinate", func(w http.ResponseWriter, r *http.Request) {
82119
ctx := r.Context()
83120
idStr := chi.URLParamFromCtx(ctx, "id")
@@ -116,28 +153,44 @@ func StartServerBasic(t *testing.T, logger slog.Logger, listenAddr string) {
116153
}
117154
})
118155

119-
// We have a custom listen address.
120-
srv := http.Server{
121-
Addr: listenAddr,
122-
Handler: r,
123-
ReadTimeout: 10 * time.Second,
124-
}
125-
serveDone := make(chan struct{})
126-
go func() {
127-
defer close(serveDone)
128-
err := srv.ListenAndServe()
129-
if err != nil && !xerrors.Is(err, http.ErrServerClosed) {
130-
t.Error("HTTP server error:", err)
131-
}
132-
}()
133-
t.Cleanup(func() {
134-
_ = srv.Close()
135-
<-serveDone
156+
return r
157+
}
158+
159+
// StartClientDERP creates a client connection to the server for coordination
160+
// and creates a tailnet.Conn which will only use DERP to connect to the peer.
161+
func StartClientDERP(t *testing.T, logger slog.Logger, serverURL *url.URL, myID, peerID uuid.UUID) *tailnet.Conn {
162+
return startClientOptions(t, logger, serverURL, myID, peerID, &tailnet.Options{
163+
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IPFromUUID(myID), 128)},
164+
DERPMap: basicDERPMap(t, serverURL),
165+
BlockEndpoints: true,
166+
Logger: logger,
167+
DERPForceWebSockets: false,
168+
// These tests don't have an internet connection, so we need to force
169+
// magicsock to do anything.
170+
ForceNetworkUp: true,
171+
})
172+
}
173+
174+
// StartClientDERPWebSockets does the same thing as StartClientDERP but will
175+
// only use DERP WebSocket fallback.
176+
func StartClientDERPWebSockets(t *testing.T, logger slog.Logger, serverURL *url.URL, myID, peerID uuid.UUID) *tailnet.Conn {
177+
return startClientOptions(t, logger, serverURL, myID, peerID, &tailnet.Options{
178+
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IPFromUUID(myID), 128)},
179+
DERPMap: basicDERPMap(t, serverURL),
180+
BlockEndpoints: true,
181+
Logger: logger,
182+
DERPForceWebSockets: true,
183+
// These tests don't have an internet connection, so we need to force
184+
// magicsock to do anything.
185+
ForceNetworkUp: true,
136186
})
137187
}
138188

139-
// StartClientBasic creates a client connection to the server.
140-
func StartClientBasic(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn {
189+
type ClientStarter struct {
190+
Options *tailnet.Options
191+
}
192+
193+
func startClientOptions(t *testing.T, logger slog.Logger, serverURL *url.URL, myID, peerID uuid.UUID, options *tailnet.Options) *tailnet.Conn {
141194
u, err := serverURL.Parse(fmt.Sprintf("/api/v2/workspaceagents/%s/coordinate", myID.String()))
142195
require.NoError(t, err)
143196
//nolint:bodyclose
@@ -156,15 +209,7 @@ func StartClientBasic(t *testing.T, logger slog.Logger, serverURL *url.URL, myID
156209
coord, err := client.Coordinate(context.Background())
157210
require.NoError(t, err)
158211

159-
conn, err := tailnet.NewConn(&tailnet.Options{
160-
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IPFromUUID(myID), 128)},
161-
DERPMap: basicDERPMap(t, serverURL),
162-
BlockEndpoints: true,
163-
Logger: logger,
164-
// These tests don't have internet connection, so we need to force
165-
// magicsock to do anything.
166-
ForceNetworkUp: true,
167-
})
212+
conn, err := tailnet.NewConn(options)
168213
require.NoError(t, err)
169214
t.Cleanup(func() {
170215
_ = conn.Close()

tailnet/test/integration/integration_test.go

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/google/uuid"
2222
"github.com/stretchr/testify/require"
23+
"golang.org/x/xerrors"
2324

2425
"cdr.dev/slog"
2526
"cdr.dev/slog/sloggers/slogtest"
@@ -68,19 +69,48 @@ func TestMain(m *testing.M) {
6869

6970
var topologies = []integration.TestTopology{
7071
{
72+
// Test that DERP over loopback works.
7173
Name: "BasicLoopbackDERP",
7274
SetupNetworking: integration.SetupNetworkingLoopback,
73-
StartServer: integration.StartServerBasic,
74-
StartClient: integration.StartClientBasic,
75+
ServerOptions: integration.ServerOptions{},
76+
StartClient: integration.StartClientDERP,
7577
RunTests: integration.TestSuite,
7678
},
7779
{
80+
// Test that DERP over "easy" NAT works. The server, client 1 and client
81+
// 2 are on different networks with a shared router, and the router
82+
// masquerades the traffic.
7883
Name: "EasyNATDERP",
7984
SetupNetworking: integration.SetupNetworkingEasyNAT,
80-
StartServer: integration.StartServerBasic,
81-
StartClient: integration.StartClientBasic,
85+
ServerOptions: integration.ServerOptions{},
86+
StartClient: integration.StartClientDERP,
8287
RunTests: integration.TestSuite,
8388
},
89+
{
90+
// Test that DERP over WebSocket (as well as DERPForceWebSockets) works.
91+
// This does not test the actual DERP failure detection code and
92+
// automatic fallback.
93+
Name: "DERPForceWebSockets",
94+
SetupNetworking: integration.SetupNetworkingEasyNAT,
95+
ServerOptions: integration.ServerOptions{
96+
FailUpgradeDERP: false,
97+
DERPWebsocketOnly: true,
98+
},
99+
StartClient: integration.StartClientDERPWebSockets,
100+
RunTests: integration.TestSuite,
101+
},
102+
{
103+
// Test that falling back to DERP over WebSocket works.
104+
Name: "DERPFallbackWebSockets",
105+
SetupNetworking: integration.SetupNetworkingEasyNAT,
106+
ServerOptions: integration.ServerOptions{
107+
FailUpgradeDERP: true,
108+
DERPWebsocketOnly: false,
109+
},
110+
// Use a basic client that will try `Upgrade: derp` first.
111+
StartClient: integration.StartClientDERP,
112+
RunTests: integration.TestSuite,
113+
},
84114
}
85115

86116
//nolint:paralleltest,tparallel
@@ -101,19 +131,17 @@ func TestIntegration(t *testing.T) {
101131
networking := topo.SetupNetworking(t, log)
102132

103133
// Fork the three child processes.
104-
serverErrCh, closeServer := startServerSubprocess(t, topo.Name, networking)
134+
closeServer := startServerSubprocess(t, topo.Name, networking)
105135
// client1 runs the tests.
106136
client1ErrCh, _ := startClientSubprocess(t, topo.Name, networking, 1)
107-
client2ErrCh, closeClient2 := startClientSubprocess(t, topo.Name, networking, 2)
137+
_, closeClient2 := startClientSubprocess(t, topo.Name, networking, 2)
108138

109139
// Wait for client1 to exit.
110140
require.NoError(t, <-client1ErrCh, "client 1 exited")
111141

112142
// Close client2 and the server.
113-
closeClient2()
114-
require.NoError(t, <-client2ErrCh, "client 2 exited")
115-
closeServer()
116-
require.NoError(t, <-serverErrCh, "server exited")
143+
require.NoError(t, closeClient2(), "client 2 exited")
144+
require.NoError(t, closeServer(), "server exited")
117145
})
118146
}
119147
}
@@ -138,15 +166,32 @@ func handleTestSubprocess(t *testing.T) {
138166

139167
//nolint:parralleltest
140168
t.Run(testName, func(t *testing.T) {
141-
log := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
169+
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
142170
switch *role {
143171
case "server":
144-
log = log.Named("server")
145-
topo.StartServer(t, log, *serverListenAddr)
172+
logger = logger.Named("server")
173+
174+
srv := http.Server{
175+
Addr: *serverListenAddr,
176+
Handler: topo.ServerOptions.Router(t, logger),
177+
ReadTimeout: 10 * time.Second,
178+
}
179+
serveDone := make(chan struct{})
180+
go func() {
181+
defer close(serveDone)
182+
err := srv.ListenAndServe()
183+
if err != nil && !xerrors.Is(err, http.ErrServerClosed) {
184+
t.Error("HTTP server error:", err)
185+
}
186+
}()
187+
t.Cleanup(func() {
188+
_ = srv.Close()
189+
<-serveDone
190+
})
146191
// no exit
147192

148193
case "client":
149-
log = log.Named(*clientName)
194+
logger = logger.Named(*clientName)
150195
serverURL, err := url.Parse(*clientServerURL)
151196
require.NoErrorf(t, err, "parse server url %q", *clientServerURL)
152197
myID, err := uuid.Parse(*clientMyID)
@@ -156,7 +201,7 @@ func handleTestSubprocess(t *testing.T) {
156201

157202
waitForServerAvailable(t, serverURL)
158203

159-
conn := topo.StartClient(t, log, serverURL, myID, peerID)
204+
conn := topo.StartClient(t, logger, serverURL, myID, peerID)
160205

161206
if *clientRunTests {
162207
// Wait for connectivity.
@@ -165,7 +210,7 @@ func handleTestSubprocess(t *testing.T) {
165210
t.Fatalf("peer %v did not become reachable", peerIP)
166211
}
167212

168-
topo.RunTests(t, log, serverURL, myID, peerID, conn)
213+
topo.RunTests(t, logger, serverURL, myID, peerID, conn)
169214
// then exit
170215
return
171216
}
@@ -206,16 +251,17 @@ func waitForServerAvailable(t *testing.T, serverURL *url.URL) {
206251
t.Fatalf("server did not become available after %v", timeout)
207252
}
208253

209-
func startServerSubprocess(t *testing.T, topologyName string, networking integration.TestNetworking) (<-chan error, func()) {
210-
return startSubprocess(t, "server", networking.ProcessServer.NetNS, []string{
254+
func startServerSubprocess(t *testing.T, topologyName string, networking integration.TestNetworking) func() error {
255+
_, closeFn := startSubprocess(t, "server", networking.ProcessServer.NetNS, []string{
211256
"--subprocess",
212257
"--test-name=" + topologyName,
213258
"--role=server",
214259
"--server-listen-addr=" + networking.ServerListenAddr,
215260
})
261+
return closeFn
216262
}
217263

218-
func startClientSubprocess(t *testing.T, topologyName string, networking integration.TestNetworking, clientNumber int) (<-chan error, func()) {
264+
func startClientSubprocess(t *testing.T, topologyName string, networking integration.TestNetworking, clientNumber int) (<-chan error, func() error) {
219265
require.True(t, clientNumber == 1 || clientNumber == 2)
220266

221267
var (
@@ -247,7 +293,13 @@ func startClientSubprocess(t *testing.T, topologyName string, networking integra
247293
return startSubprocess(t, clientName, netNS, flags)
248294
}
249295

250-
func startSubprocess(t *testing.T, processName string, netNS *os.File, flags []string) (<-chan error, func()) {
296+
// startSubprocess starts a subprocess with the given flags and returns a
297+
// channel that will receive the error when the subprocess exits. The returned
298+
// function can be used to close the subprocess.
299+
//
300+
// Do not call close then wait on the channel. Use the returned value from the
301+
// function instead in this case.
302+
func startSubprocess(t *testing.T, processName string, netNS *os.File, flags []string) (<-chan error, func() error) {
251303
name := os.Args[0]
252304
// Always use verbose mode since it gets piped to the parent test anyways.
253305
args := append(os.Args[1:], append([]string{"-test.v=true"}, flags...)...)
@@ -289,15 +341,15 @@ func startSubprocess(t *testing.T, processName string, netNS *os.File, flags []s
289341
close(waitErr)
290342
}()
291343

292-
closeFn := func() {
344+
closeFn := func() error {
293345
_ = cmd.Process.Signal(syscall.SIGTERM)
294346
select {
295347
case <-time.After(5 * time.Second):
296348
_ = cmd.Process.Kill()
297-
case <-waitErr:
298-
return
349+
case err := <-waitErr:
350+
return err
299351
}
300-
<-waitErr
352+
return <-waitErr
301353
}
302354

303355
t.Cleanup(func() {
@@ -310,7 +362,7 @@ func startSubprocess(t *testing.T, processName string, netNS *os.File, flags []s
310362
default:
311363
}
312364

313-
closeFn()
365+
_ = closeFn()
314366
})
315367

316368
return waitErr, closeFn
@@ -338,6 +390,11 @@ func (w *testWriter) Write(p []byte) (n int, err error) {
338390
// then it's a test result line. We want to capture it and log it later.
339391
trimmed := strings.TrimSpace(s)
340392
if strings.HasPrefix(trimmed, "--- PASS") || strings.HasPrefix(trimmed, "--- FAIL") || trimmed == "PASS" || trimmed == "FAIL" {
393+
// Also fail the test if we see a FAIL line.
394+
if strings.Contains(trimmed, "FAIL") {
395+
w.t.Errorf("subprocess logged test failure: %s: \t%s", w.name, s)
396+
}
397+
341398
w.capturedLines = append(w.capturedLines, s)
342399
continue
343400
}

tailnet/test/integration/network.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ type TestTopology struct {
2929
// a network namespace shared for all tests.
3030
SetupNetworking func(t *testing.T, logger slog.Logger) TestNetworking
3131

32-
// StartServer gets called in the server subprocess. It's expected to start
33-
// the coordinator server in the background and return.
34-
StartServer func(t *testing.T, logger slog.Logger, listenAddr string)
32+
// ServerOptions is the configuration for the server. It's passed to the
33+
// server process.
34+
ServerOptions ServerOptions
3535
// StartClient gets called in each client subprocess. It's expected to
3636
// create the tailnet.Conn and ensure connectivity to its peer.
3737
StartClient func(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn

0 commit comments

Comments
 (0)