Skip to content

Commit 2f11961

Browse files
committed
feat: use tailnet v2 API for coordination
1 parent 64caaac commit 2f11961

30 files changed

+1170
-1107
lines changed

Makefile

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ gen: \
475475
site/.eslintignore \
476476
site/e2e/provisionerGenerated.ts \
477477
site/src/theme/icons.json \
478-
examples/examples.gen.json
478+
examples/examples.gen.json \
479+
tailnet/tailnettest/coordinatormock.go
479480
.PHONY: gen
480481

481482
# Mark all generated files as fresh so make thinks they're up-to-date. This is
@@ -502,6 +503,7 @@ gen/mark-fresh:
502503
site/e2e/provisionerGenerated.ts \
503504
site/src/theme/icons.json \
504505
examples/examples.gen.json \
506+
tailnet/tailnettest/coordinatormock.go \
505507
"
506508
for file in $$files; do
507509
echo "$$file"
@@ -529,6 +531,9 @@ coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $
529531
coderd/database/dbmock/dbmock.go: coderd/database/db.go coderd/database/querier.go
530532
go generate ./coderd/database/dbmock/
531533

534+
tailnet/tailnettest/coordinatormock.go: tailnet/coordinator.go
535+
go generate ./tailnet/tailnettest/
536+
532537
tailnet/proto/tailnet.pb.go: tailnet/proto/tailnet.proto
533538
protoc \
534539
--go_out=. \

agent/agent.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"golang.org/x/exp/slices"
3131
"golang.org/x/sync/errgroup"
3232
"golang.org/x/xerrors"
33+
"storj.io/drpc"
3334
"tailscale.com/net/speedtest"
3435
"tailscale.com/tailcfg"
3536
"tailscale.com/types/netlogtype"
@@ -47,6 +48,7 @@ import (
4748
"github.com/coder/coder/v2/codersdk"
4849
"github.com/coder/coder/v2/codersdk/agentsdk"
4950
"github.com/coder/coder/v2/tailnet"
51+
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
5052
)
5153

