@@ -89,7 +89,6 @@ type Options struct {
89
89
type Client interface {
90
90
Manifest (ctx context.Context ) (agentsdk.Manifest , error )
91
91
Listen (ctx context.Context ) (drpc.Conn , error )
92
- DERPMapUpdates (ctx context.Context ) (<- chan agentsdk.DERPMapUpdate , io.Closer , error )
93
92
ReportStats (ctx context.Context , log slog.Logger , statsChan <- chan * agentsdk.Stats , setInterval func (time.Duration )) (io.Closer , error )
94
93
PostLifecycle (ctx context.Context , state agentsdk.PostLifecycleRequest ) error
95
94
PostAppHealth (ctx context.Context , req agentsdk.PostAppHealthsRequest ) error
@@ -822,10 +821,22 @@ func (a *agent) run(ctx context.Context) error {
822
821
network .SetBlockEndpoints (manifest .DisableDirectConnections )
823
822
}
824
823
824
+ // Listen returns the dRPC connection we use for both Coordinator and DERPMap updates
825
+ conn , err := a .client .Listen (ctx )
826
+ if err != nil {
827
+ return err
828
+ }
829
+ defer func () {
830
+ cErr := conn .Close ()
831
+ if cErr != nil {
832
+ a .logger .Debug (ctx , "error closing drpc connection" , slog .Error (err ))
833
+ }
834
+ }()
835
+
825
836
eg , egCtx := errgroup .WithContext (ctx )
826
837
eg .Go (func () error {
827
838
a .logger .Debug (egCtx , "running tailnet connection coordinator" )
828
- err := a .runCoordinator (egCtx , network )
839
+ err := a .runCoordinator (egCtx , conn , network )
829
840
if err != nil {
830
841
return xerrors .Errorf ("run coordinator: %w" , err )
831
842
}
@@ -834,7 +845,7 @@ func (a *agent) run(ctx context.Context) error {
834
845
835
846
eg .Go (func () error {
836
847
a .logger .Debug (egCtx , "running derp map subscriber" )
837
- err := a .runDERPMapSubscriber (egCtx , network )
848
+ err := a .runDERPMapSubscriber (egCtx , conn , network )
838
849
if err != nil {
839
850
return xerrors .Errorf ("run derp map subscriber: %w" , err )
840
851
}
@@ -1056,21 +1067,8 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t
1056
1067
1057
1068
// runCoordinator runs a coordinator and returns whether a reconnect
1058
1069
// should occur.
1059
- func (a * agent ) runCoordinator (ctx context.Context , network * tailnet.Conn ) error {
1060
- ctx , cancel := context .WithCancel (ctx )
1061
- defer cancel ()
1062
-
1063
- conn , err := a .client .Listen (ctx )
1064
- if err != nil {
1065
- return err
1066
- }
1067
- defer func () {
1068
- cErr := conn .Close ()
1069
- if cErr != nil {
1070
- a .logger .Debug (ctx , "error closing drpc connection" , slog .Error (err ))
1071
- }
1072
- }()
1073
-
1070
+ func (a * agent ) runCoordinator (ctx context.Context , conn drpc.Conn , network * tailnet.Conn ) error {
1071
+ defer a .logger .Debug (ctx , "disconnected from coordination RPC" )
1074
1072
tClient := tailnetproto .NewDRPCTailnetClient (conn )
1075
1073
coordinate , err := tClient .Coordinate (ctx )
1076
1074
if err != nil {
@@ -1082,7 +1080,7 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
1082
1080
a .logger .Debug (ctx , "error closing Coordinate client" , slog .Error (err ))
1083
1081
}
1084
1082
}()
1085
- a .logger .Info (ctx , "connected to coordination endpoint " )
1083
+ a .logger .Info (ctx , "connected to coordination RPC " )
1086
1084
coordination := tailnet .NewRemoteCoordination (a .logger , coordinate , network , uuid .Nil )
1087
1085
select {
1088
1086
case <- ctx .Done ():
@@ -1093,30 +1091,29 @@ func (a *agent) runCoordinator(ctx context.Context, network *tailnet.Conn) error
1093
1091
}
1094
1092
1095
1093
// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
1096
- func (a * agent ) runDERPMapSubscriber (ctx context.Context , network * tailnet.Conn ) error {
1094
+ func (a * agent ) runDERPMapSubscriber (ctx context.Context , conn drpc.Conn , network * tailnet.Conn ) error {
1095
+ defer a .logger .Debug (ctx , "disconnected from derp map RPC" )
1097
1096
ctx , cancel := context .WithCancel (ctx )
1098
1097
defer cancel ()
1099
-
1100
- updates , closer , err := a . client . DERPMapUpdates (ctx )
1098
+ tClient := tailnetproto . NewDRPCTailnetClient ( conn )
1099
+ stream , err := tClient . StreamDERPMaps (ctx , & tailnetproto. StreamDERPMapsRequest {} )
1101
1100
if err != nil {
1102
- return err
1101
+ return xerrors . Errorf ( "stream DERP Maps: %w" , err )
1103
1102
}
1104
- defer closer .Close ()
1105
-
1106
- a .logger .Info (ctx , "connected to derp map endpoint" )
1103
+ defer func () {
1104
+ cErr := stream .Close ()
1105
+ if cErr != nil {
1106
+ a .logger .Debug (ctx , "error closing DERPMap stream" , slog .Error (err ))
1107
+ }
1108
+ }()
1109
+ a .logger .Info (ctx , "connected to derp map RPC" )
1107
1110
for {
1108
- select {
1109
- case <- ctx .Done ():
1110
- return ctx .Err ()
1111
- case update := <- updates :
1112
- if update .Err != nil {
1113
- return update .Err
1114
- }
1115
- if update .DERPMap != nil && ! tailnet .CompareDERPMaps (network .DERPMap (), update .DERPMap ) {
1116
- a .logger .Info (ctx , "updating derp map due to detected changes" )
1117
- network .SetDERPMap (update .DERPMap )
1118
- }
1111
+ dmp , err := stream .Recv ()
1112
+ if err != nil {
1113
+ return xerrors .Errorf ("recv DERPMap error: %w" , err )
1119
1114
}
1115
+ dm := tailnet .DERPMapFromProto (dmp )
1116
+ network .SetDERPMap (dm )
1120
1117
}
1121
1118
}
1122
1119
0 commit comments