Skip to content

Commit 22c178f

Browse files
committed
chore: refactor coordination
1 parent 005ea53 commit 22c178f

File tree

9 files changed

+658
-578
lines changed

9 files changed

+658
-578
lines changed

agent/agent.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,7 +1352,8 @@ func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tai
13521352
defer close(disconnected)
13531353
a.closeMutex.Unlock()
13541354

1355-
coordination := tailnet.NewRemoteCoordination(a.logger, coordinate, network, uuid.Nil)
1355+
ctrl := tailnet.NewAgentCoordinationController(a.logger, network)
1356+
coordination := ctrl.New(coordinate)
13561357

13571358
errCh := make(chan error, 1)
13581359
go func() {
@@ -1364,7 +1365,7 @@ func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tai
13641365
a.logger.Warn(ctx, "failed to close remote coordination", slog.Error(err))
13651366
}
13661367
return
1367-
case err := <-coordination.Error():
1368+
case err := <-coordination.Wait():
13681369
errCh <- err
13691370
}
13701371
}()

agent/agent_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1918,10 +1918,8 @@ func TestAgent_UpdatedDERP(t *testing.T) {
19181918
testCtx, testCtxCancel := context.WithCancel(context.Background())
19191919
t.Cleanup(testCtxCancel)
19201920
clientID := uuid.New()
1921-
coordination := tailnet.NewInMemoryCoordination(
1922-
testCtx, logger,
1923-
clientID, agentID,
1924-
coordinator, conn)
1921+
ctrl := tailnet.NewSingleDestController(logger, conn, agentID)
1922+
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(logger, clientID, agentID, coordinator))
19251923
t.Cleanup(func() {
19261924
t.Logf("closing coordination %s", name)
19271925
cctx, ccancel := context.WithTimeout(testCtx, testutil.WaitShort)
@@ -2409,10 +2407,9 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
24092407
testCtx, testCtxCancel := context.WithCancel(context.Background())
24102408
t.Cleanup(testCtxCancel)
24112409
clientID := uuid.New()
2412-
coordination := tailnet.NewInMemoryCoordination(
2413-
testCtx, logger,
2414-
clientID, metadata.AgentID,
2415-
coordinator, conn)
2410+
ctrl := tailnet.NewSingleDestController(logger, conn, metadata.AgentID)
2411+
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(
2412+
logger, clientID, metadata.AgentID, coordinator))
24162413
t.Cleanup(func() {
24172414
cctx, ccancel := context.WithTimeout(testCtx, testutil.WaitShort)
24182415
defer ccancel()

agent/agenttest/client.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ func NewClient(t testing.TB,
7171
t: t,
7272
logger: logger.Named("client"),
7373
agentID: agentID,
74-
coordinator: coordinator,
7574
server: server,
7675
fakeAgentAPI: fakeAAPI,
7776
derpMapUpdates: derpMapUpdates,
@@ -82,7 +81,6 @@ type Client struct {
8281
t testing.TB
8382
logger slog.Logger
8483
agentID uuid.UUID
85-
coordinator tailnet.Coordinator
8684
server *drpcserver.Server
8785
fakeAgentAPI *FakeAgentAPI
8886
LastWorkspaceAgent func()

codersdk/workspacesdk/connector.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type tailnetAPIConnector struct {
6666
clock quartz.Clock
6767
dialOptions *websocket.DialOptions
6868
conn tailnetConn
69+
coordCtrl tailnet.CoordinationController
6970
customDialFn func() (proto.DRPCTailnetClient, error)
7071

7172
clientMu sync.RWMutex
@@ -112,6 +113,7 @@ func (tac *tailnetAPIConnector) manageGracefulTimeout() {
112113
// Runs a tailnetAPIConnector using the provided connection
113114
func (tac *tailnetAPIConnector) runConnector(conn tailnetConn) {
114115
tac.conn = conn
116+
tac.coordCtrl = tailnet.NewSingleDestController(tac.logger, conn, tac.agentID)
115117
tac.gracefulCtx, tac.cancelGracefulCtx = context.WithCancel(context.Background())
116118
go tac.manageGracefulTimeout()
117119
go func() {
@@ -272,7 +274,7 @@ func (tac *tailnetAPIConnector) coordinate(client proto.DRPCTailnetClient) {
272274
tac.logger.Debug(tac.ctx, "error closing Coordinate RPC", slog.Error(cErr))
273275
}
274276
}()
275-
coordination := tailnet.NewRemoteCoordination(tac.logger, coord, tac.conn, tac.agentID)
277+
coordination := tac.coordCtrl.New(coord)
276278
tac.logger.Debug(tac.ctx, "serving coordinator")
277279
select {
278280
case <-tac.ctx.Done():
@@ -281,7 +283,7 @@ func (tac *tailnetAPIConnector) coordinate(client proto.DRPCTailnetClient) {
281283
if crdErr != nil {
282284
tac.logger.Warn(tac.ctx, "failed to close remote coordination", slog.Error(err))
283285
}
284-
case err = <-coordination.Error():
286+
case err = <-coordination.Wait():
285287
if err != nil &&
286288
!xerrors.Is(err, io.EOF) &&
287289
!xerrors.Is(err, context.Canceled) &&

0 commit comments

Comments
 (0)