5254
const (
@@ -86,7 +88,7 @@ type Options struct {
8688

8789
type Client interface {
8890
Manifest(ctx context.Context) (agentsdk.Manifest, error)
89-
Listen(ctx context.Context) (net.Conn, error)
91+
Listen(ctx context.Context) (drpc.Conn, error)
9092
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error)
9193
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
9294
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
@@ -1058,20 +1060,34 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10581060
ctx, cancel := context.WithCancel(ctx)
10591061
defer cancel()
10601062

1061-
coordinator, err := a.client.Listen(ctx)
1063+
conn, err := a.client.Listen(ctx)
10621064
if err != nil {
10631065
return err
10641066
}
1065-
defer coordinator.Close()
1067+
defer func() {
1068+
cErr := conn.Close()
1069+
if cErr != nil {
1070+
a.logger.Debug(ctx, "error closing drpc connection", slog.Error(err))
1071+
}
1072+
}()
1073+
1074+
tClient := tailnetproto.NewDRPCTailnetClient(conn)
1075+
coordinate, err := tClient.Coordinate(ctx)
1076+
if err != nil {
1077+
return xerrors.Errorf("failed to connect to the coordinate endpoint: %w", err)
1078+
}
1079+
defer func() {
1080+
cErr := coordinate.Close()
1081+
if cErr != nil {
1082+
a.logger.Debug(ctx, "error closing Coordinate client", slog.Error(err))
1083+
}
1084+
}()
10661085
a.logger.Info(ctx, "connected to coordination endpoint")
1067-
sendNodes, errChan := tailnet.ServeCoordinator(coordinator, func(nodes []*tailnet.Node) error {
1068-
return network.UpdateNodes(nodes, false)
1069-
})
1070-
network.SetNodeCallback(sendNodes)
1086+
coordination := tailnet.NewRemoteCoordination(a.logger, coordinate, network, uuid.Nil)
10711087
select {
10721088
case <-ctx.Done():
10731089
return ctx.Err()
1074-
case err := <-errChan:
1090+
case err := <-coordination.Error():
10751091
return err
10761092
}
10771093
}

agent/agent_test.go

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1677,9 +1677,11 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16771677
require.NotNil(t, originalDerpMap)
16781678

16791679
coordinator := tailnet.NewCoordinator(logger)
1680-
defer func() {
1680+
// use t.Cleanup so the coordinator closing doesn't deadlock with in-memory
1681+
// coordination
1682+
t.Cleanup(func() {
16811683
_ = coordinator.Close()
1682-
}()
1684+
})
16831685
agentID := uuid.New()
16841686
statsCh := make(chan *agentsdk.Stats, 50)
16851687
fs := afero.NewMemMapFs()
@@ -1694,41 +1696,42 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16941696
statsCh,
16951697
coordinator,
16961698
)
1697-
closer := agent.New(agent.Options{
1699+
uut := agent.New(agent.Options{
16981700
Client: client,
16991701
Filesystem: fs,
17001702
Logger: logger.Named("agent"),
17011703
ReconnectingPTYTimeout: time.Minute,
17021704
})
1703-
defer func() {
1704-
_ = closer.Close()
1705-
}()
1705+
t.Cleanup(func() {
1706+
_ = uut.Close()
1707+
})
17061708

17071709
// Setup a client connection.
1708-
newClientConn := func(derpMap *tailcfg.DERPMap) *codersdk.WorkspaceAgentConn {
1710+
newClientConn := func(derpMap *tailcfg.DERPMap, name string) *codersdk.WorkspaceAgentConn {
17091711
conn, err := tailnet.NewConn(&tailnet.Options{
17101712
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
17111713
DERPMap: derpMap,
1712-
Logger: logger.Named("client"),
1714+
Logger: logger.Named(name),
17131715
})
17141716
require.NoError(t, err)
1715-
clientConn, serverConn := net.Pipe()
1716-
serveClientDone := make(chan struct{})
17171717
t.Cleanup(func() {
1718-
_ = clientConn.Close()
1719-
_ = serverConn.Close()
1718+
t.Logf("closing conn %s", name)
17201719
_ = conn.Close()
1721-
<-serveClientDone
17221720
})
1723-
go func() {
1724-
defer close(serveClientDone)
1725-
err := coordinator.ServeClient(serverConn, uuid.New(), agentID)
1726-
assert.NoError(t, err)
1727-
}()
1728-
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(nodes []*tailnet.Node) error {
1729-
return conn.UpdateNodes(nodes, false)
1721+
testCtx, testCtxCancel := context.WithCancel(context.Background())
1722+
t.Cleanup(testCtxCancel)
1723+
clientID := uuid.New()
1724+
coordination := tailnet.NewInMemoryCoordination(
1725+
testCtx, logger,
1726+
clientID, agentID,
1727+
coordinator, conn)
1728+
t.Cleanup(func() {
1729+
t.Logf("closing coordination %s", name)
1730+
err := coordination.Close()
1731+
if err != nil {
1732+
t.Logf("error closing in-memory coordination: %s", err.Error())
1733+
}
17301734
})
1731-
conn.SetNodeCallback(sendNode)
17321735
// Force DERP.
17331736
conn.SetBlockEndpoints(true)
17341737

@@ -1737,6 +1740,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17371740
CloseFunc: func() error { return codersdk.ErrSkipClose },
17381741
})
17391742
t.Cleanup(func() {
1743+
t.Logf("closing sdkConn %s", name)
17401744
_ = sdkConn.Close()
17411745
})
17421746
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
@@ -1747,7 +1751,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17471751

17481752
return sdkConn
17491753
}
1750-
conn1 := newClientConn(originalDerpMap)
1754+
conn1 := newClientConn(originalDerpMap, "client1")
17511755

17521756
// Change the DERP map.
17531757
newDerpMap, _ := tailnettest.RunDERPAndSTUN(t)
@@ -1766,27 +1770,34 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17661770
DERPMap: newDerpMap,
17671771
})
17681772
require.NoError(t, err)
1773+
t.Logf("client Pushed DERPMap update")
17691774

