Skip to content

Commit 80e59bd

Browse files
committed
feat: use tailnet v2 API for coordination
1 parent 4177202 commit 80e59bd

31 files changed

+1189
-1109
lines changed

Makefile

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ gen: \
475475
site/.eslintignore \
476476
site/e2e/provisionerGenerated.ts \
477477
site/src/theme/icons.json \
478-
examples/examples.gen.json
478+
examples/examples.gen.json \
479+
tailnet/tailnettest/coordinatormock.go
479480
.PHONY: gen
480481

481482
# Mark all generated files as fresh so make thinks they're up-to-date. This is
@@ -502,6 +503,7 @@ gen/mark-fresh:
502503
site/e2e/provisionerGenerated.ts \
503504
site/src/theme/icons.json \
504505
examples/examples.gen.json \
506+
tailnet/tailnettest/coordinatormock.go \
505507
"
506508
for file in $$files; do
507509
echo "$$file"
@@ -529,6 +531,9 @@ coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $
529531
coderd/database/dbmock/dbmock.go: coderd/database/db.go coderd/database/querier.go
530532
go generate ./coderd/database/dbmock/
531533

534+
tailnet/tailnettest/coordinatormock.go: tailnet/coordinator.go
535+
go generate ./tailnet/tailnettest/
536+
532537
tailnet/proto/tailnet.pb.go: tailnet/proto/tailnet.proto
533538
protoc \
534539
--go_out=. \

agent/agent.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"sync"
2323
"time"
2424

