diff --git a/tailnet/configmaps.go b/tailnet/configmaps.go index 8b3aee1585e47..0a784eeab73dc 100644 --- a/tailnet/configmaps.go +++ b/tailnet/configmaps.go @@ -498,10 +498,14 @@ func (c *configMaps) setAllPeersLost() { lc.setLostTimer(c) // it's important to drop a log here so that we see it get marked lost if grepping thru // the logs for a specific peer + keyID := "(nil node)" + if lc.node != nil { + keyID = lc.node.Key.ShortString() + } c.logger.Debug(context.Background(), "setAllPeersLost marked peer lost", slog.F("peer_id", lc.peerID), - slog.F("key_id", lc.node.Key.ShortString()), + slog.F("key_id", keyID), ) } } diff --git a/tailnet/conn.go b/tailnet/conn.go index 1f4c543c933a1..8b82c455e4788 100644 --- a/tailnet/conn.go +++ b/tailnet/conn.go @@ -96,6 +96,9 @@ type Options struct { // CaptureHook is a callback that captures Disco packets and packets sent // into the tailnet tunnel. CaptureHook capture.Callback + // ForceNetworkUp forces the network to be considered up. magicsock will not + // do anything if it thinks it can't reach the internet. + ForceNetworkUp bool } // NodeID creates a Tailscale NodeID from the last 8 bytes of a UUID. It ensures @@ -175,6 +178,9 @@ func NewConn(options *Options) (conn *Conn, err error) { if options.DERPHeader != nil { magicConn.SetDERPHeader(options.DERPHeader.Clone()) } + if options.ForceNetworkUp { + magicConn.SetNetworkUp(true) + } if v, ok := os.LookupEnv(EnvMagicsockDebugLogging); ok { vBool, err := strconv.ParseBool(v) diff --git a/tailnet/test/integration/integration.go b/tailnet/test/integration/integration.go index 993142dd0e59f..f4d884b36c35a 100644 --- a/tailnet/test/integration/integration.go +++ b/tailnet/test/integration/integration.go @@ -1,67 +1,162 @@ +//go:build linux +// +build linux + package integration import ( "context" - "encoding/json" + "fmt" "io" "net/http" - "net/http/httptest" "net/netip" - "strings" + "net/url" + "strconv" "sync/atomic" "testing" "time" + "github.com/go-chi/chi/v5" "github.com/google/uuid" "github.com/stretchr/testify/require" "golang.org/x/xerrors" "nhooyr.io/websocket" + "tailscale.com/derp" + "tailscale.com/derp/derphttp" "tailscale.com/tailcfg" + "tailscale.com/types/key" "cdr.dev/slog" "github.com/coder/coder/v2/coderd/httpapi" + "github.com/coder/coder/v2/coderd/httpmw" + "github.com/coder/coder/v2/coderd/tracing" "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/tailnet" - "github.com/coder/coder/v2/testutil" ) -func NetworkSetupDefault(*testing.T) {} +// IDs used in tests. +var ( + Client1ID = uuid.MustParse("00000000-0000-0000-0000-000000000001") + Client2ID = uuid.MustParse("00000000-0000-0000-0000-000000000002") +) -func DERPMapTailscale(ctx context.Context, t *testing.T) *tailcfg.DERPMap { - ctx, cancel := context.WithTimeout(ctx, testutil.WaitShort) - defer cancel() +type TestTopology struct { + Name string + // SetupNetworking creates interfaces and network namespaces for the test. + // The most simple implementation is NetworkSetupDefault, which only creates + // a network namespace shared for all tests. + SetupNetworking func(t *testing.T, logger slog.Logger) TestNetworking - req, err := http.NewRequestWithContext(ctx, "GET", "https://controlplane.tailscale.com/derpmap/default", nil) - require.NoError(t, err) + // StartServer gets called in the server subprocess. It's expected to start + // the coordinator server in the background and return. + StartServer func(t *testing.T, logger slog.Logger, listenAddr string) + // StartClient gets called in each client subprocess. It's expected to + // create the tailnet.Conn and ensure connectivity to it's peer. + StartClient func(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn - res, err := http.DefaultClient.Do(req) - require.NoError(t, err) - defer res.Body.Close() + // RunTests is the main test function. It's called in each of the client + // subprocesses. If tests can only run once, they should check the client ID + // and return early if it's not the expected one. + RunTests func(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID, conn *tailnet.Conn) +} - dm := &tailcfg.DERPMap{} - dec := json.NewDecoder(res.Body) - err = dec.Decode(dm) - require.NoError(t, err) +type TestNetworking struct { + // ServerListenAddr is the IP address and port that the server listens on, + // passed to StartServer. + ServerListenAddr string + // ServerAccessURLClient1 is the hostname and port that the first client + // uses to access the server. + ServerAccessURLClient1 string + // ServerAccessURLClient2 is the hostname and port that the second client + // uses to access the server. + ServerAccessURLClient2 string + + // Networking settings for each subprocess. + ProcessServer TestNetworkingProcess + ProcessClient1 TestNetworkingProcess + ProcessClient2 TestNetworkingProcess +} + +type TestNetworkingProcess struct { + // NetNS to enter. If zero, the current network namespace is used. + NetNSFd int +} - return dm +func SetupNetworkingLoopback(t *testing.T, _ slog.Logger) TestNetworking { + netNSName := "codertest_netns_" + randStr, err := cryptorand.String(4) + require.NoError(t, err, "generate random string for netns name") + netNSName += randStr + + // Create a single network namespace for all tests so we can have an + // isolated loopback interface. + netNSFile, err := createNetNS(netNSName) + require.NoError(t, err, "create network namespace") + t.Cleanup(func() { + _ = netNSFile.Close() + }) + + var ( + listenAddr = "127.0.0.1:8080" + process = TestNetworkingProcess{ + NetNSFd: int(netNSFile.Fd()), + } + ) + return TestNetworking{ + ServerListenAddr: listenAddr, + ServerAccessURLClient1: "http://" + listenAddr, + ServerAccessURLClient2: "http://" + listenAddr, + ProcessServer: process, + ProcessClient1: process, + ProcessClient2: process, + } } -func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap) (coord tailnet.Coordinator, url string) { - coord = tailnet.NewCoordinator(logger) +func StartServerBasic(t *testing.T, logger slog.Logger, listenAddr string) { + coord := tailnet.NewCoordinator(logger) var coordPtr atomic.Pointer[tailnet.Coordinator] coordPtr.Store(&coord) t.Cleanup(func() { _ = coord.Close() }) csvc, err := tailnet.NewClientService(logger, &coordPtr, 10*time.Minute, func() *tailcfg.DERPMap { - return dm + return &tailcfg.DERPMap{ + // Clients will set their own based on their custom access URL. + Regions: map[int]*tailcfg.DERPRegion{}, + } }) require.NoError(t, err) - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - idStr := strings.TrimPrefix(r.URL.Path, "/") + derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp"))) + derpHandler, derpCloseFunc := tailnet.WithWebsocketSupport(derpServer, derphttp.Handler(derpServer)) + t.Cleanup(derpCloseFunc) + + r := chi.NewRouter() + r.Use( + func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logger.Debug(r.Context(), "start "+r.Method, slog.F("path", r.URL.Path), slog.F("remote_ip", r.RemoteAddr)) + next.ServeHTTP(w, r) + }) + }, + tracing.StatusWriterMiddleware, + httpmw.Logger(logger), + ) + r.Route("/derp", func(r chi.Router) { + r.Get("/", func(w http.ResponseWriter, r *http.Request) { + logger.Info(r.Context(), "start derp request", slog.F("path", r.URL.Path), slog.F("remote_ip", r.RemoteAddr)) + derpHandler.ServeHTTP(w, r) + }) + r.Get("/latency-check", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + }) + r.Get("/api/v2/workspaceagents/{id}/coordinate", func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + idStr := chi.URLParamFromCtx(ctx, "id") id, err := uuid.Parse(idStr) if err != nil { - httpapi.Write(r.Context(), w, http.StatusBadRequest, codersdk.Response{ + logger.Warn(ctx, "bad agent ID passed in URL params", slog.F("id_str", idStr), slog.Error(err)) + httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{ Message: "Bad agent id.", Detail: err.Error(), }) @@ -70,14 +165,15 @@ func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap) conn, err := websocket.Accept(w, r, nil) if err != nil { - httpapi.Write(r.Context(), w, http.StatusBadRequest, codersdk.Response{ + logger.Warn(ctx, "failed to accept websocket", slog.Error(err)) + httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{ Message: "Failed to accept websocket.", Detail: err.Error(), }) return } - ctx, wsNetConn := codersdk.WebsocketNetConn(r.Context(), conn, websocket.MessageBinary) + ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary) defer wsNetConn.Close() err = csvc.ServeConnV2(ctx, wsNetConn, tailnet.StreamID{ @@ -86,43 +182,105 @@ func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap) Auth: tailnet.SingleTailnetCoordinateeAuth{}, }) if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) { + logger.Warn(ctx, "failed to serve conn", slog.Error(err)) _ = conn.Close(websocket.StatusInternalError, err.Error()) return } - })) - t.Cleanup(srv.Close) + }) - return coord, srv.URL + // We have a custom listen address. + srv := http.Server{ + Addr: listenAddr, + Handler: r, + ReadTimeout: 10 * time.Second, + } + serveDone := make(chan struct{}) + go func() { + defer close(serveDone) + err := srv.ListenAndServe() + if err != nil && !xerrors.Is(err, http.ErrServerClosed) { + t.Error("HTTP server error:", err) + } + }() + t.Cleanup(func() { + _ = srv.Close() + <-serveDone + }) } -func TailnetSetupDRPC(ctx context.Context, t *testing.T, logger slog.Logger, - id, agentID uuid.UUID, - coordinateURL string, - dm *tailcfg.DERPMap, -) *tailnet.Conn { - ip := tailnet.IPFromUUID(id) - conn, err := tailnet.NewConn(&tailnet.Options{ - Addresses: []netip.Prefix{netip.PrefixFrom(ip, 128)}, - DERPMap: dm, - Logger: logger, - }) - require.NoError(t, err) - t.Cleanup(func() { _ = conn.Close() }) +func basicDERPMap(t *testing.T, serverURL *url.URL) *tailcfg.DERPMap { + portStr := serverURL.Port() + port, err := strconv.Atoi(portStr) + require.NoError(t, err, "parse server port") + + hostname := serverURL.Hostname() + ipv4 := "" + ip, err := netip.ParseAddr(hostname) + if err == nil { + hostname = "" + ipv4 = ip.String() + } + + return &tailcfg.DERPMap{ + Regions: map[int]*tailcfg.DERPRegion{ + 1: { + RegionID: 1, + RegionCode: "test", + RegionName: "test server", + Nodes: []*tailcfg.DERPNode{ + { + Name: "test0", + RegionID: 1, + HostName: hostname, + IPv4: ipv4, + IPv6: "none", + DERPPort: port, + ForceHTTP: true, + InsecureForTests: true, + }, + }, + }, + }, + } +} +func StartClientBasic(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn { + u, err := serverURL.Parse(fmt.Sprintf("/api/v2/workspaceagents/%s/coordinate", myID.String())) + require.NoError(t, err) //nolint:bodyclose - ws, _, err := websocket.Dial(ctx, coordinateURL+"/"+id.String(), nil) + ws, _, err := websocket.Dial(context.Background(), u.String(), nil) require.NoError(t, err) + t.Cleanup(func() { + _ = ws.Close(websocket.StatusNormalClosure, "closing websocket") + }) client, err := tailnet.NewDRPCClient( - websocket.NetConn(ctx, ws, websocket.MessageBinary), + websocket.NetConn(context.Background(), ws, websocket.MessageBinary), logger, ) require.NoError(t, err) - coord, err := client.Coordinate(ctx) + coord, err := client.Coordinate(context.Background()) + require.NoError(t, err) + + conn, err := tailnet.NewConn(&tailnet.Options{ + Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IPFromUUID(myID), 128)}, + DERPMap: basicDERPMap(t, serverURL), + BlockEndpoints: true, + Logger: logger, + // These tests don't have internet connection, so we need to force + // magicsock to do anything. + ForceNetworkUp: true, + }) require.NoError(t, err) + t.Cleanup(func() { + _ = conn.Close() + }) + + coordination := tailnet.NewRemoteCoordination(logger, coord, conn, peerID) + t.Cleanup(func() { + _ = coordination.Close() + }) - coordination := tailnet.NewRemoteCoordination(logger, coord, conn, agentID) - t.Cleanup(func() { _ = coordination.Close() }) return conn } diff --git a/tailnet/test/integration/integration_internal_test.go b/tailnet/test/integration/integration_internal_test.go deleted file mode 100644 index 07ba30bd84738..0000000000000 --- a/tailnet/test/integration/integration_internal_test.go +++ /dev/null @@ -1,194 +0,0 @@ -package integration - -import ( - "context" - "flag" - "fmt" - "os" - "os/exec" - "strconv" - "syscall" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "tailscale.com/tailcfg" - - "cdr.dev/slog" - "cdr.dev/slog/sloggers/slogtest" - "github.com/coder/coder/v2/tailnet" - "github.com/coder/coder/v2/testutil" -) - -var ( - isChild = flag.Bool("child", false, "Run tests as a child") - childTestID = flag.Int("child-test-id", 0, "Which test is being run") - childCoordinateURL = flag.String("child-coordinate-url", "", "The coordinate url to connect back to") - childAgentID = flag.String("child-agent-id", "", "The agent id of the child") -) - -func TestMain(m *testing.M) { - if run := os.Getenv("CODER_TAILNET_TESTS"); run == "" { - _, _ = fmt.Println("skipping tests...") - return - } - if os.Getuid() != 0 { - _, _ = fmt.Println("networking integration tests must run as root") - return - } - flag.Parse() - os.Exit(m.Run()) -} - -var tests = []Test{{ - Name: "Normal", - DERPMap: DERPMapTailscale, - Coordinator: CoordinatorInMemory, - Parent: Parent{ - NetworkSetup: NetworkSetupDefault, - TailnetSetup: TailnetSetupDRPC, - Run: func(ctx context.Context, t *testing.T, opts ParentOpts) { - reach := opts.Conn.AwaitReachable(ctx, tailnet.IPFromUUID(opts.AgentID)) - assert.True(t, reach) - }, - }, - Child: Child{ - NetworkSetup: NetworkSetupDefault, - TailnetSetup: TailnetSetupDRPC, - Run: func(ctx context.Context, t *testing.T, opts ChildOpts) { - // wait until the parent kills us - <-make(chan struct{}) - }, - }, -}} - -//nolint:paralleltest -func TestIntegration(t *testing.T) { - if *isChild { - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong) - t.Cleanup(cancel) - - agentID, err := uuid.Parse(*childAgentID) - require.NoError(t, err) - - test := tests[*childTestID] - test.Child.NetworkSetup(t) - dm := test.DERPMap(ctx, t) - conn := test.Child.TailnetSetup(ctx, t, logger, agentID, uuid.Nil, *childCoordinateURL, dm) - test.Child.Run(ctx, t, ChildOpts{ - Logger: logger, - Conn: conn, - AgentID: agentID, - }) - return - } - - for id, test := range tests { - t.Run(test.Name, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong) - t.Cleanup(cancel) - - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - parentID, childID := uuid.New(), uuid.New() - dm := test.DERPMap(ctx, t) - _, coordURL := test.Coordinator(t, logger, dm) - - child, waitChild := execChild(ctx, id, coordURL, childID) - test.Parent.NetworkSetup(t) - conn := test.Parent.TailnetSetup(ctx, t, logger, parentID, childID, coordURL, dm) - test.Parent.Run(ctx, t, ParentOpts{ - Logger: logger, - Conn: conn, - ClientID: parentID, - AgentID: childID, - }) - child.Process.Signal(syscall.SIGINT) - <-waitChild - }) - } -} - -type Test struct { - // Name is the name of the test. - Name string - - // DERPMap returns the DERP map to use for both the parent and child. It is - // called once at the beginning of the test. - DERPMap func(ctx context.Context, t *testing.T) *tailcfg.DERPMap - // Coordinator returns a running tailnet coordinator, and the url to reach - // it on. - Coordinator func(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap) (coord tailnet.Coordinator, url string) - - Parent Parent - Child Child -} - -// Parent is the struct containing all of the parent specific configurations. -// Functions are invoked in order of struct definition. -type Parent struct { - // NetworkSetup is run before all test code. It can be used to setup - // networking scenarios. - NetworkSetup func(t *testing.T) - - // TailnetSetup creates a tailnet network. - TailnetSetup func( - ctx context.Context, t *testing.T, logger slog.Logger, - id, agentID uuid.UUID, coordURL string, dm *tailcfg.DERPMap, - ) *tailnet.Conn - - Run func(ctx context.Context, t *testing.T, opts ParentOpts) -} - -// Child is the struct containing all of the child specific configurations. -// Functions are invoked in order of struct definition. -type Child struct { - // NetworkSetup is run before all test code. It can be used to setup - // networking scenarios. - NetworkSetup func(t *testing.T) - - // TailnetSetup creates a tailnet network. - TailnetSetup func( - ctx context.Context, t *testing.T, logger slog.Logger, - id, agentID uuid.UUID, coordURL string, dm *tailcfg.DERPMap, - ) *tailnet.Conn - - // Run runs the actual test. Parents and children run in separate processes, - // so it's important to ensure no communication happens over memory between - // run functions of parents and children. - Run func(ctx context.Context, t *testing.T, opts ChildOpts) -} - -type ParentOpts struct { - Logger slog.Logger - Conn *tailnet.Conn - ClientID uuid.UUID - AgentID uuid.UUID -} - -type ChildOpts struct { - Logger slog.Logger - Conn *tailnet.Conn - AgentID uuid.UUID -} - -func execChild(ctx context.Context, testID int, coordURL string, agentID uuid.UUID) (*exec.Cmd, <-chan error) { - ch := make(chan error) - binary := os.Args[0] - args := os.Args[1:] - args = append(args, - "--child=true", - "--child-test-id="+strconv.Itoa(testID), - "--child-coordinate-url="+coordURL, - "--child-agent-id="+agentID.String(), - ) - - cmd := exec.CommandContext(ctx, binary, args...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - go func() { - ch <- cmd.Run() - }() - return cmd, ch -} diff --git a/tailnet/test/integration/integration_test.go b/tailnet/test/integration/integration_test.go new file mode 100644 index 0000000000000..1678016c4af78 --- /dev/null +++ b/tailnet/test/integration/integration_test.go @@ -0,0 +1,295 @@ +//go:build linux +// +build linux + +package integration_test + +import ( + "flag" + "fmt" + "net/http" + "net/url" + "os" + "os/exec" + "os/signal" + "runtime" + "syscall" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/v2/tailnet" + "github.com/coder/coder/v2/tailnet/test/integration" + "github.com/coder/coder/v2/testutil" +) + +const runTestEnv = "CODER_TAILNET_TESTS" + +var ( + isSubprocess = flag.Bool("subprocess", false, "Signifies that this is a test subprocess") + testID = flag.String("test-name", "", "Which test is being run") + role = flag.String("role", "", "The role of the test subprocess: server, client") + + // Role: server + serverListenAddr = flag.String("server-listen-addr", "", "The address to listen on for the server") + + // Role: client + clientName = flag.String("client-name", "", "The name of the client for logs") + clientServerURL = flag.String("client-server-url", "", "The url to connect to the server") + clientMyID = flag.String("client-id", "", "The id of the client") + clientPeerID = flag.String("client-peer-id", "", "The id of the other client") + clientRunTests = flag.Bool("client-run-tests", false, "Run the tests in the client subprocess") +) + +func TestMain(m *testing.M) { + if run := os.Getenv(runTestEnv); run == "" { + _, _ = fmt.Printf("skipping tests as %q is not set...\n", runTestEnv) + return + } + if runtime.GOOS != "linux" { + _, _ = fmt.Printf("GOOS %q is not linux", runtime.GOOS) + os.Exit(1) + return + } + if os.Getuid() != 0 { + _, _ = fmt.Println("UID is not 0") + os.Exit(1) + return + } + + flag.Parse() + os.Exit(m.Run()) +} + +var topologies = []integration.TestTopology{ + { + Name: "BasicLoopback", + SetupNetworking: integration.SetupNetworkingLoopback, + StartServer: integration.StartServerBasic, + StartClient: integration.StartClientBasic, + RunTests: func(t *testing.T, log slog.Logger, serverURL *url.URL, myID, peerID uuid.UUID, conn *tailnet.Conn) { + // Test basic connectivity + peerIP := tailnet.IPFromUUID(peerID) + _, _, _, err := conn.Ping(testutil.Context(t, testutil.WaitLong), peerIP) + require.NoError(t, err, "ping peer") + }, + }, +} + +//nolint:paralleltest +func TestIntegration(t *testing.T) { + if *isSubprocess { + handleTestSubprocess(t) + return + } + + for _, topo := range topologies { + //nolint:paralleltest + t.Run(topo.Name, func(t *testing.T) { + log := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + + networking := topo.SetupNetworking(t, log) + + // Fork the three child processes. + serverErrCh, closeServer := startServerSubprocess(t, topo.Name, networking) + // client1 runs the tests. + client1ErrCh, _ := startClientSubprocess(t, topo.Name, networking, 1) + client2ErrCh, closeClient2 := startClientSubprocess(t, topo.Name, networking, 2) + + // Wait for client1 to exit. + require.NoError(t, <-client1ErrCh) + + // Close client2 and the server. + closeClient2() + require.NoError(t, <-client2ErrCh) + closeServer() + require.NoError(t, <-serverErrCh) + }) + } +} + +func handleTestSubprocess(t *testing.T) { + // Find the specific topology. + var topo integration.TestTopology + for _, t := range topologies { + if t.Name == *testID { + topo = t + break + } + } + require.NotEmptyf(t, topo.Name, "unknown test topology %q", *testID) + + testName := topo.Name + "/" + if *role == "server" { + testName += "server" + } else { + testName += *clientName + } + + //nolint:parralleltest + t.Run(testName, func(t *testing.T) { + log := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + switch *role { + case "server": + log = log.Named("server") + topo.StartServer(t, log, *serverListenAddr) + // no exit + + case "client": + log = log.Named(*clientName) + serverURL, err := url.Parse(*clientServerURL) + require.NoErrorf(t, err, "parse server url %q", *clientServerURL) + myID, err := uuid.Parse(*clientMyID) + require.NoErrorf(t, err, "parse client id %q", *clientMyID) + peerID, err := uuid.Parse(*clientPeerID) + require.NoErrorf(t, err, "parse peer id %q", *clientPeerID) + + waitForServerAvailable(t, serverURL) + + conn := topo.StartClient(t, log, serverURL, myID, peerID) + + if *clientRunTests { + topo.RunTests(t, log, serverURL, myID, peerID, conn) + // and exit + return + } + } + + // Wait for signals. + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) + <-signals + }) +} + +func waitForServerAvailable(t *testing.T, serverURL *url.URL) { + const delay = 100 * time.Millisecond + const reqTimeout = 2 * time.Second + const timeout = 30 * time.Second + client := http.Client{ + Timeout: reqTimeout, + } + + u, err := url.Parse(serverURL.String() + "/derp/latency-check") + require.NoError(t, err) + for start := time.Now(); time.Since(start) < timeout; time.Sleep(delay) { + //nolint:noctx + resp, err := client.Get(u.String()) + if err != nil { + t.Logf("waiting for server to be available: %v", err) + continue + } + _ = resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Logf("waiting for server to be available: got status %d", resp.StatusCode) + continue + } + return + } + + t.Fatalf("server did not become available after %v", timeout) +} + +func startServerSubprocess(t *testing.T, topologyName string, networking integration.TestNetworking) (<-chan error, func()) { + return startSubprocess(t, networking.ProcessServer.NetNSFd, []string{ + "--subprocess", + "--test-name=" + topologyName, + "--role=server", + "--server-listen-addr=" + networking.ServerListenAddr, + }) +} + +func startClientSubprocess(t *testing.T, topologyName string, networking integration.TestNetworking, clientNumber int) (<-chan error, func()) { + require.True(t, clientNumber == 1 || clientNumber == 2) + + var ( + clientName = fmt.Sprintf("client%d", clientNumber) + myID = integration.Client1ID + peerID = integration.Client2ID + accessURL = networking.ServerAccessURLClient1 + ) + if clientNumber == 2 { + myID, peerID = peerID, myID + accessURL = networking.ServerAccessURLClient2 + } + + flags := []string{ + "--subprocess", + "--test-name=" + topologyName, + "--role=client", + "--client-name=" + clientName, + "--client-server-url=" + accessURL, + "--client-id=" + myID.String(), + "--client-peer-id=" + peerID.String(), + } + if clientNumber == 1 { + flags = append(flags, "--client-run-tests") + } + + return startSubprocess(t, networking.ProcessClient1.NetNSFd, flags) +} + +func startSubprocess(t *testing.T, netNSFd int, flags []string) (<-chan error, func()) { + name := os.Args[0] + args := append(os.Args[1:], flags...) + + if netNSFd > 0 { + // We use nsenter to enter the namespace. + // We can't use `setns` easily from Golang in the parent process because + // you can't execute the syscall in the forked child thread before it + // execs. + // We can't use `setns` easily from Golang in the child process because + // by the time you call it, the process has already created multiple + // threads. + args = append([]string{"--net=/proc/self/fd/3", name}, args...) + name = "nsenter" + } + + cmd := exec.Command(name, args...) + if netNSFd > 0 { + cmd.ExtraFiles = []*os.File{os.NewFile(uintptr(netNSFd), "")} + } + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.SysProcAttr = &syscall.SysProcAttr{ + Pdeathsig: syscall.SIGTERM, + } + err := cmd.Start() + require.NoError(t, err) + + waitErr := make(chan error, 1) + go func() { + err := cmd.Wait() + waitErr <- err + close(waitErr) + }() + + closeFn := func() { + _ = cmd.Process.Signal(syscall.SIGTERM) + select { + case <-time.After(5 * time.Second): + _ = cmd.Process.Kill() + case <-waitErr: + return + } + <-waitErr + } + + t.Cleanup(func() { + select { + case err := <-waitErr: + if err != nil { + t.Logf("subprocess exited: " + err.Error()) + } + return + default: + } + + closeFn() + }) + + return waitErr, closeFn +} diff --git a/tailnet/test/integration/network.go b/tailnet/test/integration/network.go new file mode 100644 index 0000000000000..95d68ca8e7d6b --- /dev/null +++ b/tailnet/test/integration/network.go @@ -0,0 +1,55 @@ +//go:build linux +// +build linux + +package integration + +import ( + "fmt" + "os" + "os/exec" + + "golang.org/x/xerrors" +) + +// createNetNS creates a new network namespace with the given name. The returned +// file is a file descriptor to the network namespace. +func createNetNS(name string) (*os.File, error) { + // We use ip-netns here because it handles the process of creating a + // disowned netns for us. + // The only way to create a network namespace is by calling unshare(2) or + // clone(2) with the CLONE_NEWNET flag, and as soon as the last process in a + // network namespace exits, the namespace is destroyed. + // However, if you create a bind mount of /proc/$PID/ns/net to a file, it + // will keep the namespace alive until the mount is removed. + // ip-netns does this for us. Without it, we would have to fork anyways. + // Later, we will use nsenter to enter this network namespace. + err := exec.Command("ip", "netns", "add", name).Run() + if err != nil { + return nil, xerrors.Errorf("create network namespace via ip-netns: %w", err) + } + + // Open /run/netns/$name to get a file descriptor to the network namespace + // so it stays active after we soft-delete it. + path := fmt.Sprintf("/run/netns/%s", name) + file, err := os.OpenFile(path, os.O_RDONLY, 0) + if err != nil { + return nil, xerrors.Errorf("open network namespace file %q: %w", path, err) + } + + // Exec "ip link set lo up" in the namespace to bring up loopback + // networking. + //nolint:gosec + err = exec.Command("ip", "netns", "exec", name, "ip", "link", "set", "lo", "up").Run() + if err != nil { + return nil, xerrors.Errorf("bring up loopback interface in network namespace: %w", err) + } + + // Remove the network namespace. The kernel will keep it around until the + // file descriptor is closed. + err = exec.Command("ip", "netns", "delete", name).Run() + if err != nil { + return nil, xerrors.Errorf("soft delete network namespace via ip-netns: %w", err) + } + + return file, nil +}