Skip to content

Commit 08216aa

Browse files
authored
feat: add workspace updates controller (#15506)
re: #14730 Adds a protocol controller for WorkspaceUpdates RPC that takes all the agents we learn about over the RPC, and programs them into the Coordination controller, so that we set up tunnels to all the agents. Handling DNS is in a PR up the stack, as is actually wiring it up to anything.
1 parent e7ab3e1 commit 08216aa

File tree

2 files changed

+544
-11
lines changed

2 files changed

+544
-11
lines changed

tailnet/controllers.go

+230-11
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ import (
2626
// A Controller connects to the tailnet control plane, and then uses the control protocols to
2727
// program a tailnet.Conn in production (in test it could be an interface simulating the Conn). It
2828
// delegates this task to sub-controllers responsible for the main areas of the tailnet control
29-
// protocol: coordination, DERP map updates, resume tokens, and telemetry.
29+
// protocol: coordination, DERP map updates, resume tokens, telemetry, and workspace updates.
3030
type Controller struct {
31-
Dialer ControlProtocolDialer
32-
CoordCtrl CoordinationController
33-
DERPCtrl DERPController
34-
ResumeTokenCtrl ResumeTokenController
35-
TelemetryCtrl TelemetryController
31+
Dialer ControlProtocolDialer
32+
CoordCtrl CoordinationController
33+
DERPCtrl DERPController
34+
ResumeTokenCtrl ResumeTokenController
35+
TelemetryCtrl TelemetryController
36+
WorkspaceUpdatesCtrl WorkspaceUpdatesController
3637

3738
ctx context.Context
3839
gracefulCtx context.Context
@@ -94,15 +95,25 @@ type TelemetryController interface {
9495
New(TelemetryClient)
9596
}
9697

98+
type WorkspaceUpdatesClient interface {
99+
Close() error
100+
Recv() (*proto.WorkspaceUpdate, error)
101+
}
102+
103+
type WorkspaceUpdatesController interface {
104+
New(WorkspaceUpdatesClient) CloserWaiter
105+
}
106+
97107
// ControlProtocolClients represents an abstract interface to the tailnet control plane via a set
98108
// of protocol clients. The Closer should close all the clients (e.g. by closing the underlying
99109
// connection).
100110
type ControlProtocolClients struct {
101-
Closer io.Closer
102-
Coordinator CoordinatorClient
103-
DERP DERPClient
104-
ResumeToken ResumeTokenClient
105-
Telemetry TelemetryClient
111+
Closer io.Closer
112+
Coordinator CoordinatorClient
113+
DERP DERPClient
114+
ResumeToken ResumeTokenClient
115+
Telemetry TelemetryClient
116+
WorkspaceUpdates WorkspaceUpdatesClient
106117
}
107118

108119
type ControlProtocolDialer interface {
@@ -419,6 +430,7 @@ func (c *TunnelSrcCoordController) SyncDestinations(destinations []uuid.UUID) {
419430
}
420431
}()
421432
for dest := range toAdd {
433+
c.Coordinatee.SetTunnelDestination(dest)
422434
err = c.coordination.Client.Send(
423435
&proto.CoordinateRequest{
424436
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: UUIDToByteSlice(dest)},
@@ -822,6 +834,213 @@ func (r *basicResumeTokenRefresher) refresh() {
822834
r.timer.Reset(dur, "basicResumeTokenRefresher", "refresh")
823835
}
824836

837+
type tunnelAllWorkspaceUpdatesController struct {
838+
coordCtrl *TunnelSrcCoordController
839+
logger slog.Logger
840+
}
841+
842+
type workspace struct {
843+
id uuid.UUID
844+
name string
845+
agents map[uuid.UUID]agent
846+
}
847+
848+
type agent struct {
849+
id uuid.UUID
850+
name string
851+
}
852+
853+
func (t *tunnelAllWorkspaceUpdatesController) New(client WorkspaceUpdatesClient) CloserWaiter {
854+
updater := &tunnelUpdater{
855+
client: client,
856+
errChan: make(chan error, 1),
857+
logger: t.logger,
858+
coordCtrl: t.coordCtrl,
859+
recvLoopDone: make(chan struct{}),
860+
workspaces: make(map[uuid.UUID]*workspace),
861+
}
862+
go updater.recvLoop()
863+
return updater
864+
}
865+
866+
type tunnelUpdater struct {
867+
errChan chan error
868+
logger slog.Logger
869+
client WorkspaceUpdatesClient
870+
coordCtrl *TunnelSrcCoordController
871+
recvLoopDone chan struct{}
872+
873+
// don't need the mutex since only manipulated by the recvLoop
874+
workspaces map[uuid.UUID]*workspace
875+
876+
sync.Mutex
877+
closed bool
878+
}
879+
880+
func (t *tunnelUpdater) Close(ctx context.Context) error {
881+
t.Lock()
882+
defer t.Unlock()
883+
if t.closed {
884+
select {
885+
case <-ctx.Done():
886+
return ctx.Err()
887+
case <-t.recvLoopDone:
888+
return nil
889+
}
890+
}
891+
t.closed = true
892+
cErr := t.client.Close()
893+
select {
894+
case <-ctx.Done():
895+
return ctx.Err()
896+
case <-t.recvLoopDone:
897+
return cErr
898+
}
899+
}
900+
901+
func (t *tunnelUpdater) Wait() <-chan error {
902+
return t.errChan
903+
}
904+
905+
func (t *tunnelUpdater) recvLoop() {
906+
t.logger.Debug(context.Background(), "tunnel updater recvLoop started")
907+
defer t.logger.Debug(context.Background(), "tunnel updater recvLoop done")
908+
defer close(t.recvLoopDone)
909+
for {
910+
update, err := t.client.Recv()
911+
if err != nil {
912+
t.logger.Debug(context.Background(), "failed to receive workspace Update", slog.Error(err))
913+
select {
914+
case t.errChan <- err:
915+
default:
916+
}
917+
return
918+
}
919+
t.logger.Debug(context.Background(), "got workspace update",
920+
slog.F("workspace_update", update),
921+
)
922+
err = t.handleUpdate(update)
923+
if err != nil {
924+
t.logger.Critical(context.Background(), "failed to handle workspace Update", slog.Error(err))
925+
cErr := t.client.Close()
926+
if cErr != nil {
927+
t.logger.Warn(context.Background(), "failed to close client", slog.Error(cErr))
928+
}
929+
select {
930+
case t.errChan <- err:
931+
default:
932+
}
933+
return
934+
}
935+
}
936+
}
937+
938+
func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
939+
for _, uw := range update.UpsertedWorkspaces {
940+
workspaceID, err := uuid.FromBytes(uw.Id)
941+
if err != nil {
942+
return xerrors.Errorf("failed to parse workspace ID: %w", err)
943+
}
944+
w := workspace{
945+
id: workspaceID,
946+
name: uw.Name,
947+
agents: make(map[uuid.UUID]agent),
948+
}
949+
t.upsertWorkspace(w)
950+
}
951+
952+
// delete agents before deleting workspaces, since the agents have workspace ID references
953+
for _, da := range update.DeletedAgents {
954+
agentID, err := uuid.FromBytes(da.Id)
955+
if err != nil {
956+
return xerrors.Errorf("failed to parse agent ID: %w", err)
957+
}
958+
workspaceID, err := uuid.FromBytes(da.WorkspaceId)
959+
if err != nil {
960+
return xerrors.Errorf("failed to parse workspace ID: %w", err)
961+
}
962+
err = t.deleteAgent(workspaceID, agentID)
963+
if err != nil {
964+
return xerrors.Errorf("failed to delete agent: %w", err)
965+
}
966+
}
967+
for _, dw := range update.DeletedWorkspaces {
968+
workspaceID, err := uuid.FromBytes(dw.Id)
969+
if err != nil {
970+
return xerrors.Errorf("failed to parse workspace ID: %w", err)
971+
}
972+
t.deleteWorkspace(workspaceID)
973+
}
974+
975+
// upsert agents last, after all workspaces have been added and deleted, since agents reference
976+
// workspace ID.
977+
for _, ua := range update.UpsertedAgents {
978+
agentID, err := uuid.FromBytes(ua.Id)
979+
if err != nil {
980+
return xerrors.Errorf("failed to parse agent ID: %w", err)
981+
}
982+
workspaceID, err := uuid.FromBytes(ua.WorkspaceId)
983+
if err != nil {
984+
return xerrors.Errorf("failed to parse workspace ID: %w", err)
985+
}
986+
a := agent{name: ua.Name, id: agentID}
987+
err = t.upsertAgent(workspaceID, a)
988+
if err != nil {
989+
return xerrors.Errorf("failed to upsert agent: %w", err)
990+
}
991+
}
992+
allAgents := t.allAgentIDs()
993+
t.coordCtrl.SyncDestinations(allAgents)
994+
return nil
995+
}
996+
997+
func (t *tunnelUpdater) upsertWorkspace(w workspace) {
998+
old, ok := t.workspaces[w.id]
999+
if !ok {
1000+
t.workspaces[w.id] = &w
1001+
return
1002+
}
1003+
old.name = w.name
1004+
}
1005+
1006+
func (t *tunnelUpdater) deleteWorkspace(id uuid.UUID) {
1007+
delete(t.workspaces, id)
1008+
}
1009+
1010+
func (t *tunnelUpdater) upsertAgent(workspaceID uuid.UUID, a agent) error {
1011+
w, ok := t.workspaces[workspaceID]
1012+
if !ok {
1013+
return xerrors.Errorf("workspace %s not found", workspaceID)
1014+
}
1015+
w.agents[a.id] = a
1016+
return nil
1017+
}
1018+
1019+
func (t *tunnelUpdater) deleteAgent(workspaceID, id uuid.UUID) error {
1020+
w, ok := t.workspaces[workspaceID]
1021+
if !ok {
1022+
return xerrors.Errorf("workspace %s not found", workspaceID)
1023+
}
1024+
delete(w.agents, id)
1025+
return nil
1026+
}
1027+
1028+
func (t *tunnelUpdater) allAgentIDs() []uuid.UUID {
1029+
out := make([]uuid.UUID, 0, len(t.workspaces))
1030+
for _, w := range t.workspaces {
1031+
for id := range w.agents {
1032+
out = append(out, id)
1033+
}
1034+
}
1035+
return out
1036+
}
1037+
1038+
func NewTunnelAllWorkspaceUpdatesController(
1039+
logger slog.Logger, c *TunnelSrcCoordController,
1040+
) WorkspaceUpdatesController {
1041+
return &tunnelAllWorkspaceUpdatesController{logger: logger, coordCtrl: c}
1042+
}
1043+
8251044
// NewController creates a new Controller without running it
8261045
func NewController(logger slog.Logger, dialer ControlProtocolDialer, opts ...ControllerOpt) *Controller {
8271046
c := &Controller{

0 commit comments

Comments
 (0)