25+
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
26+
2527
"github.com/go-chi/chi/v5"
2628
"github.com/google/uuid"
2729
"github.com/prometheus/client_golang/prometheus"
@@ -30,6 +32,7 @@ import (
3032
"golang.org/x/exp/slices"
3133
"golang.org/x/sync/errgroup"
3234
"golang.org/x/xerrors"
35+
"storj.io/drpc"
3336
"tailscale.com/net/speedtest"
3437
"tailscale.com/tailcfg"
3538
"tailscale.com/types/netlogtype"
@@ -86,7 +89,7 @@ type Options struct {
8689

8790
type Client interface {
8891
Manifest(ctx context.Context) (agentsdk.Manifest, error)
89-
Listen(ctx context.Context) (net.Conn, error)
92+
Listen(ctx context.Context) (drpc.Conn, error)
9093
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error)
9194
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
9295
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
@@ -1058,20 +1061,34 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10581061
ctx, cancel := context.WithCancel(ctx)
10591062
defer cancel()
10601063

1061-
coordinator, err := a.client.Listen(ctx)
1064+
conn, err := a.client.Listen(ctx)
10621065
if err != nil {
10631066
return err
10641067
}
1065-
defer coordinator.Close()
1068+
defer func() {
1069+
cErr := conn.Close()
1070+
if cErr != nil {
1071+
a.logger.Debug(ctx, "error closing drpc connection", slog.Error(cErr)) // log the Close error itself, not the outer err
1072+
}
1073+
}()
1074+
1075+
tClient := tailnetproto.NewDRPCTailnetClient(conn)
1076+
coordinate, err := tClient.Coordinate(ctx)
1077+
if err != nil {
1078+
return xerrors.Errorf("failed to connect to the coordinate endpoint: %w", err)
1079+
}
1080+
defer func() {
1081+
cErr := coordinate.Close()
1082+
if cErr != nil {
1083+
a.logger.Debug(ctx, "error closing Coordinate client", slog.Error(cErr)) // log the Close error itself, not the outer err
1084+
}
1085+
}()
10661086
a.logger.Info(ctx, "connected to coordination endpoint")
1067-
sendNodes, errChan := tailnet.ServeCoordinator(coordinator, func(nodes []*tailnet.Node) error {
1068-
return network.UpdateNodes(nodes, false)
1069-
})
1070-
network.SetNodeCallback(sendNodes)
1087+
coordination := tailnet.NewRemoteCoordination(a.logger, coordinate, network, uuid.Nil)
10711088
select {
10721089
case <-ctx.Done():
10731090
return ctx.Err()
1074-
case err := <-errChan:
1091+
case err := <-coordination.Error():
10751092
return err
10761093
}
10771094
}

agent/agent_test.go

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1664,9 +1664,11 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16641664
require.NotNil(t, originalDerpMap)
16651665

16661666
coordinator := tailnet.NewCoordinator(logger)
1667-
defer func() {
1667+
// use t.Cleanup so the coordinator closing doesn't deadlock with in-memory
1668+
// coordination
1669+
t.Cleanup(func() {
16681670
_ = coordinator.Close()
1669-
}()
1671+
})
16701672
agentID := uuid.New()
16711673
statsCh := make(chan *agentsdk.Stats, 50)
16721674
fs := afero.NewMemMapFs()
@@ -1681,41 +1683,42 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16811683
statsCh,
16821684
coordinator,
16831685
)
1684-
closer := agent.New(agent.Options{
1686+
uut := agent.New(agent.Options{
16851687
Client: client,
16861688
Filesystem: fs,
16871689
Logger: logger.Named("agent"),
16881690
ReconnectingPTYTimeout: time.Minute,
16891691
})
1690-
defer func() {
1691-
_ = closer.Close()
1692-
}()
1692+
t.Cleanup(func() {
1693+
_ = uut.Close()
1694+
})
16931695

16941696
// Setup a client connection.
1695-
newClientConn := func(derpMap *tailcfg.DERPMap) *codersdk.WorkspaceAgentConn {
1697+
newClientConn := func(derpMap *tailcfg.DERPMap, name string) *codersdk.WorkspaceAgentConn {
16961698
conn, err := tailnet.NewConn(&tailnet.Options{
16971699
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
16981700
DERPMap: derpMap,
1699-
Logger: logger.Named("client"),
1701+
Logger: logger.Named(name),
17001702
})
17011703
require.NoError(t, err)
1702-
clientConn, serverConn := net.Pipe()
1703-
serveClientDone := make(chan struct{})
17041704
t.Cleanup(func() {
1705-
_ = clientConn.Close()
1706-
_ = serverConn.Close()
1705+
t.Logf("closing conn %s", name)
17071706
_ = conn.Close()
1708-
<-serveClientDone
17091707
})
1710-
go func() {
1711-
defer close(serveClientDone)
1712-
err := coordinator.ServeClient(serverConn, uuid.New(), agentID)
1713-
assert.NoError(t, err)
1714-
}()
1715-
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(nodes []*tailnet.Node) error {
1716-
return conn.UpdateNodes(nodes, false)
1708+
testCtx, testCtxCancel := context.WithCancel(context.Background())
1709+
t.Cleanup(testCtxCancel)
1710+
clientID := uuid.New()
1711+
coordination := tailnet.NewInMemoryCoordination(
1712+
testCtx, logger,
1713+
clientID, agentID,
1714+
coordinator, conn)
1715+
t.Cleanup(func() {
1716+
t.Logf("closing coordination %s", name)
1717+
err := coordination.Close()
1718+
if err != nil {
1719+
t.Logf("error closing in-memory coordination: %s", err.Error())
1720+
}
17171721
})
1718-
conn.SetNodeCallback(sendNode)
17191722
// Force DERP.
17201723
conn.SetBlockEndpoints(true)
17211724

@@ -1724,6 +1727,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17241727
CloseFunc: func() error { return codersdk.ErrSkipClose },
17251728
})
17261729
t.Cleanup(func() {
1730+
t.Logf("closing sdkConn %s", name)
17271731
_ = sdkConn.Close()
17281732
})
17291733
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
@@ -1734,7 +1738,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17341738

17351739
return sdkConn
17361740
}
1737-
conn1 := newClientConn(originalDerpMap)
1741+
conn1 := newClientConn(originalDerpMap, "client1")
17381742

17391743
// Change the DERP map.
17401744
newDerpMap, _ := tailnettest.RunDERPAndSTUN(t)
@@ -1753,27 +1757,34 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17531757
DERPMap: newDerpMap,
17541758
})
17551759
require.NoError(t, err)
1760+
t.Logf("client Pushed DERPMap update")
17561761

