Skip to content

Commit 85bb13d

Browse files
committed
chore: do network integration tests in isolated net ns
1 parent 3ff9cef commit 85bb13d

File tree

6 files changed

+566
-242
lines changed

6 files changed

+566
-242
lines changed

tailnet/configmaps.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,10 +498,14 @@ func (c *configMaps) setAllPeersLost() {
498498
lc.setLostTimer(c)
499499
// it's important to drop a log here so that we see it get marked lost if grepping thru
500500
// the logs for a specific peer
501+
keyID := "(nil node)"
502+
if lc.node != nil {
503+
keyID = lc.node.Key.ShortString()
504+
}
501505
c.logger.Debug(context.Background(),
502506
"setAllPeersLost marked peer lost",
503507
slog.F("peer_id", lc.peerID),
504-
slog.F("key_id", lc.node.Key.ShortString()),
508+
slog.F("key_id", keyID),
505509
)
506510
}
507511
}

tailnet/conn.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ type Options struct {
9393
BlockEndpoints bool
9494
Logger slog.Logger
9595
ListenPort uint16
96+
// ForceNetworkUp forces the network to be considered up. magicsock will not
97+
// do anything if it thinks it can't reach the internet.
98+
ForceNetworkUp bool
9699
}
97100

98101
// NodeID creates a Tailscale NodeID from the last 8 bytes of a UUID. It ensures
@@ -171,6 +174,9 @@ func NewConn(options *Options) (conn *Conn, err error) {
171174
if options.DERPHeader != nil {
172175
magicConn.SetDERPHeader(options.DERPHeader.Clone())
173176
}
177+
if options.ForceNetworkUp {
178+
magicConn.SetNetworkUp(true)
179+
}
174180

175181
if v, ok := os.LookupEnv(EnvMagicsockDebugLogging); ok {
176182
vBool, err := strconv.ParseBool(v)
Lines changed: 205 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,162 @@
1+
//go:build linux
2+
// +build linux
3+
14
package integration
25

36
import (
47
"context"
5-
"encoding/json"
8+
"fmt"
69
"io"
710
"net/http"
8-
"net/http/httptest"
911
"net/netip"
10-
"strings"
12+
"net/url"
13+
"strconv"
1114
"sync/atomic"
1215
"testing"
1316
"time"
1417

18+
"github.com/go-chi/chi/v5"
1519
"github.com/google/uuid"
1620
"github.com/stretchr/testify/require"
1721
"golang.org/x/xerrors"
1822
"nhooyr.io/websocket"
23+
"tailscale.com/derp"
24+
"tailscale.com/derp/derphttp"
1925
"tailscale.com/tailcfg"
26+
"tailscale.com/types/key"
2027

2128
"cdr.dev/slog"
2229
"github.com/coder/coder/v2/coderd/httpapi"
30+
"github.com/coder/coder/v2/coderd/httpmw"
31+
"github.com/coder/coder/v2/coderd/tracing"
2332
"github.com/coder/coder/v2/codersdk"
33+
"github.com/coder/coder/v2/cryptorand"
2434
"github.com/coder/coder/v2/tailnet"
25-
"github.com/coder/coder/v2/testutil"
2635
)
2736

28-
func NetworkSetupDefault(*testing.T) {}
37+
// IDs used in tests.
38+
var (
39+
Client1ID = uuid.MustParse("00000000-0000-0000-0000-000000000001")
40+
Client2ID = uuid.MustParse("00000000-0000-0000-0000-000000000002")
41+
)
2942

30-
func DERPMapTailscale(ctx context.Context, t *testing.T) *tailcfg.DERPMap {
31-
ctx, cancel := context.WithTimeout(ctx, testutil.WaitShort)
32-
defer cancel()
43+
type TestTopology struct {
44+
Name string
45+
// SetupNetworking creates interfaces and network namespaces for the test.
46+
// The most simple implementation is NetworkSetupDefault, which only creates
47+
// a network namespace shared for all tests.
48+
SetupNetworking func(t *testing.T, log slog.Logger) TestNetworking
3349

34-
req, err := http.NewRequestWithContext(ctx, "GET", "https://controlplane.tailscale.com/derpmap/default", nil)
35-
require.NoError(t, err)
50+
// StartServer gets called in the server subprocess. It's expected to start
51+
// the coordinator server in the background and return.
52+
StartServer func(t *testing.T, log slog.Logger, listenAddr string)
53+
// StartClient gets called in each client subprocess. It's expected to
54+
// create the tailnet.Conn and ensure connectivity to it's peer.
55+
StartClient func(t *testing.T, log slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn
3656

37-
res, err := http.DefaultClient.Do(req)
38-
require.NoError(t, err)
39-
defer res.Body.Close()
57+
// RunTests is the main test function. It's called in each of the client
58+
// subprocesses. If tests can only run once, they should check the client ID
59+
// and return early if it's not the expected one.
60+
RunTests func(t *testing.T, log slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID, conn *tailnet.Conn)
61+
}
4062

41-
dm := &tailcfg.DERPMap{}
42-
dec := json.NewDecoder(res.Body)
43-
err = dec.Decode(dm)
44-
require.NoError(t, err)
63+
type TestNetworking struct {
64+
// ServerListenAddr is the IP address and port that the server listens on,
65+
// passed to StartServer.
66+
ServerListenAddr string
67+
// ServerAccessURLClient1 is the hostname and port that the first client
68+
// uses to access the server.
69+
ServerAccessURLClient1 string
70+
// ServerAccessURLClient2 is the hostname and port that the second client
71+
// uses to access the server.
72+
ServerAccessURLClient2 string
73+
74+
// Networking settings for each subprocess.
75+
ProcessServer TestNetworkingProcess
76+
ProcessClient1 TestNetworkingProcess
77+
ProcessClient2 TestNetworkingProcess
78+
}
79+
80+
type TestNetworkingProcess struct {
81+
// NetNS to enter. If zero, the current network namespace is used.
82+
NetNSFd int
83+
}
4584

46-
return dm
85+
func SetupNetworkingLoopback(t *testing.T, log slog.Logger) TestNetworking {
86+
netNSName := "codertest_netns_"
87+
randStr, err := cryptorand.String(4)
88+
require.NoError(t, err, "generate random string for netns name")
89+
netNSName += randStr
90+
91+
// Create a single network namespace for all tests so we can have an
92+
// isolated loopback interface.
93+
netNSFile, err := createNetNS(netNSName)
94+
require.NoError(t, err, "create network namespace")
95+
t.Cleanup(func() {
96+
_ = netNSFile.Close()
97+
})
98+
99+
var (
100+
listenAddr = "127.0.0.1:8080"
101+
process = TestNetworkingProcess{
102+
NetNSFd: int(netNSFile.Fd()),
103+
}
104+
)
105+
return TestNetworking{
106+
ServerListenAddr: listenAddr,
107+
ServerAccessURLClient1: "http://" + listenAddr,
108+
ServerAccessURLClient2: "http://" + listenAddr,
109+
ProcessServer: process,
110+
ProcessClient1: process,
111+
ProcessClient2: process,
112+
}
47113
}
48114

49-
func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap) (coord tailnet.Coordinator, url string) {
50-
coord = tailnet.NewCoordinator(logger)
115+
func StartServerBasic(t *testing.T, logger slog.Logger, listenAddr string) {
116+
coord := tailnet.NewCoordinator(logger)
51117
var coordPtr atomic.Pointer[tailnet.Coordinator]
52118
coordPtr.Store(&coord)
53119
t.Cleanup(func() { _ = coord.Close() })
54120

55121
csvc, err := tailnet.NewClientService(logger, &coordPtr, 10*time.Minute, func() *tailcfg.DERPMap {
56-
return dm
122+
return &tailcfg.DERPMap{
123+
// Clients will set their own based on their custom access URL.
124+
Regions: map[int]*tailcfg.DERPRegion{},
125+
}
57126
})
58127
require.NoError(t, err)
59128

60-
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
61-
idStr := strings.TrimPrefix(r.URL.Path, "/")
129+
derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(logger.Named("derp")))
130+
derpHandler, derpCloseFunc := tailnet.WithWebsocketSupport(derpServer, derphttp.Handler(derpServer))
131+
t.Cleanup(derpCloseFunc)
132+
133+
r := chi.NewRouter()
134+
r.Use(
135+
func(next http.Handler) http.Handler {
136+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
137+
logger.Debug(r.Context(), "start "+r.Method, slog.F("path", r.URL.Path), slog.F("remote_ip", r.RemoteAddr))
138+
next.ServeHTTP(w, r)
139+
})
140+
},
141+
tracing.StatusWriterMiddleware,
142+
httpmw.Logger(logger),
143+
)
144+
r.Route("/derp", func(r chi.Router) {
145+
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
146+
logger.Info(r.Context(), "start derp request", slog.F("path", r.URL.Path), slog.F("remote_ip", r.RemoteAddr))
147+
derpHandler.ServeHTTP(w, r)
148+
})
149+
r.Get("/latency-check", func(w http.ResponseWriter, r *http.Request) {
150+
w.WriteHeader(http.StatusOK)
151+
})
152+
})
153+
r.Get("/api/v2/workspaceagents/{id}/coordinate", func(w http.ResponseWriter, r *http.Request) {
154+
ctx := r.Context()
155+
idStr := chi.URLParamFromCtx(ctx, "id")
62156
id, err := uuid.Parse(idStr)
63157
if err != nil {
64-
httpapi.Write(r.Context(), w, http.StatusBadRequest, codersdk.Response{
158+
logger.Warn(ctx, "bad agent ID passed in URL params", slog.F("id_str", idStr), slog.Error(err))
159+
httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{
65160
Message: "Bad agent id.",
66161
Detail: err.Error(),
67162
})
@@ -70,14 +165,15 @@ func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap)
70165

71166
conn, err := websocket.Accept(w, r, nil)
72167
if err != nil {
73-
httpapi.Write(r.Context(), w, http.StatusBadRequest, codersdk.Response{
168+
logger.Warn(ctx, "failed to accept websocket", slog.Error(err))
169+
httpapi.Write(ctx, w, http.StatusBadRequest, codersdk.Response{
74170
Message: "Failed to accept websocket.",
75171
Detail: err.Error(),
76172
})
77173
return
78174
}
79175

80-
ctx, wsNetConn := codersdk.WebsocketNetConn(r.Context(), conn, websocket.MessageBinary)
176+
ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageBinary)
81177
defer wsNetConn.Close()
82178

83179
err = csvc.ServeConnV2(ctx, wsNetConn, tailnet.StreamID{
@@ -86,43 +182,105 @@ func CoordinatorInMemory(t *testing.T, logger slog.Logger, dm *tailcfg.DERPMap)
86182
Auth: tailnet.SingleTailnetCoordinateeAuth{},
87183
})
88184
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
185+
logger.Warn(ctx, "failed to serve conn", slog.Error(err))
89186
_ = conn.Close(websocket.StatusInternalError, err.Error())
90187
return
91188
}
92-
}))
93-
t.Cleanup(srv.Close)
189+
})
94190

