Skip to content

Commit fe63f6a

Browse files
committed
feat: use tailnet v2 API for coordination
1 parent 04e3985 commit fe63f6a

30 files changed

+1187
-1108
lines changed

Makefile

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ gen: \
475475
site/.eslintignore \
476476
site/e2e/provisionerGenerated.ts \
477477
site/src/theme/icons.json \
478-
examples/examples.gen.json
478+
examples/examples.gen.json \
479+
tailnet/tailnettest/coordinatormock.go
479480
.PHONY: gen
480481

481482
# Mark all generated files as fresh so make thinks they're up-to-date. This is
@@ -502,6 +503,7 @@ gen/mark-fresh:
502503
site/e2e/provisionerGenerated.ts \
503504
site/src/theme/icons.json \
504505
examples/examples.gen.json \
506+
tailnet/tailnettest/coordinatormock.go \
505507
"
506508
for file in $$files; do
507509
echo "$$file"
@@ -529,6 +531,9 @@ coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $
529531
coderd/database/dbmock/dbmock.go: coderd/database/db.go coderd/database/querier.go
530532
go generate ./coderd/database/dbmock/
531533

534+
tailnet/tailnettest/coordinatormock.go: tailnet/coordinator.go
535+
go generate ./tailnet/tailnettest/
536+
532537
tailnet/proto/tailnet.pb.go: tailnet/proto/tailnet.proto
533538
protoc \
534539
--go_out=. \

agent/agent.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"sync"
2323
"time"
2424

