Skip to content

Commit 886dcbe

Browse files
authored
chore: refactor coordination (#15343)
Refactors the way clients of the Tailnet API (clients of the API, which include both workspace "agents" and "clients") interact with the API. Introduces the idea of abstract "controllers" for each of the RPCs in the API, and implements a Coordination controller by refactoring from `workspacesdk`. chore re: #14729
1 parent 765314c commit 886dcbe

File tree

9 files changed

+658
-578
lines changed

9 files changed

+658
-578
lines changed

agent/agent.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,7 +1352,8 @@ func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tai
13521352
defer close(disconnected)
13531353
a.closeMutex.Unlock()
13541354

1355-
coordination := tailnet.NewRemoteCoordination(a.logger, coordinate, network, uuid.Nil)
1355+
ctrl := tailnet.NewAgentCoordinationController(a.logger, network)
1356+
coordination := ctrl.New(coordinate)
13561357

13571358
errCh := make(chan error, 1)
13581359
go func() {
@@ -1364,7 +1365,7 @@ func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tai
13641365
a.logger.Warn(ctx, "failed to close remote coordination", slog.Error(err))
13651366
}
13661367
return
1367-
case err := <-coordination.Error():
1368+
case err := <-coordination.Wait():
13681369
errCh <- err
13691370
}
13701371
}()

agent/agent_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1918,10 +1918,8 @@ func TestAgent_UpdatedDERP(t *testing.T) {
19181918
testCtx, testCtxCancel := context.WithCancel(context.Background())
19191919
t.Cleanup(testCtxCancel)
19201920
clientID := uuid.New()
1921-
coordination := tailnet.NewInMemoryCoordination(
1922-
testCtx, logger,
1923-
clientID, agentID,
1924-
coordinator, conn)
1921+
ctrl := tailnet.NewSingleDestController(logger, conn, agentID)
1922+
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(logger, clientID, agentID, coordinator))
19251923
t.Cleanup(func() {
19261924
t.Logf("closing coordination %s", name)
19271925
cctx, ccancel := context.WithTimeout(testCtx, testutil.WaitShort)
@@ -2409,10 +2407,9 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
24092407
testCtx, testCtxCancel := context.WithCancel(context.Background())
24102408
t.Cleanup(testCtxCancel)
24112409
clientID := uuid.New()
2412-
coordination := tailnet.NewInMemoryCoordination(
2413-
testCtx, logger,
2414-
clientID, metadata.AgentID,
2415-
coordinator, conn)
2410+
ctrl := tailnet.NewSingleDestController(logger, conn, metadata.AgentID)
2411+
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(
2412+
logger, clientID, metadata.AgentID, coordinator))
24162413
t.Cleanup(func() {
24172414
cctx, ccancel := context.WithTimeout(testCtx, testutil.WaitShort)
24182415
defer ccancel()

agent/agenttest/client.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ func NewClient(t testing.TB,
7171
t: t,
7272
logger: logger.Named("client"),
7373
agentID: agentID,
74-
coordinator: coordinator,
7574
server: server,
7675
fakeAgentAPI: fakeAAPI,
7776
derpMapUpdates: derpMapUpdates,
@@ -82,7 +81,6 @@ type Client struct {
8281
t testing.TB
8382
logger slog.Logger
8483
agentID uuid.UUID
85-
coordinator tailnet.Coordinator
8684
server *drpcserver.Server
8785
fakeAgentAPI *FakeAgentAPI
8886
LastWorkspaceAgent func()

codersdk/workspacesdk/connector.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type tailnetAPIConnector struct {
6666
clock quartz.Clock
6767
dialOptions *websocket.DialOptions
6868
conn tailnetConn
69+
coordCtrl tailnet.CoordinationController
6970
customDialFn func() (proto.DRPCTailnetClient, error)
7071

7172
clientMu sync.RWMutex
@@ -112,6 +113,7 @@ func (tac *tailnetAPIConnector) manageGracefulTimeout() {
112113
// Runs a tailnetAPIConnector using the provided connection
113114
func (tac *tailnetAPIConnector) runConnector(conn tailnetConn) {
114115
tac.conn = conn
116+
tac.coordCtrl = tailnet.NewSingleDestController(tac.logger, conn, tac.agentID)
115117
tac.gracefulCtx, tac.cancelGracefulCtx = context.WithCancel(context.Background())
116118
go tac.manageGracefulTimeout()
117119
go func() {
@@ -272,7 +274,7 @@ func (tac *tailnetAPIConnector) coordinate(client proto.DRPCTailnetClient) {
272274
tac.logger.Debug(tac.ctx, "error closing Coordinate RPC", slog.Error(cErr))
273275
}
274276
}()
275-
coordination := tailnet.NewRemoteCoordination(tac.logger, coord, tac.conn, tac.agentID)
277+
coordination := tac.coordCtrl.New(coord)
276278
tac.logger.Debug(tac.ctx, "serving coordinator")
277279
select {
278280
case <-tac.ctx.Done():
@@ -281,7 +283,7 @@ func (tac *tailnetAPIConnector) coordinate(client proto.DRPCTailnetClient) {
281283
if crdErr != nil {
282284
tac.logger.Warn(tac.ctx, "failed to close remote coordination", slog.Error(err))
283285
}
284-
case err = <-coordination.Error():
286+
case err = <-coordination.Wait():
285287
if err != nil &&
286288
!xerrors.Is(err, io.EOF) &&
287289
!xerrors.Is(err, context.Canceled) &&

0 commit comments

Comments
 (0)