@@ -26,13 +26,14 @@ import (
26
26
// A Controller connects to the tailnet control plane, and then uses the control protocols to
27
27
// program a tailnet.Conn in production (in test it could be an interface simulating the Conn). It
28
28
// delegates this task to sub-controllers responsible for the main areas of the tailnet control
29
- // protocol: coordination, DERP map updates, resume tokens, and telemetry .
29
+ // protocol: coordination, DERP map updates, resume tokens, telemetry, and workspace updates .
30
30
type Controller struct {
31
- Dialer ControlProtocolDialer
32
- CoordCtrl CoordinationController
33
- DERPCtrl DERPController
34
- ResumeTokenCtrl ResumeTokenController
35
- TelemetryCtrl TelemetryController
31
+ Dialer ControlProtocolDialer
32
+ CoordCtrl CoordinationController
33
+ DERPCtrl DERPController
34
+ ResumeTokenCtrl ResumeTokenController
35
+ TelemetryCtrl TelemetryController
36
+ WorkspaceUpdatesCtrl WorkspaceUpdatesController
36
37
37
38
ctx context.Context
38
39
gracefulCtx context.Context
@@ -94,15 +95,25 @@ type TelemetryController interface {
94
95
New (TelemetryClient )
95
96
}
96
97
98
+ type WorkspaceUpdatesClient interface {
99
+ Close () error
100
+ Recv () (* proto.WorkspaceUpdate , error )
101
+ }
102
+
103
+ type WorkspaceUpdatesController interface {
104
+ New (WorkspaceUpdatesClient ) CloserWaiter
105
+ }
106
+
97
107
// ControlProtocolClients represents an abstract interface to the tailnet control plane via a set
98
108
// of protocol clients. The Closer should close all the clients (e.g. by closing the underlying
99
109
// connection).
100
110
type ControlProtocolClients struct {
101
- Closer io.Closer
102
- Coordinator CoordinatorClient
103
- DERP DERPClient
104
- ResumeToken ResumeTokenClient
105
- Telemetry TelemetryClient
111
+ Closer io.Closer
112
+ Coordinator CoordinatorClient
113
+ DERP DERPClient
114
+ ResumeToken ResumeTokenClient
115
+ Telemetry TelemetryClient
116
+ WorkspaceUpdates WorkspaceUpdatesClient
106
117
}
107
118
108
119
type ControlProtocolDialer interface {
@@ -419,6 +430,7 @@ func (c *TunnelSrcCoordController) SyncDestinations(destinations []uuid.UUID) {
419
430
}
420
431
}()
421
432
for dest := range toAdd {
433
+ c .Coordinatee .SetTunnelDestination (dest )
422
434
err = c .coordination .Client .Send (
423
435
& proto.CoordinateRequest {
424
436
AddTunnel : & proto.CoordinateRequest_Tunnel {Id : UUIDToByteSlice (dest )},
@@ -822,6 +834,213 @@ func (r *basicResumeTokenRefresher) refresh() {
822
834
r .timer .Reset (dur , "basicResumeTokenRefresher" , "refresh" )
823
835
}
824
836
837
+ type tunnelAllWorkspaceUpdatesController struct {
838
+ coordCtrl * TunnelSrcCoordController
839
+ logger slog.Logger
840
+ }
841
+
842
+ type workspace struct {
843
+ id uuid.UUID
844
+ name string
845
+ agents map [uuid.UUID ]agent
846
+ }
847
+
848
+ type agent struct {
849
+ id uuid.UUID
850
+ name string
851
+ }
852
+
853
+ func (t * tunnelAllWorkspaceUpdatesController ) New (client WorkspaceUpdatesClient ) CloserWaiter {
854
+ updater := & tunnelUpdater {
855
+ client : client ,
856
+ errChan : make (chan error , 1 ),
857
+ logger : t .logger ,
858
+ coordCtrl : t .coordCtrl ,
859
+ recvLoopDone : make (chan struct {}),
860
+ workspaces : make (map [uuid.UUID ]* workspace ),
861
+ }
862
+ go updater .recvLoop ()
863
+ return updater
864
+ }
865
+
866
+ type tunnelUpdater struct {
867
+ errChan chan error
868
+ logger slog.Logger
869
+ client WorkspaceUpdatesClient
870
+ coordCtrl * TunnelSrcCoordController
871
+ recvLoopDone chan struct {}
872
+
873
+ // don't need the mutex since only manipulated by the recvLoop
874
+ workspaces map [uuid.UUID ]* workspace
875
+
876
+ sync.Mutex
877
+ closed bool
878
+ }
879
+
880
+ func (t * tunnelUpdater ) Close (ctx context.Context ) error {
881
+ t .Lock ()
882
+ defer t .Unlock ()
883
+ if t .closed {
884
+ select {
885
+ case <- ctx .Done ():
886
+ return ctx .Err ()
887
+ case <- t .recvLoopDone :
888
+ return nil
889
+ }
890
+ }
891
+ t .closed = true
892
+ cErr := t .client .Close ()
893
+ select {
894
+ case <- ctx .Done ():
895
+ return ctx .Err ()
896
+ case <- t .recvLoopDone :
897
+ return cErr
898
+ }
899
+ }
900
+
901
+ func (t * tunnelUpdater ) Wait () <- chan error {
902
+ return t .errChan
903
+ }
904
+
905
+ func (t * tunnelUpdater ) recvLoop () {
906
+ t .logger .Debug (context .Background (), "tunnel updater recvLoop started" )
907
+ defer t .logger .Debug (context .Background (), "tunnel updater recvLoop done" )
908
+ defer close (t .recvLoopDone )
909
+ for {
910
+ update , err := t .client .Recv ()
911
+ if err != nil {
912
+ t .logger .Debug (context .Background (), "failed to receive workspace Update" , slog .Error (err ))
913
+ select {
914
+ case t .errChan <- err :
915
+ default :
916
+ }
917
+ return
918
+ }
919
+ t .logger .Debug (context .Background (), "got workspace update" ,
920
+ slog .F ("workspace_update" , update ),
921
+ )
922
+ err = t .handleUpdate (update )
923
+ if err != nil {
924
+ t .logger .Critical (context .Background (), "failed to handle workspace Update" , slog .Error (err ))
925
+ cErr := t .client .Close ()
926
+ if cErr != nil {
927
+ t .logger .Warn (context .Background (), "failed to close client" , slog .Error (cErr ))
928
+ }
929
+ select {
930
+ case t .errChan <- err :
931
+ default :
932
+ }
933
+ return
934
+ }
935
+ }
936
+ }
937
+
938
+ func (t * tunnelUpdater ) handleUpdate (update * proto.WorkspaceUpdate ) error {
939
+ for _ , uw := range update .UpsertedWorkspaces {
940
+ workspaceID , err := uuid .FromBytes (uw .Id )
941
+ if err != nil {
942
+ return xerrors .Errorf ("failed to parse workspace ID: %w" , err )
943
+ }
944
+ w := workspace {
945
+ id : workspaceID ,
946
+ name : uw .Name ,
947
+ agents : make (map [uuid.UUID ]agent ),
948
+ }
949
+ t .upsertWorkspace (w )
950
+ }
951
+
952
+ // delete agents before deleting workspaces, since the agents have workspace ID references
953
+ for _ , da := range update .DeletedAgents {
954
+ agentID , err := uuid .FromBytes (da .Id )
955
+ if err != nil {
956
+ return xerrors .Errorf ("failed to parse agent ID: %w" , err )
957
+ }
958
+ workspaceID , err := uuid .FromBytes (da .WorkspaceId )
959
+ if err != nil {
960
+ return xerrors .Errorf ("failed to parse workspace ID: %w" , err )
961
+ }
962
+ err = t .deleteAgent (workspaceID , agentID )
963
+ if err != nil {
964
+ return xerrors .Errorf ("failed to delete agent: %w" , err )
965
+ }
966
+ }
967
+ for _ , dw := range update .DeletedWorkspaces {
968
+ workspaceID , err := uuid .FromBytes (dw .Id )
969
+ if err != nil {
970
+ return xerrors .Errorf ("failed to parse workspace ID: %w" , err )
971
+ }
972
+ t .deleteWorkspace (workspaceID )
973
+ }
974
+
975
+ // upsert agents last, after all workspaces have been added and deleted, since agents reference
976
+ // workspace ID.
977
+ for _ , ua := range update .UpsertedAgents {
978
+ agentID , err := uuid .FromBytes (ua .Id )
979
+ if err != nil {
980
+ return xerrors .Errorf ("failed to parse agent ID: %w" , err )
981
+ }
982
+ workspaceID , err := uuid .FromBytes (ua .WorkspaceId )
983
+ if err != nil {
984
+ return xerrors .Errorf ("failed to parse workspace ID: %w" , err )
985
+ }
986
+ a := agent {name : ua .Name , id : agentID }
987
+ err = t .upsertAgent (workspaceID , a )
988
+ if err != nil {
989
+ return xerrors .Errorf ("failed to upsert agent: %w" , err )
990
+ }
991
+ }
992
+ allAgents := t .allAgentIDs ()
993
+ t .coordCtrl .SyncDestinations (allAgents )
994
+ return nil
995
+ }
996
+
997
+ func (t * tunnelUpdater ) upsertWorkspace (w workspace ) {
998
+ old , ok := t .workspaces [w .id ]
999
+ if ! ok {
1000
+ t .workspaces [w .id ] = & w
1001
+ return
1002
+ }
1003
+ old .name = w .name
1004
+ }
1005
+
1006
+ func (t * tunnelUpdater ) deleteWorkspace (id uuid.UUID ) {
1007
+ delete (t .workspaces , id )
1008
+ }
1009
+
1010
+ func (t * tunnelUpdater ) upsertAgent (workspaceID uuid.UUID , a agent ) error {
1011
+ w , ok := t .workspaces [workspaceID ]
1012
+ if ! ok {
1013
+ return xerrors .Errorf ("workspace %s not found" , workspaceID )
1014
+ }
1015
+ w .agents [a .id ] = a
1016
+ return nil
1017
+ }
1018
+
1019
+ func (t * tunnelUpdater ) deleteAgent (workspaceID , id uuid.UUID ) error {
1020
+ w , ok := t .workspaces [workspaceID ]
1021
+ if ! ok {
1022
+ return xerrors .Errorf ("workspace %s not found" , workspaceID )
1023
+ }
1024
+ delete (w .agents , id )
1025
+ return nil
1026
+ }
1027
+
1028
+ func (t * tunnelUpdater ) allAgentIDs () []uuid.UUID {
1029
+ out := make ([]uuid.UUID , 0 , len (t .workspaces ))
1030
+ for _ , w := range t .workspaces {
1031
+ for id := range w .agents {
1032
+ out = append (out , id )
1033
+ }
1034
+ }
1035
+ return out
1036
+ }
1037
+
1038
+ func NewTunnelAllWorkspaceUpdatesController (
1039
+ logger slog.Logger , c * TunnelSrcCoordController ,
1040
+ ) WorkspaceUpdatesController {
1041
+ return & tunnelAllWorkspaceUpdatesController {logger : logger , coordCtrl : c }
1042
+ }
1043
+
825
1044
// NewController creates a new Controller without running it
826
1045
func NewController (logger slog.Logger , dialer ControlProtocolDialer , opts ... ControllerOpt ) * Controller {
827
1046
c := & Controller {
0 commit comments