Skip to content

Commit a3721bb

Browse files
committed
feat: use tailnet v2 API for coordination
1 parent 5a2cf7c commit a3721bb

31 files changed

+1193
-1115
lines changed

Makefile

+6-1
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ gen: \
475475
site/.eslintignore \
476476
site/e2e/provisionerGenerated.ts \
477477
site/src/theme/icons.json \
478-
examples/examples.gen.json
478+
examples/examples.gen.json \
479+
tailnet/tailnettest/coordinatormock.go
479480
.PHONY: gen
480481

481482
# Mark all generated files as fresh so make thinks they're up-to-date. This is
@@ -502,6 +503,7 @@ gen/mark-fresh:
502503
site/e2e/provisionerGenerated.ts \
503504
site/src/theme/icons.json \
504505
examples/examples.gen.json \
506+
tailnet/tailnettest/coordinatormock.go \
505507
"
506508
for file in $$files; do
507509
echo "$$file"
@@ -529,6 +531,9 @@ coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $
529531
coderd/database/dbmock/dbmock.go: coderd/database/db.go coderd/database/querier.go
530532
go generate ./coderd/database/dbmock/
531533

534+
tailnet/tailnettest/coordinatormock.go: tailnet/coordinator.go
535+
go generate ./tailnet/tailnettest/
536+
532537
tailnet/proto/tailnet.pb.go: tailnet/proto/tailnet.proto
533538
protoc \
534539
--go_out=. \

agent/agent.go

+24-8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"golang.org/x/exp/slices"
3131
"golang.org/x/sync/errgroup"
3232
"golang.org/x/xerrors"
33+
"storj.io/drpc"
3334
"tailscale.com/net/speedtest"
3435
"tailscale.com/tailcfg"
3536
"tailscale.com/types/netlogtype"
@@ -47,6 +48,7 @@ import (
4748
"github.com/coder/coder/v2/codersdk"
4849
"github.com/coder/coder/v2/codersdk/agentsdk"
4950
"github.com/coder/coder/v2/tailnet"
51+
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
5052
)
5153