95-
return coord, srv.URL
191+
// We have a custom listen address.
192+
srv := http.Server{
193+
Addr: listenAddr,
194+
Handler: r,
195+
ReadTimeout: 10 * time.Second,
196+
}
197+
serveDone := make(chan struct{})
198+
go func() {
199+
defer close(serveDone)
200+
err := srv.ListenAndServe()
201+
if err != nil && !xerrors.Is(err, http.ErrServerClosed) {
202+
t.Error("HTTP server error:", err)
203+
}
204+
}()
205+
t.Cleanup(func() {
206+
_ = srv.Close()
207+
<-serveDone
208+
})
96209
}
97210

98-
func TailnetSetupDRPC(ctx context.Context, t *testing.T, logger slog.Logger,
99-
id, agentID uuid.UUID,
100-
coordinateURL string,
101-
dm *tailcfg.DERPMap,
102-
) *tailnet.Conn {
103-
ip := tailnet.IPFromUUID(id)
104-
conn, err := tailnet.NewConn(&tailnet.Options{
105-
Addresses: []netip.Prefix{netip.PrefixFrom(ip, 128)},
106-
DERPMap: dm,
107-
Logger: logger,
108-
})
109-
require.NoError(t, err)
110-
t.Cleanup(func() { _ = conn.Close() })
211+
func basicDERPMap(t *testing.T, serverURL *url.URL) *tailcfg.DERPMap {
212+
portStr := serverURL.Port()
213+
port, err := strconv.Atoi(portStr)
214+
require.NoError(t, err, "parse server port")
215+
216+
hostname := serverURL.Hostname()
217+
ipv4 := ""
218+
ip, err := netip.ParseAddr(hostname)
219+
if err == nil {
220+
hostname = ""
221+
ipv4 = ip.String()
222+
}
223+
224+
return &tailcfg.DERPMap{
225+
Regions: map[int]*tailcfg.DERPRegion{
226+
1: {
227+
RegionID: 1,
228+
RegionCode: "test",
229+
RegionName: "test server",
230+
Nodes: []*tailcfg.DERPNode{
231+
{
232+
Name: "test0",
233+
RegionID: 1,
234+
HostName: hostname,
235+
IPv4: ipv4,
236+
IPv6: "none",
237+
DERPPort: port,
238+
ForceHTTP: true,
239+
InsecureForTests: true,
240+
},
241+
},
242+
},
243+
},
244+
}
245+
}
111246

247+
func StartClientBasic(t *testing.T, logger slog.Logger, serverURL *url.URL, myID uuid.UUID, peerID uuid.UUID) *tailnet.Conn {
248+
u, err := serverURL.Parse(fmt.Sprintf("/api/v2/workspaceagents/%s/coordinate", myID.String()))
249+
require.NoError(t, err)
112250
//nolint:bodyclose
113-
ws, _, err := websocket.Dial(ctx, coordinateURL+"/"+id.String(), nil)
251+
ws, _, err := websocket.Dial(context.Background(), u.String(), nil)
114252
require.NoError(t, err)
253+
t.Cleanup(func() {
254+
_ = ws.Close(websocket.StatusNormalClosure, "closing websocket")
255+
})
115256

116257
client, err := tailnet.NewDRPCClient(
117-
websocket.NetConn(ctx, ws, websocket.MessageBinary),
258+
websocket.NetConn(context.Background(), ws, websocket.MessageBinary),
118259
logger,
119260
)
120261
require.NoError(t, err)
121262

122-
coord, err := client.Coordinate(ctx)
263+
coord, err := client.Coordinate(context.Background())
264+
require.NoError(t, err)
265+
266+
conn, err := tailnet.NewConn(&tailnet.Options{
267+
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IPFromUUID(myID), 128)},
268+
DERPMap: basicDERPMap(t, serverURL),
269+
BlockEndpoints: true,
270+
Logger: logger,
271+
// These tests don't have internet connection, so we need to force
272+
// magicsock to do anything.
273+
ForceNetworkUp: true,
274+
})
123275
require.NoError(t, err)
276+
t.Cleanup(func() {
277+
_ = conn.Close()
278+
})
279+
280+
coordination := tailnet.NewRemoteCoordination(logger, coord, conn, peerID)
281+
t.Cleanup(func() {
282+
_ = coordination.Close()
283+
})
124284

125-
coordination := tailnet.NewRemoteCoordination(logger, coord, conn, agentID)
126-
t.Cleanup(func() { _ = coordination.Close() })
127285
return conn
128286
}

0 commit comments

Comments
 (0)