17571762
require.Eventually(t, func() bool {
1758-
conn := closer.TailnetConn()
1763+
conn := uut.TailnetConn()
17591764
if conn == nil {
17601765
return false
17611766
}
17621767
regionIDs := conn.DERPMap().RegionIDs()
1763-
return len(regionIDs) == 1 && regionIDs[0] == 2 && conn.Node().PreferredDERP == 2
1768+
preferredDERP := conn.Node().PreferredDERP
1769+
t.Logf("agent Conn DERPMap with regionIDs %v, PreferredDERP %d", regionIDs, preferredDERP)
1770+
return len(regionIDs) == 1 && regionIDs[0] == 2 && preferredDERP == 2
17641771
}, testutil.WaitLong, testutil.IntervalFast)
1772+
t.Logf("agent got the new DERPMap")
17651773

17661774
// Connect from a second client and make sure it uses the new DERP map.
1767-
conn2 := newClientConn(newDerpMap)
1775+
conn2 := newClientConn(newDerpMap, "client2")
17681776
require.Equal(t, []int{2}, conn2.DERPMap().RegionIDs())
1777+
t.Log("conn2 got the new DERPMap")
17691778

17701779
// If the first client gets a DERP map update, it should be able to
17711780
// reconnect just fine.
17721781
conn1.SetDERPMap(newDerpMap)
17731782
require.Equal(t, []int{2}, conn1.DERPMap().RegionIDs())
1783+
t.Log("set the new DERPMap on conn1")
17741784
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
17751785
defer cancel()
17761786
require.True(t, conn1.AwaitReachable(ctx))
1787+
t.Log("conn1 reached agent with new DERP")
17771788
}
17781789