17701775
require.Eventually(t, func() bool {
1771-
conn := closer.TailnetConn()
1776+
conn := uut.TailnetConn()
17721777
if conn == nil {
17731778
return false
17741779
}
17751780
regionIDs := conn.DERPMap().RegionIDs()
1776-
return len(regionIDs) == 1 && regionIDs[0] == 2 && conn.Node().PreferredDERP == 2
1781+
preferredDERP := conn.Node().PreferredDERP
1782+
t.Logf("agent Conn DERPMap with regionIDs %v, PreferredDERP %d", regionIDs, preferredDERP)
1783+
return len(regionIDs) == 1 && regionIDs[0] == 2 && preferredDERP == 2
17771784
}, testutil.WaitLong, testutil.IntervalFast)
1785+
t.Logf("agent got the new DERPMap")
17781786

17791787
// Connect from a second client and make sure it uses the new DERP map.
1780-
conn2 := newClientConn(newDerpMap)
1788+
conn2 := newClientConn(newDerpMap, "client2")
17811789
require.Equal(t, []int{2}, conn2.DERPMap().RegionIDs())
1790+
t.Log("conn2 got the new DERPMap")
17821791

17831792
// If the first client gets a DERP map update, it should be able to
17841793
// reconnect just fine.
17851794
conn1.SetDERPMap(newDerpMap)
17861795
require.Equal(t, []int{2}, conn1.DERPMap().RegionIDs())
1796+
t.Log("set the new DERPMap on conn1")
17871797
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
17881798
defer cancel()
17891799
require.True(t, conn1.AwaitReachable(ctx))
1800+
t.Log("conn1 reached agent with new DERP")
17901801
}
17911802