5254
const (
@@ -86,7 +88,7 @@ type Options struct {
8688

8789
type Client interface {
8890
Manifest(ctx context.Context) (agentsdk.Manifest, error)
89-
Listen(ctx context.Context) (net.Conn, error)
91+
Listen(ctx context.Context) (drpc.Conn, error)
9092
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error)
9193
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
9294
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
@@ -1058,20 +1060,34 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10581060
ctx, cancel := context.WithCancel(ctx)
10591061
defer cancel()
10601062

1061-
coordinator, err := a.client.Listen(ctx)
1063+
conn, err := a.client.Listen(ctx)
10621064
if err != nil {
10631065
return err
10641066
}
1065-
defer coordinator.Close()
1067+
defer func() {
1068+
cErr := conn.Close()
1069+
if cErr != nil {
1070+
a.logger.Debug(ctx, "error closing drpc connection", slog.Error(err))
1071+
}
1072+
}()
1073+
1074+
tClient := tailnetproto.NewDRPCTailnetClient(conn)
1075+
coordinate, err := tClient.Coordinate(ctx)
1076+
if err != nil {
1077+
return xerrors.Errorf("failed to connect to the coordinate endpoint: %w", err)
1078+
}
1079+
defer func() {
1080+
cErr := coordinate.Close()
1081+
if cErr != nil {
1082+
a.logger.Debug(ctx, "error closing Coordinate client", slog.Error(err))
1083+
}
1084+
}()
10661085
a.logger.Info(ctx, "connected to coordination endpoint")
1067-
sendNodes, errChan := tailnet.ServeCoordinator(coordinator, func(nodes []*tailnet.Node) error {
1068-
return network.UpdateNodes(nodes, false)
1069-
})
1070-
network.SetNodeCallback(sendNodes)
1086+
coordination := tailnet.NewRemoteCoordination(a.logger, coordinate, network, uuid.Nil)
10711087
select {
10721088
case <-ctx.Done():
10731089
return ctx.Err()
1074-
case err := <-errChan:
1090+
case err := <-coordination.Error():
10751091
return err
10761092
}
10771093
}

agent/agent_test.go

+48-37
Original file line numberDiff line numberDiff line change
@@ -1664,9 +1664,11 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16641664
require.NotNil(t, originalDerpMap)
16651665

16661666
coordinator := tailnet.NewCoordinator(logger)
1667-
defer func() {
1667+
// use t.Cleanup so the coordinator closing doesn't deadlock with in-memory
1668+
// coordination
1669+
t.Cleanup(func() {
16681670
_ = coordinator.Close()
1669-
}()
1671+
})
16701672
agentID := uuid.New()
16711673
statsCh := make(chan *agentsdk.Stats, 50)
16721674
fs := afero.NewMemMapFs()
@@ -1681,41 +1683,42 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16811683
statsCh,
16821684
coordinator,
16831685
)
1684-
closer := agent.New(agent.Options{
1686+
uut := agent.New(agent.Options{
16851687
Client: client,
16861688
Filesystem: fs,
16871689
Logger: logger.Named("agent"),
16881690
ReconnectingPTYTimeout: time.Minute,
16891691
})
1690-
defer func() {
1691-
_ = closer.Close()
1692-
}()
1692+
t.Cleanup(func() {
1693+
_ = uut.Close()
1694+
})
16931695

16941696
// Setup a client connection.
1695-
newClientConn := func(derpMap *tailcfg.DERPMap) *codersdk.WorkspaceAgentConn {
1697+
newClientConn := func(derpMap *tailcfg.DERPMap, name string) *codersdk.WorkspaceAgentConn {
16961698
conn, err := tailnet.NewConn(&tailnet.Options{
16971699
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
16981700
DERPMap: derpMap,
1699-
Logger: logger.Named("client"),
1701+
Logger: logger.Named(name),
17001702
})
17011703
require.NoError(t, err)
1702-
clientConn, serverConn := net.Pipe()
1703-
serveClientDone := make(chan struct{})
17041704
t.Cleanup(func() {
1705-
_ = clientConn.Close()
1706-
_ = serverConn.Close()
1705+
t.Logf("closing conn %s", name)
17071706
_ = conn.Close()
1708-
<-serveClientDone
17091707
})
1710-
go func() {
1711-
defer close(serveClientDone)
1712-
err := coordinator.ServeClient(serverConn, uuid.New(), agentID)
1713-
assert.NoError(t, err)
1714-
}()
1715-
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(nodes []*tailnet.Node) error {
1716-
return conn.UpdateNodes(nodes, false)
1708+
testCtx, testCtxCancel := context.WithCancel(context.Background())
1709+
t.Cleanup(testCtxCancel)
1710+
clientID := uuid.New()
1711+
coordination := tailnet.NewInMemoryCoordination(
1712+
testCtx, logger,
1713+
clientID, agentID,
1714+
coordinator, conn)
1715+
t.Cleanup(func() {
1716+
t.Logf("closing coordination %s", name)
1717+
err := coordination.Close()
1718+
if err != nil {
1719+
t.Logf("error closing in-memory coordination: %s", err.Error())
1720+
}
17171721
})
1718-
conn.SetNodeCallback(sendNode)
17191722
// Force DERP.
17201723
conn.SetBlockEndpoints(true)
17211724

@@ -1724,6 +1727,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17241727
CloseFunc: func() error { return codersdk.ErrSkipClose },
17251728
})
17261729
t.Cleanup(func() {
1730+
t.Logf("closing sdkConn %s", name)
17271731
_ = sdkConn.Close()
17281732
})
17291733
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
@@ -1734,7 +1738,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17341738

17351739
return sdkConn
17361740
}
1737-
conn1 := newClientConn(originalDerpMap)
1741+
conn1 := newClientConn(originalDerpMap, "client1")
17381742

17391743
// Change the DERP map.
17401744
newDerpMap, _ := tailnettest.RunDERPAndSTUN(t)
@@ -1753,27 +1757,34 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17531757
DERPMap: newDerpMap,
17541758
})
17551759
require.NoError(t, err)
1760+
t.Logf("client Pushed DERPMap update")
17561761