17791790
func TestAgent_Speedtest(t *testing.T) {
@@ -2050,22 +2061,22 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
20502061
Logger: logger.Named("client"),
20512062
})
20522063
require.NoError(t, err)
2053-
clientConn, serverConn := net.Pipe()
2054-
serveClientDone := make(chan struct{})
20552064
t.Cleanup(func() {
2056-
_ = clientConn.Close()
2057-
_ = serverConn.Close()
20582065
_ = conn.Close()
2059-
<-serveClientDone
20602066
})
2061-
go func() {
2062-
defer close(serveClientDone)
2063-
coordinator.ServeClient(serverConn, uuid.New(), metadata.AgentID)
2064-
}()
2065-
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(nodes []*tailnet.Node) error {
2066-
return conn.UpdateNodes(nodes, false)
2067+
testCtx, testCtxCancel := context.WithCancel(context.Background())
2068+
t.Cleanup(testCtxCancel)
2069+
clientID := uuid.New()
2070+
coordination := tailnet.NewInMemoryCoordination(
2071+
testCtx, logger,
2072+
clientID, metadata.AgentID,
2073+
coordinator, conn)
2074+
t.Cleanup(func() {
2075+
err := coordination.Close()
2076+
if err != nil {
2077+
t.Logf("error closing in-mem coordination: %s", err.Error())
2078+
}
20672079
})
2068-
conn.SetNodeCallback(sendNode)
20692080
agentConn := codersdk.NewWorkspaceAgentConn(conn, codersdk.WorkspaceAgentConnOptions{
20702081
AgentID: metadata.AgentID,
20712082
})

agent/agenttest/client.go

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,28 @@ package agenttest
33
import (
44
"context"
55
"io"
6-
"net"
76
"sync"
7+
"sync/atomic"
88
"testing"
99
"time"
1010

11+
"github.com/stretchr/testify/require"
12+
13+
"storj.io/drpc"
14+
"storj.io/drpc/drpcmux"
15+
"storj.io/drpc/drpcserver"
16+
"tailscale.com/tailcfg"
17+
18+
"github.com/coder/coder/v2/tailnet/proto"
19+
1120
"github.com/google/uuid"
1221
"golang.org/x/exp/maps"
1322
"golang.org/x/xerrors"
1423

1524
"cdr.dev/slog"
1625
"github.com/coder/coder/v2/codersdk"
1726
"github.com/coder/coder/v2/codersdk/agentsdk"
27+
drpcsdk "github.com/coder/coder/v2/codersdk/drpc"
1828
"github.com/coder/coder/v2/tailnet"
1929
"github.com/coder/coder/v2/testutil"
2030
)
@@ -24,18 +34,39 @@ func NewClient(t testing.TB,
2434
agentID uuid.UUID,
2535
manifest agentsdk.Manifest,
2636
statsChan chan *agentsdk.Stats,
27-
coordinator tailnet.CoordinatorV1,
37+
coordinator tailnet.Coordinator,
2838
) *Client {
2939
if manifest.AgentID == uuid.Nil {
3040
manifest.AgentID = agentID
3141
}
42+
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
43+
coordPtr.Store(&coordinator)
44+
mux := drpcmux.New()
45+
drpcService := &tailnet.DRPCService{
46+
CoordPtr: &coordPtr,
47+
Logger: logger,
48+
// TODO: handle DERPMap too!
49+
DerpMapUpdateFrequency: time.Hour,
50+
DerpMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
51+
}
52+
err := proto.DRPCRegisterTailnet(mux, drpcService)
53+
require.NoError(t, err)
54+
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
55+
Log: func(err error) {
56+
if xerrors.Is(err, io.EOF) {
57+
return
58+
}
59+
logger.Debug(context.Background(), "drpc server error", slog.Error(err))
60+
},
61+
})
3262
return &Client{
3363
t: t,
3464
logger: logger.Named("client"),
3565
agentID: agentID,
3666
manifest: manifest,
3767
statsChan: statsChan,
3868
coordinator: coordinator,
69+
server: server,
3970
derpMapUpdates: make(chan agentsdk.DERPMapUpdate),
4071
}
4172
}
@@ -47,7 +78,8 @@ type Client struct {
4778
manifest agentsdk.Manifest
4879
metadata map[string]agentsdk.Metadata
4980
statsChan chan *agentsdk.Stats
50-
coordinator tailnet.CoordinatorV1
81+
coordinator tailnet.Coordinator
82+
server *drpcserver.Server
5183
LastWorkspaceAgent func()
5284
PatchWorkspaceLogs func() error
5385
GetServiceBannerFunc func() (codersdk.ServiceBannerConfig, error)
@@ -63,20 +95,29 @@ func (c *Client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
6395
return c.manifest, nil
6496
}
6597

66-
func (c *Client) Listen(_ context.Context) (net.Conn, error) {
67-
clientConn, serverConn := net.Pipe()
98+
func (c *Client) Listen(_ context.Context) (drpc.Conn, error) {
99+
conn, lis := drpcsdk.MemTransportPipe()
68100
closed := make(chan struct{})
69101
c.LastWorkspaceAgent = func() {
70-
_ = serverConn.Close()
71-
_ = clientConn.Close()
102+
_ = conn.Close()
103+
_ = lis.Close()
72104
<-closed
73105
}
74106
c.t.Cleanup(c.LastWorkspaceAgent)
107+
serveCtx, cancel := context.WithCancel(context.Background())
108+
c.t.Cleanup(cancel)
109+
auth := tailnet.AgentTunnelAuth{}
110+
streamID := tailnet.StreamID{
111+
Name: "agenttest",
112+
ID: c.agentID,
113+
Auth: auth,
114+
}
115+
serveCtx = tailnet.WithStreamID(serveCtx, streamID)
75116
go func() {
76-
_ = c.coordinator.ServeAgent(serverConn, c.agentID, "")
117+
_ = c.server.Serve(serveCtx, lis)
77118
close(closed)
78119
}()
79-
return clientConn, nil
120+
return conn, nil
80121
}
81122

82123
func (c *Client) ReportStats(ctx context.Context, _ slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error) {

0 commit comments

Comments
 (0)