From 352786229b52aa9333f9985f6b8ae329ada379a2 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 11 Jun 2024 07:09:22 +0000 Subject: [PATCH 01/11] derp server restart test --- tailnet/test/integration/integration.go | 9 +++++++ tailnet/test/integration/suite.go | 33 ++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/tailnet/test/integration/integration.go b/tailnet/test/integration/integration.go index 3877542c8eafc..bf97136e1c689 100644 --- a/tailnet/test/integration/integration.go +++ b/tailnet/test/integration/integration.go @@ -171,6 +171,15 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { r.Get("/latency-check", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) + r.Post("/restart", func(w http.ResponseWriter, r *http.Request) { + logger.Info(r.Context(), "killing DERP server") + derpServer.Close() + derpServer = derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp"))) + derpHandler, derpCloseFunc = tailnet.WithWebsocketSupport(derpServer, derphttp.Handler(derpServer)) + t.Cleanup(derpCloseFunc) + logger.Info(r.Context(), "restarted DERP server") + w.WriteHeader(http.StatusOK) + }) }) r.Get("/api/v2/workspaceagents/{id}/coordinate", func(w http.ResponseWriter, r *http.Request) { diff --git a/tailnet/test/integration/suite.go b/tailnet/test/integration/suite.go index 32d9adb2e4a14..dab6c63cfe6d9 100644 --- a/tailnet/test/integration/suite.go +++ b/tailnet/test/integration/suite.go @@ -4,8 +4,10 @@ package integration import ( + "net/http" "net/url" "testing" + "time" "github.com/stretchr/testify/require" @@ -14,9 +16,28 @@ import ( "github.com/coder/coder/v2/testutil" ) +func restartDERP(t *testing.T, serverURL *url.URL) { + const reqTimeout = 2 * time.Second + + serverURL, err := url.Parse(serverURL.String() + "/derp/restart") + require.NoError(t, err) + + client := http.Client{ + Timeout: reqTimeout, + } + + resp, err := client.Post(serverURL.String(), "text/plain", nil) + require.NoError(t, err) + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %d", resp.StatusCode) + } +} + // TODO: instead of reusing one conn for each suite, maybe we should make a new // one for each subtest? -func TestSuite(t *testing.T, _ slog.Logger, _ *url.URL, conn *tailnet.Conn, _, peer Client) { +func TestSuite(t *testing.T, _ slog.Logger, serverURL *url.URL, conn *tailnet.Conn, _, peer Client) { t.Parallel() t.Run("Connectivity", func(t *testing.T) { @@ -26,5 +47,15 @@ func TestSuite(t *testing.T, _ slog.Logger, _ *url.URL, conn *tailnet.Conn, _, p require.NoError(t, err, "ping peer") }) + t.Run("RestartDERP", func(t *testing.T) { + peerIP := tailnet.IPFromUUID(peer.ID) + _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) + require.NoError(t, err, "ping peer") + restartDERP(t, serverURL) + peerIP = tailnet.IPFromUUID(peer.ID) + _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) + require.NoError(t, err, "ping peer") + }) + // TODO: more } From 5c1c70e5d982ca424e78e7dab3497548e48abd96 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 11 Jun 2024 07:11:47 +0000 Subject: [PATCH 02/11] fix --- tailnet/test/integration/suite.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tailnet/test/integration/suite.go b/tailnet/test/integration/suite.go index dab6c63cfe6d9..e5d855b6e7725 100644 --- a/tailnet/test/integration/suite.go +++ b/tailnet/test/integration/suite.go @@ -52,9 +52,8 @@ func TestSuite(t *testing.T, _ slog.Logger, serverURL *url.URL, conn *tailnet.Co _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer") restartDERP(t, serverURL) - peerIP = tailnet.IPFromUUID(peer.ID) _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) - require.NoError(t, err, "ping peer") + require.NoError(t, err, "ping peer after derp restart") }) // TODO: more From 3fee5ed8049644c3f0583ce54edb1d52e50de37d Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 11 Jun 2024 08:32:58 +0000 Subject: [PATCH 03/11] server interface --- tailnet/test/integration/integration.go | 47 +++++++++++++++----- tailnet/test/integration/integration_test.go | 14 +++--- tailnet/test/integration/suite.go | 16 +++++-- 3 files changed, 55 insertions(+), 22 deletions(-) diff --git a/tailnet/test/integration/integration.go b/tailnet/test/integration/integration.go index bf97136e1c689..1e6b15f927613 100644 --- a/tailnet/test/integration/integration.go +++ b/tailnet/test/integration/integration.go @@ -78,7 +78,7 @@ type TestTopology struct { // Server is the server starter for the test. It is executed in the server // subprocess. - Server ServerStarter + Server Server // StartClient gets called in each client subprocess. It's expected to // create the tailnet.Conn and ensure connectivity to it's peer. StartClient func(t *testing.T, logger slog.Logger, serverURL *url.URL, derpMap *tailcfg.DERPMap, me Client, peer Client) *tailnet.Conn @@ -89,11 +89,13 @@ type TestTopology struct { RunTests func(t *testing.T, logger slog.Logger, serverURL *url.URL, conn *tailnet.Conn, me Client, peer Client) } -type ServerStarter interface { +type Server interface { // StartServer should start the server and return once it's listening. It // should not block once it's listening. Cleanup should be handled by // t.Cleanup. StartServer(t *testing.T, logger slog.Logger, listenAddr string) + // Must be called after StartServer. + Stop(t *testing.T) } type SimpleServerOptions struct { @@ -107,9 +109,15 @@ type SimpleServerOptions struct { // fallback, the test will fail. // Incompatible with FailUpgradeDERP. DERPWebsocketOnly bool + *http.Server } -var _ ServerStarter = SimpleServerOptions{} +var _ Server = &SimpleServerOptions{} + +func (s SimpleServerOptions) Stop(t *testing.T) { + err := s.Close() + require.NoError(t, err) +} //nolint:revive func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { @@ -182,6 +190,11 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { }) }) + r.Post("/restart", func(w http.ResponseWriter, r *http.Request) { + logger.Info(r.Context(), "restarting server") + w.WriteHeader(http.StatusOK) + }) + r.Get("/api/v2/workspaceagents/{id}/coordinate", func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() idStr := chi.URLParamFromCtx(ctx, "id") @@ -223,10 +236,10 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { return r } -func (o SimpleServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) { +func (n *SimpleServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) { srv := http.Server{ Addr: listenAddr, - Handler: o.Router(t, logger), + Handler: n.Router(t, logger), ReadTimeout: 10 * time.Second, } serveDone := make(chan struct{}) @@ -241,15 +254,23 @@ func (o SimpleServerOptions) StartServer(t *testing.T, logger slog.Logger, liste _ = srv.Close() <-serveDone }) + n.Server = &srv } type NGINXServerOptions struct { - SimpleServerOptions + inner SimpleServerOptions + closeFn func() error } -var _ ServerStarter = NGINXServerOptions{} +var _ Server = &NGINXServerOptions{} + +func (n NGINXServerOptions) Stop(t *testing.T) { + err := n.closeFn() + require.NoError(t, err) + n.inner.Stop(t) +} -func (o NGINXServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) { +func (n *NGINXServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) { host, nginxPortStr, err := net.SplitHostPort(listenAddr) require.NoError(t, err) @@ -259,11 +280,12 @@ func (o NGINXServerOptions) StartServer(t *testing.T, logger slog.Logger, listen serverPort := nginxPort + 1 serverListenAddr := net.JoinHostPort(host, strconv.Itoa(serverPort)) - o.SimpleServerOptions.StartServer(t, logger, serverListenAddr) - startNginx(t, nginxPortStr, serverListenAddr) + n.inner.StartServer(t, logger, serverListenAddr) + closeFn := startNginx(t, nginxPortStr, serverListenAddr) + n.closeFn = closeFn } -func startNginx(t *testing.T, listenPort, serverAddr string) { +func startNginx(t *testing.T, listenPort, serverAddr string) func() error { cfg := `events {} http { server { @@ -290,7 +312,8 @@ http { require.NoError(t, err) // ExecBackground will handle cleanup. - _, _ = ExecBackground(t, "server.nginx", nil, "nginx", []string{"-c", cfgPath}) + _, closeFn := ExecBackground(t, "server.nginx", nil, "nginx", []string{"-c", cfgPath}) + return closeFn } // StartClientDERP creates a client connection to the server for coordination diff --git a/tailnet/test/integration/integration_test.go b/tailnet/test/integration/integration_test.go index 142df60db0d5b..1cb6f33cd2418 100644 --- a/tailnet/test/integration/integration_test.go +++ b/tailnet/test/integration/integration_test.go @@ -79,7 +79,7 @@ var topologies = []integration.TestTopology{ // Test that DERP over loopback works. Name: "BasicLoopbackDERP", SetupNetworking: integration.SetupNetworkingLoopback, - Server: integration.SimpleServerOptions{}, + Server: &integration.SimpleServerOptions{}, StartClient: integration.StartClientDERP, RunTests: integration.TestSuite, }, @@ -89,7 +89,7 @@ var topologies = []integration.TestTopology{ // by a bridge. Name: "EasyNATDERP", SetupNetworking: integration.SetupNetworkingEasyNAT, - Server: integration.SimpleServerOptions{}, + Server: &integration.SimpleServerOptions{}, StartClient: integration.StartClientDERP, RunTests: integration.TestSuite, }, @@ -98,7 +98,7 @@ var topologies = []integration.TestTopology{ // STUN. Name: "EasyNATDirect", SetupNetworking: integration.SetupNetworkingEasyNATWithSTUN, - Server: integration.SimpleServerOptions{}, + Server: &integration.SimpleServerOptions{}, StartClient: integration.StartClientDirect, RunTests: integration.TestSuite, }, @@ -106,7 +106,7 @@ var topologies = []integration.TestTopology{ // Test that direct over hard NAT <=> easy NAT works. Name: "HardNATEasyNATDirect", SetupNetworking: integration.SetupNetworkingHardNATEasyNATDirect, - Server: integration.SimpleServerOptions{}, + Server: &integration.SimpleServerOptions{}, StartClient: integration.StartClientDirect, RunTests: integration.TestSuite, }, @@ -116,7 +116,7 @@ var topologies = []integration.TestTopology{ // automatic fallback. Name: "DERPForceWebSockets", SetupNetworking: integration.SetupNetworkingEasyNAT, - Server: integration.SimpleServerOptions{ + Server: &integration.SimpleServerOptions{ FailUpgradeDERP: false, DERPWebsocketOnly: true, }, @@ -127,7 +127,7 @@ var topologies = []integration.TestTopology{ // Test that falling back to DERP over WebSocket works. Name: "DERPFallbackWebSockets", SetupNetworking: integration.SetupNetworkingEasyNAT, - Server: integration.SimpleServerOptions{ + Server: &integration.SimpleServerOptions{ FailUpgradeDERP: true, DERPWebsocketOnly: false, }, @@ -138,7 +138,7 @@ var topologies = []integration.TestTopology{ { Name: "BasicLoopbackDERPNGINX", SetupNetworking: integration.SetupNetworkingLoopback, - Server: integration.NGINXServerOptions{}, + Server: &integration.NGINXServerOptions{}, StartClient: integration.StartClientDERP, RunTests: integration.TestSuite, }, diff --git a/tailnet/test/integration/suite.go b/tailnet/test/integration/suite.go index e5d855b6e7725..02add7222db68 100644 --- a/tailnet/test/integration/suite.go +++ b/tailnet/test/integration/suite.go @@ -16,16 +16,17 @@ import ( "github.com/coder/coder/v2/testutil" ) -func restartDERP(t *testing.T, serverURL *url.URL) { +func doRestart(t *testing.T, serverURL *url.URL, endpoint string) { const reqTimeout = 2 * time.Second - serverURL, err := url.Parse(serverURL.String() + "/derp/restart") + serverURL, err := url.Parse(serverURL.String() + endpoint) require.NoError(t, err) client := http.Client{ Timeout: reqTimeout, } + //nolint:noctx resp, err := client.Post(serverURL.String(), "text/plain", nil) require.NoError(t, err) defer resp.Body.Close() @@ -51,10 +52,19 @@ func TestSuite(t *testing.T, _ slog.Logger, serverURL *url.URL, conn *tailnet.Co peerIP := tailnet.IPFromUUID(peer.ID) _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer") - restartDERP(t, serverURL) + doRestart(t, serverURL, "/derp/restart") _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer after derp restart") }) + t.Run("Restart server", func(t *testing.T) { + peerIP := tailnet.IPFromUUID(peer.ID) + _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) + require.NoError(t, err, "ping peer") + doRestart(t, serverURL, "/restart") + _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) + require.NoError(t, err, "ping peer after server restart") + }) + // TODO: more } From 5e73d6375f1f5eaf2ee2ea52cd9ccaca01719a42 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 11 Jun 2024 08:37:17 +0000 Subject: [PATCH 04/11] WIP --- tailnet/test/integration/integration.go | 6 +++--- tailnet/test/integration/suite.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tailnet/test/integration/integration.go b/tailnet/test/integration/integration.go index 1e6b15f927613..dc912c32afdfe 100644 --- a/tailnet/test/integration/integration.go +++ b/tailnet/test/integration/integration.go @@ -236,10 +236,10 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { return r } -func (n *SimpleServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) { +func (s *SimpleServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) { srv := http.Server{ Addr: listenAddr, - Handler: n.Router(t, logger), + Handler: s.Router(t, logger), ReadTimeout: 10 * time.Second, } serveDone := make(chan struct{}) @@ -254,7 +254,7 @@ func (n *SimpleServerOptions) StartServer(t *testing.T, logger slog.Logger, list _ = srv.Close() <-serveDone }) - n.Server = &srv + s.Server = &srv } type NGINXServerOptions struct { diff --git a/tailnet/test/integration/suite.go b/tailnet/test/integration/suite.go index 02add7222db68..2d4b12ace5469 100644 --- a/tailnet/test/integration/suite.go +++ b/tailnet/test/integration/suite.go @@ -16,7 +16,7 @@ import ( "github.com/coder/coder/v2/testutil" ) -func doRestart(t *testing.T, serverURL *url.URL, endpoint string) { +func sendRestart(t *testing.T, serverURL *url.URL, endpoint string) { const reqTimeout = 2 * time.Second serverURL, err := url.Parse(serverURL.String() + endpoint) @@ -52,7 +52,7 @@ func TestSuite(t *testing.T, _ slog.Logger, serverURL *url.URL, conn *tailnet.Co peerIP := tailnet.IPFromUUID(peer.ID) _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer") - doRestart(t, serverURL, "/derp/restart") + sendRestart(t, serverURL, "/derp/restart") _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer after derp restart") }) @@ -61,7 +61,7 @@ func TestSuite(t *testing.T, _ slog.Logger, serverURL *url.URL, conn *tailnet.Co peerIP := tailnet.IPFromUUID(peer.ID) _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer") - doRestart(t, serverURL, "/restart") + // TODO(ethan): restart here _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer after server restart") }) From f0da669d3a97e342f68843b9f361dabac7777a07 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 11 Jun 2024 08:42:17 +0000 Subject: [PATCH 05/11] WIP --- tailnet/test/integration/suite.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tailnet/test/integration/suite.go b/tailnet/test/integration/suite.go index 2d4b12ace5469..03d9d6cd17545 100644 --- a/tailnet/test/integration/suite.go +++ b/tailnet/test/integration/suite.go @@ -16,10 +16,10 @@ import ( "github.com/coder/coder/v2/testutil" ) -func sendRestart(t *testing.T, serverURL *url.URL, endpoint string) { +func restartDERP(t *testing.T, serverURL *url.URL) { const reqTimeout = 2 * time.Second - serverURL, err := url.Parse(serverURL.String() + endpoint) + serverURL, err := url.Parse(serverURL.String() + "/derp/restart") require.NoError(t, err) client := http.Client{ @@ -52,7 +52,7 @@ func TestSuite(t *testing.T, _ slog.Logger, serverURL *url.URL, conn *tailnet.Co peerIP := tailnet.IPFromUUID(peer.ID) _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer") - sendRestart(t, serverURL, "/derp/restart") + restartDERP(t, serverURL) _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer after derp restart") }) From 8eea4eb76eae94104e6b3222ff168e2c1fbcfe46 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Tue, 11 Jun 2024 13:30:29 +0000 Subject: [PATCH 06/11] coordinator restart --- tailnet/test/integration/integration.go | 63 +++++++++++--------- tailnet/test/integration/integration_test.go | 14 ++--- tailnet/test/integration/suite.go | 12 ++-- 3 files changed, 47 insertions(+), 42 deletions(-) diff --git a/tailnet/test/integration/integration.go b/tailnet/test/integration/integration.go index dc912c32afdfe..51b391c5dcb56 100644 --- a/tailnet/test/integration/integration.go +++ b/tailnet/test/integration/integration.go @@ -78,7 +78,7 @@ type TestTopology struct { // Server is the server starter for the test. It is executed in the server // subprocess. - Server Server + Server ServerStarter // StartClient gets called in each client subprocess. It's expected to // create the tailnet.Conn and ensure connectivity to it's peer. StartClient func(t *testing.T, logger slog.Logger, serverURL *url.URL, derpMap *tailcfg.DERPMap, me Client, peer Client) *tailnet.Conn @@ -89,13 +89,11 @@ type TestTopology struct { RunTests func(t *testing.T, logger slog.Logger, serverURL *url.URL, conn *tailnet.Conn, me Client, peer Client) } -type Server interface { +type ServerStarter interface { // StartServer should start the server and return once it's listening. It // should not block once it's listening. Cleanup should be handled by // t.Cleanup. StartServer(t *testing.T, logger slog.Logger, listenAddr string) - // Must be called after StartServer. - Stop(t *testing.T) } type SimpleServerOptions struct { @@ -109,14 +107,13 @@ type SimpleServerOptions struct { // fallback, the test will fail. // Incompatible with FailUpgradeDERP. DERPWebsocketOnly bool - *http.Server } -var _ Server = &SimpleServerOptions{} +var _ ServerStarter = SimpleServerOptions{} -func (s SimpleServerOptions) Stop(t *testing.T) { - err := s.Close() - require.NoError(t, err) +type connManager struct { + sync.Mutex + conns map[uuid.UUID]net.Conn } //nolint:revive @@ -126,6 +123,11 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { coordPtr.Store(&coord) t.Cleanup(func() { _ = coord.Close() }) + cm := connManager{ + Mutex: sync.Mutex{}, + conns: make(map[uuid.UUID]net.Conn), + } + csvc, err := tailnet.NewClientService(logger, &coordPtr, 10*time.Minute, func() *tailcfg.DERPMap { return &tailcfg.DERPMap{ // Clients will set their own based on their custom access URL. @@ -191,7 +193,11 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { }) r.Post("/restart", func(w http.ResponseWriter, r *http.Request) { - logger.Info(r.Context(), "restarting server") + logger.Info(r.Context(), "simulating coordinator restart") + for _, c := range cm.conns { + _ = c.Close() + } + cm.conns = make(map[uuid.UUID]net.Conn) w.WriteHeader(http.StatusOK) }) @@ -221,6 +227,15 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary) defer wsNetConn.Close() + cm.Lock() + cm.conns[id] = wsNetConn + cm.Unlock() + defer func() { + cm.Lock() + delete(cm.conns, id) + cm.Unlock() + }() + err = csvc.ServeConnV2(ctx, wsNetConn, tailnet.StreamID{ Name: "client-" + id.String(), ID: id, @@ -236,10 +251,10 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { return r } -func (s *SimpleServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) { +func (o SimpleServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) { srv := http.Server{ Addr: listenAddr, - Handler: s.Router(t, logger), + Handler: o.Router(t, logger), ReadTimeout: 10 * time.Second, } serveDone := make(chan struct{}) @@ -254,23 +269,15 @@ func (s *SimpleServerOptions) StartServer(t *testing.T, logger slog.Logger, list _ = srv.Close() <-serveDone }) - s.Server = &srv } type NGINXServerOptions struct { - inner SimpleServerOptions - closeFn func() error + SimpleServerOptions } -var _ Server = &NGINXServerOptions{} - -func (n NGINXServerOptions) Stop(t *testing.T) { - err := n.closeFn() - require.NoError(t, err) - n.inner.Stop(t) -} +var _ ServerStarter = NGINXServerOptions{} -func (n *NGINXServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) { +func (o NGINXServerOptions) StartServer(t *testing.T, logger slog.Logger, listenAddr string) { host, nginxPortStr, err := net.SplitHostPort(listenAddr) require.NoError(t, err) @@ -280,12 +287,11 @@ func (n *NGINXServerOptions) StartServer(t *testing.T, logger slog.Logger, liste serverPort := nginxPort + 1 serverListenAddr := net.JoinHostPort(host, strconv.Itoa(serverPort)) - n.inner.StartServer(t, logger, serverListenAddr) - closeFn := startNginx(t, nginxPortStr, serverListenAddr) - n.closeFn = closeFn + o.SimpleServerOptions.StartServer(t, logger, serverListenAddr) + startNginx(t, nginxPortStr, serverListenAddr) } -func startNginx(t *testing.T, listenPort, serverAddr string) func() error { +func startNginx(t *testing.T, listenPort, serverAddr string) { cfg := `events {} http { server { @@ -312,8 +318,7 @@ http { require.NoError(t, err) // ExecBackground will handle cleanup. - _, closeFn := ExecBackground(t, "server.nginx", nil, "nginx", []string{"-c", cfgPath}) - return closeFn + _, _ = ExecBackground(t, "server.nginx", nil, "nginx", []string{"-c", cfgPath}) } // StartClientDERP creates a client connection to the server for coordination diff --git a/tailnet/test/integration/integration_test.go b/tailnet/test/integration/integration_test.go index 1cb6f33cd2418..142df60db0d5b 100644 --- a/tailnet/test/integration/integration_test.go +++ b/tailnet/test/integration/integration_test.go @@ -79,7 +79,7 @@ var topologies = []integration.TestTopology{ // Test that DERP over loopback works. Name: "BasicLoopbackDERP", SetupNetworking: integration.SetupNetworkingLoopback, - Server: &integration.SimpleServerOptions{}, + Server: integration.SimpleServerOptions{}, StartClient: integration.StartClientDERP, RunTests: integration.TestSuite, }, @@ -89,7 +89,7 @@ var topologies = []integration.TestTopology{ // by a bridge. Name: "EasyNATDERP", SetupNetworking: integration.SetupNetworkingEasyNAT, - Server: &integration.SimpleServerOptions{}, + Server: integration.SimpleServerOptions{}, StartClient: integration.StartClientDERP, RunTests: integration.TestSuite, }, @@ -98,7 +98,7 @@ var topologies = []integration.TestTopology{ // STUN. Name: "EasyNATDirect", SetupNetworking: integration.SetupNetworkingEasyNATWithSTUN, - Server: &integration.SimpleServerOptions{}, + Server: integration.SimpleServerOptions{}, StartClient: integration.StartClientDirect, RunTests: integration.TestSuite, }, @@ -106,7 +106,7 @@ var topologies = []integration.TestTopology{ // Test that direct over hard NAT <=> easy NAT works. Name: "HardNATEasyNATDirect", SetupNetworking: integration.SetupNetworkingHardNATEasyNATDirect, - Server: &integration.SimpleServerOptions{}, + Server: integration.SimpleServerOptions{}, StartClient: integration.StartClientDirect, RunTests: integration.TestSuite, }, @@ -116,7 +116,7 @@ var topologies = []integration.TestTopology{ // automatic fallback. Name: "DERPForceWebSockets", SetupNetworking: integration.SetupNetworkingEasyNAT, - Server: &integration.SimpleServerOptions{ + Server: integration.SimpleServerOptions{ FailUpgradeDERP: false, DERPWebsocketOnly: true, }, @@ -127,7 +127,7 @@ var topologies = []integration.TestTopology{ // Test that falling back to DERP over WebSocket works. Name: "DERPFallbackWebSockets", SetupNetworking: integration.SetupNetworkingEasyNAT, - Server: &integration.SimpleServerOptions{ + Server: integration.SimpleServerOptions{ FailUpgradeDERP: true, DERPWebsocketOnly: false, }, @@ -138,7 +138,7 @@ var topologies = []integration.TestTopology{ { Name: "BasicLoopbackDERPNGINX", SetupNetworking: integration.SetupNetworkingLoopback, - Server: &integration.NGINXServerOptions{}, + Server: integration.NGINXServerOptions{}, StartClient: integration.StartClientDERP, RunTests: integration.TestSuite, }, diff --git a/tailnet/test/integration/suite.go b/tailnet/test/integration/suite.go index 03d9d6cd17545..4ffc684d32147 100644 --- a/tailnet/test/integration/suite.go +++ b/tailnet/test/integration/suite.go @@ -16,10 +16,10 @@ import ( "github.com/coder/coder/v2/testutil" ) -func restartDERP(t *testing.T, serverURL *url.URL) { +func sendRestart(t *testing.T, serverURL *url.URL, endpoint string) { const reqTimeout = 2 * time.Second - serverURL, err := url.Parse(serverURL.String() + "/derp/restart") + serverURL, err := url.Parse(serverURL.String() + endpoint) require.NoError(t, err) client := http.Client{ @@ -52,18 +52,18 @@ func TestSuite(t *testing.T, _ slog.Logger, serverURL *url.URL, conn *tailnet.Co peerIP := tailnet.IPFromUUID(peer.ID) _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer") - restartDERP(t, serverURL) + sendRestart(t, serverURL, "/derp/restart") _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer after derp restart") }) - t.Run("Restart server", func(t *testing.T) { + t.Run("RestartCoordinator", func(t *testing.T) { peerIP := tailnet.IPFromUUID(peer.ID) _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer") - // TODO(ethan): restart here + sendRestart(t, serverURL, "/restart") _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) - require.NoError(t, err, "ping peer after server restart") + require.NoError(t, err, "ping peer after coordinator restart") }) // TODO: more From dde80f726585280caa34bea41d2e33e72db22fa0 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Wed, 12 Jun 2024 03:34:14 +0000 Subject: [PATCH 07/11] restart derp & coordinator --- tailnet/test/integration/suite.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tailnet/test/integration/suite.go b/tailnet/test/integration/suite.go index 4ffc684d32147..cb98e6df0ae0b 100644 --- a/tailnet/test/integration/suite.go +++ b/tailnet/test/integration/suite.go @@ -66,5 +66,13 @@ func TestSuite(t *testing.T, _ slog.Logger, serverURL *url.URL, conn *tailnet.Co require.NoError(t, err, "ping peer after coordinator restart") }) - // TODO: more + t.Run("RestartBoth", func(t *testing.T) { + peerIP := tailnet.IPFromUUID(peer.ID) + _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) + require.NoError(t, err, "ping peer") + sendRestart(t, serverURL, "/derp/restart") + sendRestart(t, serverURL, "/restart") + _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) + require.NoError(t, err, "ping peer after restart") + }) } From aa31cc087c8996d38e78f114cd218e28739de8ca Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Wed, 12 Jun 2024 08:07:14 +0000 Subject: [PATCH 08/11] impl'd review changes --- tailnet/test/integration/integration.go | 47 ++++++++++++++++--------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/tailnet/test/integration/integration.go b/tailnet/test/integration/integration.go index 51b391c5dcb56..1bce93ccf831a 100644 --- a/tailnet/test/integration/integration.go +++ b/tailnet/test/integration/integration.go @@ -116,6 +116,26 @@ type connManager struct { conns map[uuid.UUID]net.Conn } +func (c *connManager) Add(id uuid.UUID, conn net.Conn) func() { + c.Lock() + c.conns[id] = conn + c.Unlock() + return func() { + c.Lock() + delete(c.conns, id) + c.Unlock() + } +} + +func (c *connManager) Clear() { + c.Lock() + defer c.Unlock() + for _, conn := range c.conns { + _ = conn.Close() + } + c.conns = make(map[uuid.UUID]net.Conn) +} + //nolint:revive func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { coord := tailnet.NewCoordinator(logger) @@ -136,8 +156,10 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { }) require.NoError(t, err) - derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp"))) - derpHandler, derpCloseFunc := tailnet.WithWebsocketSupport(derpServer, derphttp.Handler(derpServer)) + derpSrv := derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp"))) + derpServer := atomic.Pointer[derp.Server]{} + derpServer.Store(derpSrv) + derpHandler, derpCloseFunc := tailnet.WithWebsocketSupport(derpServer.Load(), derphttp.Handler(derpServer.Load())) t.Cleanup(derpCloseFunc) r := chi.NewRouter() @@ -183,10 +205,10 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { }) r.Post("/restart", func(w http.ResponseWriter, r *http.Request) { logger.Info(r.Context(), "killing DERP server") - derpServer.Close() - derpServer = derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp"))) - derpHandler, derpCloseFunc = tailnet.WithWebsocketSupport(derpServer, derphttp.Handler(derpServer)) + oldServer := derpServer.Swap(derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp")))) + derpHandler, derpCloseFunc = tailnet.WithWebsocketSupport(derpServer.Load(), derphttp.Handler(derpServer.Load())) t.Cleanup(derpCloseFunc) + oldServer.Close() logger.Info(r.Context(), "restarted DERP server") w.WriteHeader(http.StatusOK) }) @@ -194,10 +216,7 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { r.Post("/restart", func(w http.ResponseWriter, r *http.Request) { logger.Info(r.Context(), "simulating coordinator restart") - for _, c := range cm.conns { - _ = c.Close() - } - cm.conns = make(map[uuid.UUID]net.Conn) + cm.Clear() w.WriteHeader(http.StatusOK) }) @@ -227,14 +246,8 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary) defer wsNetConn.Close() - cm.Lock() - cm.conns[id] = wsNetConn - cm.Unlock() - defer func() { - cm.Lock() - delete(cm.conns, id) - cm.Unlock() - }() + cleanFn := cm.Add(id, wsNetConn) + defer cleanFn() err = csvc.ServeConnV2(ctx, wsNetConn, tailnet.StreamID{ Name: "client-" + id.String(), From 286660fa1d632e6d4e189f08b4cfcc8004a27262 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Wed, 12 Jun 2024 15:27:59 +0000 Subject: [PATCH 09/11] impl'd review --- tailnet/test/integration/integration.go | 71 +++++++++++++++++-------- tailnet/test/integration/suite.go | 38 +++++++------ 2 files changed, 71 insertions(+), 38 deletions(-) diff --git a/tailnet/test/integration/integration.go b/tailnet/test/integration/integration.go index 1bce93ccf831a..938ed29e8d555 100644 --- a/tailnet/test/integration/integration.go +++ b/tailnet/test/integration/integration.go @@ -112,30 +112,52 @@ type SimpleServerOptions struct { var _ ServerStarter = SimpleServerOptions{} type connManager struct { - sync.Mutex + mu sync.Mutex conns map[uuid.UUID]net.Conn } func (c *connManager) Add(id uuid.UUID, conn net.Conn) func() { - c.Lock() + c.mu.Lock() + defer c.mu.Unlock() c.conns[id] = conn - c.Unlock() return func() { - c.Lock() + c.mu.Lock() + defer c.mu.Unlock() delete(c.conns, id) - c.Unlock() } } -func (c *connManager) Clear() { - c.Lock() - defer c.Unlock() +func (c *connManager) CloseAll() { + c.mu.Lock() + defer c.mu.Unlock() for _, conn := range c.conns { _ = conn.Close() } c.conns = make(map[uuid.UUID]net.Conn) } +type derpServer struct { + http.Handler + srv *derp.Server + closeFn func() +} + +func newDerpServer(t *testing.T, logger slog.Logger) *derpServer { + derpSrv := derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp"))) + derpHandler, derpCloseFunc := tailnet.WithWebsocketSupport(derpSrv, derphttp.Handler(derpSrv)) + t.Cleanup(derpCloseFunc) + return &derpServer{ + srv: derpSrv, + Handler: derpHandler, + closeFn: derpCloseFunc, + } +} + +func (s *derpServer) Close() { + s.srv.Close() + s.closeFn() +} + //nolint:revive func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { coord := tailnet.NewCoordinator(logger) @@ -144,7 +166,6 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { t.Cleanup(func() { _ = coord.Close() }) cm := connManager{ - Mutex: sync.Mutex{}, conns: make(map[uuid.UUID]net.Conn), } @@ -156,11 +177,11 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { }) require.NoError(t, err) - derpSrv := derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp"))) - derpServer := atomic.Pointer[derp.Server]{} - derpServer.Store(derpSrv) - derpHandler, derpCloseFunc := tailnet.WithWebsocketSupport(derpServer.Load(), derphttp.Handler(derpServer.Load())) - t.Cleanup(derpCloseFunc) + derpServer := atomic.Pointer[derpServer]{} + derpServer.Store(newDerpServer(t, logger)) + t.Cleanup(func() { + derpServer.Load().Close() + }) r := chi.NewRouter() r.Use( @@ -198,25 +219,31 @@ func (o SimpleServerOptions) Router(t *testing.T, logger slog.Logger) *chi.Mux { return } - derpHandler.ServeHTTP(w, r) + derpServer.Load().ServeHTTP(w, r) }) r.Get("/latency-check", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) r.Post("/restart", func(w http.ResponseWriter, r *http.Request) { - logger.Info(r.Context(), "killing DERP server") - oldServer := derpServer.Swap(derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp")))) - derpHandler, derpCloseFunc = tailnet.WithWebsocketSupport(derpServer.Load(), derphttp.Handler(derpServer.Load())) - t.Cleanup(derpCloseFunc) + oldServer := derpServer.Swap(newDerpServer(t, logger)) oldServer.Close() - logger.Info(r.Context(), "restarted DERP server") w.WriteHeader(http.StatusOK) }) }) + // /restart?derp=[true|false]&coordinator=[true|false] r.Post("/restart", func(w http.ResponseWriter, r *http.Request) { - logger.Info(r.Context(), "simulating coordinator restart") - cm.Clear() + if r.URL.Query().Get("derp") == "true" { + logger.Info(r.Context(), "killing DERP server") + oldServer := derpServer.Swap(newDerpServer(t, logger)) + oldServer.Close() + logger.Info(r.Context(), "restarted DERP server") + } + + if r.URL.Query().Get("coordinator") == "true" { + logger.Info(r.Context(), "simulating coordinator restart") + cm.CloseAll() + } w.WriteHeader(http.StatusOK) }) diff --git a/tailnet/test/integration/suite.go b/tailnet/test/integration/suite.go index cb98e6df0ae0b..5737a0400e8b2 100644 --- a/tailnet/test/integration/suite.go +++ b/tailnet/test/integration/suite.go @@ -16,24 +16,31 @@ import ( "github.com/coder/coder/v2/testutil" ) -func sendRestart(t *testing.T, serverURL *url.URL, endpoint string) { - const reqTimeout = 2 * time.Second +// nolint:revive +func sendRestart(t *testing.T, serverURL *url.URL, derp bool, coordinator bool) { + t.Helper() + ctx := testutil.Context(t, 2*time.Second) - serverURL, err := url.Parse(serverURL.String() + endpoint) - require.NoError(t, err) - - client := http.Client{ - Timeout: reqTimeout, + serverURL, err := url.Parse(serverURL.String() + "/restart") + if derp { + q := serverURL.Query() + q.Set("derp", "true") + serverURL.RawQuery = q.Encode() + } + if coordinator { + q := serverURL.Query() + q.Set("coordinator", "true") + serverURL.RawQuery = q.Encode() } + require.NoError(t, err) - //nolint:noctx - resp, err := client.Post(serverURL.String(), "text/plain", nil) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, serverURL.String(), nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) require.NoError(t, err) defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - t.Fatalf("unexpected status code: %d", resp.StatusCode) - } + require.Equal(t, http.StatusOK, resp.StatusCode, "unexpected status code %d", resp.StatusCode) } // TODO: instead of reusing one conn for each suite, maybe we should make a new @@ -52,7 +59,7 @@ func TestSuite(t *testing.T, _ slog.Logger, serverURL *url.URL, conn *tailnet.Co peerIP := tailnet.IPFromUUID(peer.ID) _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer") - sendRestart(t, serverURL, "/derp/restart") + sendRestart(t, serverURL, true, false) _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer after derp restart") }) @@ -61,7 +68,7 @@ func TestSuite(t *testing.T, _ slog.Logger, serverURL *url.URL, conn *tailnet.Co peerIP := tailnet.IPFromUUID(peer.ID) _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer") - sendRestart(t, serverURL, "/restart") + sendRestart(t, serverURL, false, true) _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer after coordinator restart") }) @@ -70,8 +77,7 @@ func TestSuite(t *testing.T, _ slog.Logger, serverURL *url.URL, conn *tailnet.Co peerIP := tailnet.IPFromUUID(peer.ID) _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer") - sendRestart(t, serverURL, "/derp/restart") - sendRestart(t, serverURL, "/restart") + sendRestart(t, serverURL, true, true) _, _, _, err = conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) require.NoError(t, err, "ping peer after restart") }) From 023e0a73f007b68e55dc34b87f12d94cf0209785 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Wed, 12 Jun 2024 15:42:37 +0000 Subject: [PATCH 10/11] cleaner query strings --- tailnet/test/integration/suite.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tailnet/test/integration/suite.go b/tailnet/test/integration/suite.go index 5737a0400e8b2..479f380749101 100644 --- a/tailnet/test/integration/suite.go +++ b/tailnet/test/integration/suite.go @@ -22,16 +22,15 @@ func sendRestart(t *testing.T, serverURL *url.URL, derp bool, coordinator bool) ctx := testutil.Context(t, 2*time.Second) serverURL, err := url.Parse(serverURL.String() + "/restart") + q := serverURL.Query() if derp { - q := serverURL.Query() q.Set("derp", "true") - serverURL.RawQuery = q.Encode() } if coordinator { q := serverURL.Query() q.Set("coordinator", "true") - serverURL.RawQuery = q.Encode() } + serverURL.RawQuery = q.Encode() require.NoError(t, err) req, err := http.NewRequestWithContext(ctx, http.MethodPost, serverURL.String(), nil) From 6872ba157a2311fff893a468cace2ec50790a73e Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Wed, 12 Jun 2024 15:45:20 +0000 Subject: [PATCH 11/11] fix --- tailnet/test/integration/suite.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tailnet/test/integration/suite.go b/tailnet/test/integration/suite.go index 479f380749101..e3403da32b359 100644 --- a/tailnet/test/integration/suite.go +++ b/tailnet/test/integration/suite.go @@ -27,7 +27,6 @@ func sendRestart(t *testing.T, serverURL *url.URL, derp bool, coordinator bool) q.Set("derp", "true") } if coordinator { - q := serverURL.Query() q.Set("coordinator", "true") } serverURL.RawQuery = q.Encode()