25+
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
26+
2527
"github.com/go-chi/chi/v5"
2628
"github.com/google/uuid"
2729
"github.com/prometheus/client_golang/prometheus"
@@ -30,6 +32,7 @@ import (
3032
"golang.org/x/exp/slices"
3133
"golang.org/x/sync/errgroup"
3234
"golang.org/x/xerrors"
35+
"storj.io/drpc"
3336
"tailscale.com/net/speedtest"
3437
"tailscale.com/tailcfg"
3538
"tailscale.com/types/netlogtype"
@@ -86,7 +89,7 @@ type Options struct {
8689

8790
type Client interface {
8891
Manifest(ctx context.Context) (agentsdk.Manifest, error)
89-
Listen(ctx context.Context) (net.Conn, error)
92+
Listen(ctx context.Context) (drpc.Conn, error)
9093
DERPMapUpdates(ctx context.Context) (<-chan agentsdk.DERPMapUpdate, io.Closer, error)
9194
ReportStats(ctx context.Context, log slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error)
9295
PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
@@ -1058,20 +1061,34 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
10581061
ctx, cancel := context.WithCancel(ctx)
10591062
defer cancel()
10601063

1061-
coordinator, err := a.client.Listen(ctx)
1064+
conn, err := a.client.Listen(ctx)
10621065
if err != nil {
10631066
return err
10641067
}
1065-
defer coordinator.Close()
1068+
defer func() {
1069+
cErr := conn.Close()
1070+
if cErr != nil {
1071+
// Log the error returned by conn.Close() (cErr), not the enclosing function's err.
a.logger.Debug(ctx, "error closing drpc connection", slog.Error(cErr))
1072+
}
1073+
}()
1074+
1075+
tClient := tailnetproto.NewDRPCTailnetClient(conn)
1076+
coordinate, err := tClient.Coordinate(ctx)
1077+
if err != nil {
1078+
return xerrors.Errorf("failed to connect to the coordinate endpoint: %w", err)
1079+
}
1080+
defer func() {
1081+
cErr := coordinate.Close()
1082+
if cErr != nil {
1083+
// Log the error returned by coordinate.Close() (cErr), not the enclosing function's err.
a.logger.Debug(ctx, "error closing Coordinate client", slog.Error(cErr))
1084+
}
1085+
}()
10661086
a.logger.Info(ctx, "connected to coordination endpoint")
1067-
sendNodes, errChan := tailnet.ServeCoordinator(coordinator, func(nodes []*tailnet.Node) error {
1068-
return network.UpdateNodes(nodes, false)
1069-
})
1070-
network.SetNodeCallback(sendNodes)
1087+
coordination := tailnet.NewRemoteCoordination(a.logger, coordinate, network, uuid.Nil)
10711088
select {
10721089
case <-ctx.Done():
10731090
return ctx.Err()
1074-
case err := <-errChan:
1091+
case err := <-coordination.Error():
10751092
return err
10761093
}
10771094
}

agent/agent_test.go

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1677,9 +1677,11 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16771677
require.NotNil(t, originalDerpMap)
16781678

16791679
coordinator := tailnet.NewCoordinator(logger)
1680-
defer func() {
1680+
// use t.Cleanup so the coordinator closing doesn't deadlock with in-memory
1681+
// coordination
1682+
t.Cleanup(func() {
16811683
_ = coordinator.Close()
1682-
}()
1684+
})
16831685
agentID := uuid.New()
16841686
statsCh := make(chan *agentsdk.Stats, 50)
16851687
fs := afero.NewMemMapFs()
@@ -1694,41 +1696,42 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16941696
statsCh,
16951697
coordinator,
16961698
)
1697-
closer := agent.New(agent.Options{
1699+
uut := agent.New(agent.Options{
16981700
Client: client,
16991701
Filesystem: fs,
17001702
Logger: logger.Named("agent"),
17011703
ReconnectingPTYTimeout: time.Minute,
17021704
})
1703-
defer func() {
1704-
_ = closer.Close()
1705-
}()
1705+
t.Cleanup(func() {
1706+
_ = uut.Close()
1707+
})
17061708

17071709
// Setup a client connection.
1708-
newClientConn := func(derpMap *tailcfg.DERPMap) *codersdk.WorkspaceAgentConn {
1710+
newClientConn := func(derpMap *tailcfg.DERPMap, name string) *codersdk.WorkspaceAgentConn {
17091711
conn, err := tailnet.NewConn(&tailnet.Options{
17101712
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
17111713
DERPMap: derpMap,
1712-
Logger: logger.Named("client"),
1714+
Logger: logger.Named(name),
17131715
})
17141716
require.NoError(t, err)
1715-
clientConn, serverConn := net.Pipe()
1716-
serveClientDone := make(chan struct{})
17171717
t.Cleanup(func() {
1718-
_ = clientConn.Close()
1719-
_ = serverConn.Close()
1718+
t.Logf("closing conn %s", name)
17201719
_ = conn.Close()
1721-
<-serveClientDone
17221720
})
1723-
go func() {
1724-
defer close(serveClientDone)
1725-
err := coordinator.ServeClient(serverConn, uuid.New(), agentID)
1726-
assert.NoError(t, err)
1727-
}()
1728-
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(nodes []*tailnet.Node) error {
1729-
return conn.UpdateNodes(nodes, false)
1721+
testCtx, testCtxCancel := context.WithCancel(context.Background())
1722+
t.Cleanup(testCtxCancel)
1723+
clientID := uuid.New()
1724+
coordination := tailnet.NewInMemoryCoordination(
1725+
testCtx, logger,
1726+
clientID, agentID,
1727+
coordinator, conn)
1728+
t.Cleanup(func() {
1729+
t.Logf("closing coordination %s", name)
1730+
err := coordination.Close()
1731+
if err != nil {
1732+
t.Logf("error closing in-memory coordination: %s", err.Error())
1733+
}
17301734
})
1731-
conn.SetNodeCallback(sendNode)
17321735
// Force DERP.
17331736
conn.SetBlockEndpoints(true)
17341737

@@ -1737,6 +1740,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17371740
CloseFunc: func() error { return codersdk.ErrSkipClose },
17381741
})
17391742
t.Cleanup(func() {
1743+
t.Logf("closing sdkConn %s", name)
17401744
_ = sdkConn.Close()
17411745
})
17421746
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
@@ -1747,7 +1751,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17471751

17481752
return sdkConn
17491753
}
1750-
conn1 := newClientConn(originalDerpMap)
1754+
conn1 := newClientConn(originalDerpMap, "client1")
17511755

17521756
// Change the DERP map.
17531757
newDerpMap, _ := tailnettest.RunDERPAndSTUN(t)
@@ -1766,27 +1770,34 @@ func TestAgent_UpdatedDERP(t *testing.T) {
17661770
DERPMap: newDerpMap,
17671771
})
17681772
require.NoError(t, err)
1773+
t.Logf("client Pushed DERPMap update")
17691774

17701775
require.Eventually(t, func() bool {
1771-
conn := closer.TailnetConn()
1776+
conn := uut.TailnetConn()
17721777
if conn == nil {
17731778
return false
17741779
}
17751780
regionIDs := conn.DERPMap().RegionIDs()
1776-
return len(regionIDs) == 1 && regionIDs[0] == 2 && conn.Node().PreferredDERP == 2
1781+
preferredDERP := conn.Node().PreferredDERP
1782+
t.Logf("agent Conn DERPMap with regionIDs %v, PreferredDERP %d", regionIDs, preferredDERP)
1783+
return len(regionIDs) == 1 && regionIDs[0] == 2 && preferredDERP == 2
17771784
}, testutil.WaitLong, testutil.IntervalFast)
1785+
t.Logf("agent got the new DERPMap")
17781786

17791787
// Connect from a second client and make sure it uses the new DERP map.
1780-
conn2 := newClientConn(newDerpMap)
1788+
conn2 := newClientConn(newDerpMap, "client2")
17811789
require.Equal(t, []int{2}, conn2.DERPMap().RegionIDs())
1790+
t.Log("conn2 got the new DERPMap")
17821791

17831792
// If the first client gets a DERP map update, it should be able to
17841793
// reconnect just fine.
17851794
conn1.SetDERPMap(newDerpMap)
17861795
require.Equal(t, []int{2}, conn1.DERPMap().RegionIDs())
1796+
t.Log("set the new DERPMap on conn1")
17871797
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
17881798
defer cancel()
17891799
require.True(t, conn1.AwaitReachable(ctx))
1800+
t.Log("conn1 reached agent with new DERP")
17901801
}
17911802

17921803
func TestAgent_Speedtest(t *testing.T) {
@@ -2063,22 +2074,22 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
20632074
Logger: logger.Named("client"),
20642075
})
20652076
require.NoError(t, err)
2066-
clientConn, serverConn := net.Pipe()
2067-
serveClientDone := make(chan struct{})
20682077
t.Cleanup(func() {
2069-
_ = clientConn.Close()
2070-
_ = serverConn.Close()
20712078
_ = conn.Close()
2072-
<-serveClientDone
20732079
})
2074-
go func() {
2075-
defer close(serveClientDone)
2076-
coordinator.ServeClient(serverConn, uuid.New(), metadata.AgentID)
2077-
}()
2078-
sendNode, _ := tailnet.ServeCoordinator(clientConn, func(nodes []*tailnet.Node) error {
2079-
return conn.UpdateNodes(nodes, false)
2080+
testCtx, testCtxCancel := context.WithCancel(context.Background())
2081+
t.Cleanup(testCtxCancel)
2082+
clientID := uuid.New()
2083+
coordination := tailnet.NewInMemoryCoordination(
2084+
testCtx, logger,
2085+
clientID, metadata.AgentID,
2086+
coordinator, conn)
2087+
t.Cleanup(func() {
2088+
err := coordination.Close()
2089+
if err != nil {
2090+
t.Logf("error closing in-mem coordination: %s", err.Error())
2091+
}
20802092
})
2081-
conn.SetNodeCallback(sendNode)
20822093
agentConn := codersdk.NewWorkspaceAgentConn(conn, codersdk.WorkspaceAgentConnOptions{
20832094
AgentID: metadata.AgentID,
20842095
})

agent/agenttest/client.go

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,28 @@ package agenttest
33
import (
44
"context"
55
"io"
6-
"net"
76
"sync"
7+
"sync/atomic"
88
"testing"
99
"time"
1010

11+
"github.com/stretchr/testify/require"
12+
13+
"storj.io/drpc"
14+
"storj.io/drpc/drpcmux"
15+
"storj.io/drpc/drpcserver"
16+
"tailscale.com/tailcfg"
17+
18+
"github.com/coder/coder/v2/tailnet/proto"
19+
1120
"github.com/google/uuid"
1221
"golang.org/x/exp/maps"
1322
"golang.org/x/xerrors"
1423

1524
"cdr.dev/slog"
1625
"github.com/coder/coder/v2/codersdk"
1726
"github.com/coder/coder/v2/codersdk/agentsdk"
27+
drpcsdk "github.com/coder/coder/v2/codersdk/drpc"
1828
"github.com/coder/coder/v2/tailnet"
1929
"github.com/coder/coder/v2/testutil"
2030
)
@@ -24,18 +34,39 @@ func NewClient(t testing.TB,
2434
agentID uuid.UUID,
2535
manifest agentsdk.Manifest,
2636
statsChan chan *agentsdk.Stats,
27-
coordinator tailnet.CoordinatorV1,
37+
coordinator tailnet.Coordinator,
2838
) *Client {
2939
if manifest.AgentID == uuid.Nil {
3040
manifest.AgentID = agentID
3141
}
42+
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
43+
coordPtr.Store(&coordinator)
44+
mux := drpcmux.New()
45+
drpcService := &tailnet.DRPCService{
46+
CoordPtr: &coordPtr,
47+
Logger: logger,
48+
// TODO: handle DERPMap too!
49+
DerpMapUpdateFrequency: time.Hour,
50+
DerpMapFn: func() *tailcfg.DERPMap { panic("not implemented") },
51+
}
52+
err := proto.DRPCRegisterTailnet(mux, drpcService)
53+
require.NoError(t, err)
54+
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
55+
Log: func(err error) {
56+
if xerrors.Is(err, io.EOF) {
57+
return
58+
}
59+
logger.Debug(context.Background(), "drpc server error", slog.Error(err))
60+
},
61+
})
3262
return &Client{
3363
t: t,
3464
logger: logger.Named("client"),
3565
agentID: agentID,
3666
manifest: manifest,
3767
statsChan: statsChan,
3868
coordinator: coordinator,
69+
server: server,
3970
derpMapUpdates: make(chan agentsdk.DERPMapUpdate),
4071
}
4172
}
@@ -47,7 +78,8 @@ type Client struct {
4778
manifest agentsdk.Manifest
4879
metadata map[string]agentsdk.Metadata
4980
statsChan chan *agentsdk.Stats
50-
coordinator tailnet.CoordinatorV1
81+
coordinator tailnet.Coordinator
82+
server *drpcserver.Server
5183
LastWorkspaceAgent func()
5284
PatchWorkspaceLogs func() error
5385
GetServiceBannerFunc func() (codersdk.ServiceBannerConfig, error)
@@ -63,20 +95,29 @@ func (c *Client) Manifest(_ context.Context) (agentsdk.Manifest, error) {
6395
return c.manifest, nil
6496
}
6597

66-
func (c *Client) Listen(_ context.Context) (net.Conn, error) {
67-
clientConn, serverConn := net.Pipe()
98+
func (c *Client) Listen(_ context.Context) (drpc.Conn, error) {
99+
conn, lis := drpcsdk.MemTransportPipe()
68100
closed := make(chan struct{})
69101
c.LastWorkspaceAgent = func() {
70-
_ = serverConn.Close()
71-
_ = clientConn.Close()
102+
_ = conn.Close()
103+
_ = lis.Close()
72104
<-closed
73105
}
74106
c.t.Cleanup(c.LastWorkspaceAgent)
107+
serveCtx, cancel := context.WithCancel(context.Background())
108+
c.t.Cleanup(cancel)
109+
auth := tailnet.AgentTunnelAuth{}
110+
streamID := tailnet.StreamID{
111+
Name: "agenttest",
112+
ID: c.agentID,
113+
Auth: auth,
114+
}
115+
serveCtx = tailnet.WithStreamID(serveCtx, streamID)
75116
go func() {
76-
_ = c.coordinator.ServeAgent(serverConn, c.agentID, "")
117+
_ = c.server.Serve(serveCtx, lis)
77118
close(closed)
78119
}()
79-
return clientConn, nil
120+
return conn, nil
80121
}
81122

82123
func (c *Client) ReportStats(ctx context.Context, _ slog.Logger, statsChan <-chan *agentsdk.Stats, setInterval func(time.Duration)) (io.Closer, error) {

0 commit comments

Comments
 (0)