17571762
require.Eventually(t, func() bool {
1758-
conn := closer.TailnetConn()
1763+
conn := uut.TailnetConn()
17591764
if conn == nil {
17601765
return false
17611766
}
17621767
regionIDs := conn.DERPMap().RegionIDs()
1763-
return len(regionIDs) == 1 && regionIDs[0] == 2 && conn.Node().PreferredDERP == 2
1768+
preferredDERP := conn.Node().PreferredDERP
1769+
t.Logf("agent Conn DERPMap with regionIDs %v, PreferredDERP %d", regionIDs, preferredDERP)
1770+
return len(regionIDs) == 1 && regionIDs[0] == 2 && preferredDERP == 2
17641771
}, testutil.WaitLong, testutil.IntervalFast)
1772+
t.Logf("agent got the new DERPMap")
17651773

17661774
// Connect from a second client and make sure it uses the new DERP map.
1767-
conn2 := newClientConn(newDerpMap)
1775+
conn2 := newClientConn(newDerpMap, "client2")
17681776
require.Equal(t, []int{2}, conn2.DERPMap().RegionIDs())
1777+
t.Log("conn2 got the new DERPMap")
17691778

17701779
// If the first client gets a DERP map update, it should be able to
17711780
// reconnect just fine.
17721781
conn1.SetDERPMap(newDerpMap)
17731782
require.Equal(t, []int{2}, conn1.DERPMap().RegionIDs())
1783+
t.Log("set the new DERPMap on conn1")
17741784
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
17751785
defer cancel()
17761786
require.True(t, conn1.AwaitReachable(ctx))
1787+
t.Log("conn1 reached agent with new DERP")
17771788
}
17781789