17921803
func TestAgent_Speedtest(t *testing.T) {
@@ -2063,22 +2074,22 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
20632074
Logger: logger.Named("client"),
20642075
})
20652076
require.NoError(t, err)
2066-
clientConn, serverConn := net.Pipe()
2067-
serveClientDone := make(chan struct{})
20682077
t.Cleanup(func() {
2069-
_ = clientConn.Close()
2070-
_ = serverConn.Close()
20712078
_ = conn.Close()
2072-
<-serveClientDone
20732079
})
2074-
go func() {
2075-
defer close(serveClientDone)
2076-
coordinator.ServeClient(serverConn, uuid.New(), metadata.AgentID)
2077-
}()
2078-
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(nodes []*tailnet.Node) error {
2079-
return conn.UpdateNodes(nodes, false)
2080+
testCtx, testCtxCancel := context.WithCancel(context.Background())
2081+
t.Cleanup(testCtxCancel)
2082+
clientID := uuid.New()
2083+
coordination := tailnet.NewInMemoryCoordination(
2084+
testCtx, logger,
2085+
clientID, metadata.AgentID,
2086+
coordinator, conn)
2087+
t.Cleanup(func() {
2088+
err := coordination.Close()
2089+
if err != nil {
2090+
t.Logf("error closing in-mem coordination: %s", err.Error())
2091+
}
20802092
})
2081-
conn.SetNodeCallback(sendNode)
20822093
agentConn := codersdk.NewWorkspaceAgentConn(conn, codersdk.WorkspaceAgentConnOptions{
20832094
AgentID: metadata.AgentID,
20842095
})

agent/agenttest/client.go

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,26 @@ package agenttest
33
import (
44
"context"
55
"io"
6-
"net"
76
"sync"
7+
"sync/atomic"
88
"testing"
99
"time"
1010

1111
"github.com/google/uuid"
12+
"github.com/stretchr/testify/require"
1213
"golang.org/x/exp/maps"
1314
"golang.org/x/xerrors"
15+
"storj.io/drpc"
16+
"storj.io/drpc/drpcmux"
17+
"storj.io/drpc/drpcserver"
18+
"tailscale.com/tailcfg"
1419

1520
"cdr.dev/slog"
1621
"github.com/coder/coder/v2/codersdk"
1722
"github.com/coder/coder/v2/codersdk/agentsdk"
23+
drpcsdk "github.com/coder/coder/v2/codersdk/drpc"
1824
"github.com/coder/coder/v2/tailnet"
25+
"github.com/coder/coder/v2/tailnet/proto"
1926
"github.com/coder/coder/v2/testutil"
2027
)
2128

@@ -24,18 +31,39 @@ func NewClient(t testing.TB,
2431
agentID uuid.UUID,
2532
manifest agentsdk.Manifest,
2633
statsChan chan *agentsdk.Stats,
27-
coordinator tailnet.CoordinatorV1,
34+
coordinator tailnet.Coordinator,
2835
) *Client {
2936
if manifest.AgentID == uuid.Nil {
3037
manifest.AgentID = agentID
3138
}
39+
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
40+
coordPtr.Store(&coordinator)
41+
mux := drpcmux.New()
42+
drpcService := &tailnet.DRPCService{
43+
CoordPtr: &coordPtr,
44+
Logger: logger,
45+
// TODO: handle DERPMap too!
46+
DerpMapUpdateFrequency: time.Hour,
47+
DerpMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
48+
}
49+
err := proto.DRPCRegisterTailnet(mux, drpcService)
50+
require.NoError(t, err)
51+
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
52+
Log: func(err error) {
53+
if xerrors.Is(err, io.EOF) {
54+
return
55+
}
56+
logger.Debug(context.Background(), "drpc server error", slog.Error(err))
57+
},
58+
})
3259
return &Client{
3360
t: t,
3461
logger: logger.Named("client"),
3562
agentID: agentID,
3663
manifest: manifest,
3764
statsChan: statsChan,
3865
coordinator: coordinator,
66+
server: server,
3967
derpMapUpdates: make(chan agentsdk.DERPMapUpdate),
4068
}
4169
}
@@ -47,7 +75,8 @@ type Client struct {
4775
manifest agentsdk.Manifest
4876
metadata map[string]agentsdk.Metadata
4977
statsChan chan *agentsdk.Stats
50-
coordinator tailnet.CoordinatorV1
78+
coordinator tailnet.Coordinator
79+
server *drpcserver.Server
5180
LastWorkspaceAgent func()
5281
PatchWorkspaceLogs func() error
5382
GetServiceBannerFunc func() (codersdk.ServiceBannerConfig, error)
@@ -63,20 +92,29 @@ func (c *Client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
6392
return c.manifest, nil
6493
}
6594

66-
func (c *Client) Listen(_ context.Context) (net.Conn, error) {
67-
clientConn, serverConn := net.Pipe()
95+
func (c *Client) Listen(_ context.Context) (drpc.Conn, error) {
96+
conn, lis := drpcsdk.MemTransportPipe()
6897
closed := make(chan struct{})
6998
c.LastWorkspaceAgent = func() {
70-
_ = serverConn.Close()
71-
_ = clientConn.Close()
99+
_ = conn.Close()
100+
_ = lis.Close()
72101
<-closed
73102
}
74103
c.t.Cleanup(c.LastWorkspaceAgent)
104+
serveCtx, cancel := context.WithCancel(context.Background())
105+
c.t.Cleanup(cancel)
106+
auth := tailnet.AgentTunnelAuth{}
107+
streamID := tailnet.StreamID{
108+
Name: "agenttest",
109+
ID: c.agentID,
110+
Auth: auth,
111+
}
112+
serveCtx = tailnet.WithStreamID(serveCtx, streamID)
75113
go func() {
76-
_ = c.coordinator.ServeAgent(serverConn, c.agentID, "")
114+
_ = c.server.Serve(serveCtx, lis)
77115
close(closed)
78116
}()
79-
return clientConn, nil
117+
return conn, nil
80118
}
81119

82120
func (c *Client) ReportStats(ctx context.Context, _ slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error) {

0 commit comments

Comments
 (0)