17791790
func TestAgent_Speedtest(t *testing.T) {
@@ -2050,22 +2061,22 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
20502061
Logger: logger.Named("client"),
20512062
})
20522063
require.NoError(t, err)
2053-
clientConn, serverConn := net.Pipe()
2054-
serveClientDone := make(chan struct{})
20552064
t.Cleanup(func() {
2056-
_ = clientConn.Close()
2057-
_ = serverConn.Close()
20582065
_ = conn.Close()
2059-
<-serveClientDone
20602066
})
2061-
go func() {
2062-
defer close(serveClientDone)
2063-
coordinator.ServeClient(serverConn, uuid.New(), metadata.AgentID)
2064-
}()
2065-
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(nodes []*tailnet.Node) error {
2066-
return conn.UpdateNodes(nodes, false)
2067+
testCtx, testCtxCancel := context.WithCancel(context.Background())
2068+
t.Cleanup(testCtxCancel)
2069+
clientID := uuid.New()
2070+
coordination := tailnet.NewInMemoryCoordination(
2071+
testCtx, logger,
2072+
clientID, metadata.AgentID,
2073+
coordinator, conn)
2074+
t.Cleanup(func() {
2075+
err := coordination.Close()
2076+
if err != nil {
2077+
t.Logf("error closing in-mem coordination: %s", err.Error())
2078+
}
20672079
})
2068-
conn.SetNodeCallback(sendNode)
20692080
agentConn := codersdk.NewWorkspaceAgentConn(conn, codersdk.WorkspaceAgentConnOptions{
20702081
AgentID: metadata.AgentID,
20712082
})

agent/agenttest/client.go

+47-9
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,26 @@ package agenttest
33
import (
44
"context"
55
"io"
6-
"net"
76
"sync"
7+
"sync/atomic"
88
"testing"
99
"time"
1010

1111
"github.com/google/uuid"
12+
"github.com/stretchr/testify/require"
1213
"golang.org/x/exp/maps"
1314
"golang.org/x/xerrors"
15+
"storj.io/drpc"
16+
"storj.io/drpc/drpcmux"
17+
"storj.io/drpc/drpcserver"
18+
"tailscale.com/tailcfg"
1419

1520
"cdr.dev/slog"
1621
"github.com/coder/coder/v2/codersdk"
1722
"github.com/coder/coder/v2/codersdk/agentsdk"
23+
drpcsdk "github.com/coder/coder/v2/codersdk/drpc"
1824
"github.com/coder/coder/v2/tailnet"
25+
"github.com/coder/coder/v2/tailnet/proto"
1926
"github.com/coder/coder/v2/testutil"
2027
)
2128

@@ -24,18 +31,39 @@ func NewClient(t testing.TB,
2431
agentID uuid.UUID,
2532
manifest agentsdk.Manifest,
2633
statsChan chan *agentsdk.Stats,
27-
coordinator tailnet.CoordinatorV1,
34+
coordinator tailnet.Coordinator,
2835
) *Client {
2936
if manifest.AgentID == uuid.Nil {
3037
manifest.AgentID = agentID
3138
}
39+
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
40+
coordPtr.Store(&coordinator)
41+
mux := drpcmux.New()
42+
drpcService := &tailnet.DRPCService{
43+
CoordPtr: &coordPtr,
44+
Logger: logger,
45+
// TODO: handle DERPMap too!
46+
DerpMapUpdateFrequency: time.Hour,
47+
DerpMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
48+
}
49+
err := proto.DRPCRegisterTailnet(mux, drpcService)
50+
require.NoError(t, err)
51+
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
52+
Log: func(err error) {
53+
if xerrors.Is(err, io.EOF) {
54+
return
55+
}
56+
logger.Debug(context.Background(), "drpc server error", slog.Error(err))
57+
},
58+
})
3259
return &Client{
3360
t: t,
3461
logger: logger.Named("client"),
3562
agentID: agentID,
3663
manifest: manifest,
3764
statsChan: statsChan,
3865
coordinator: coordinator,
66+
server: server,
3967
derpMapUpdates: make(chan agentsdk.DERPMapUpdate),
4068
}
4169
}
@@ -47,7 +75,8 @@ type Client struct {
4775
manifest agentsdk.Manifest
4876
metadata map[string]agentsdk.Metadata
4977
statsChan chan *agentsdk.Stats
50-
coordinator tailnet.CoordinatorV1
78+
coordinator tailnet.Coordinator
79+
server *drpcserver.Server
5180
LastWorkspaceAgent func()
5281
PatchWorkspaceLogs func() error
5382
GetServiceBannerFunc func() (codersdk.ServiceBannerConfig, error)
@@ -63,20 +92,29 @@ func (c *Client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
6392
return c.manifest, nil
6493
}
6594

66-
func (c *Client) Listen(_ context.Context) (net.Conn, error) {
67-
clientConn, serverConn := net.Pipe()
95+
func (c *Client) Listen(_ context.Context) (drpc.Conn, error) {
96+
conn, lis := drpcsdk.MemTransportPipe()
6897
closed := make(chan struct{})
6998
c.LastWorkspaceAgent = func() {
70-
_ = serverConn.Close()
71-
_ = clientConn.Close()
99+
_ = conn.Close()
100+
_ = lis.Close()
72101
<-closed
73102
}
74103
c.t.Cleanup(c.LastWorkspaceAgent)
104+
serveCtx, cancel := context.WithCancel(context.Background())
105+
c.t.Cleanup(cancel)
106+
auth := tailnet.AgentTunnelAuth{}
107+
streamID := tailnet.StreamID{
108+
Name: "agenttest",
109+
ID: c.agentID,
110+
Auth: auth,
111+
}
112+
serveCtx = tailnet.WithStreamID(serveCtx, streamID)
75113
go func() {
76-
_ = c.coordinator.ServeAgent(serverConn, c.agentID, "")
114+
_ = c.server.Serve(serveCtx, lis)
77115
close(closed)
78116
}()
79-
return clientConn, nil
117+
return conn, nil
80118
}
81119

82120
func (c *Client) ReportStats(ctx context.Context, _ slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error) {

0 commit comments

Comments
 (0)