@@ -501,8 +501,8 @@ func (a *agent) reportMetadata(ctx context.Context, conn drpc.Conn) error {
501
501
// mutex logic and overloading the API.
502
502
for _ , md := range manifest .Metadata {
503
503
md := md
504
- // We send the result to the channel in the goroutine to avoid
505
- // sending the same result multiple times. So, we don't care about
504
+ // We send the complete to the channel in the goroutine to avoid
505
+ // sending the same complete multiple times. So, we don't care about
506
506
// the return values.
507
507
go flight .Do (md .Key , func () {
508
508
ctx := slog .With (ctx , slog .F ("key" , md .Key ))
@@ -807,56 +807,48 @@ func (a *agent) run() (retErr error) {
807
807
// coordination <--------------------------+
808
808
// derp map subscriber <----------------+
809
809
// stats report loop <---------------+
810
- networkOK := make ( chan struct {} )
811
- manifestOK := make ( chan struct {} )
810
+ networkOK := newCheckpoint ( )
811
+ manifestOK := newCheckpoint ( )
812
812
813
813
connMan .start ("handle manifest" , gracefulShutdownBehaviorStop , a .handleManifest (manifestOK ))
814
814
815
815
connMan .start ("app health reporter" , gracefulShutdownBehaviorStop ,
816
816
func (ctx context.Context , conn drpc.Conn ) error {
817
- select {
818
- case <- ctx .Done ():
819
- return nil
820
- case <- manifestOK :
821
- manifest := a .manifest .Load ()
822
- NewWorkspaceAppHealthReporter (
823
- a .logger , manifest .Apps , agentsdk .AppHealthPoster (proto .NewDRPCAgentClient (conn )),
824
- )(ctx )
825
- return nil
817
+ if err := manifestOK .waitCtx (ctx ); err != nil {
818
+ return xerrors .Errorf ("no manifest: %w" , err )
826
819
}
820
+ manifest := a .manifest .Load ()
821
+ NewWorkspaceAppHealthReporter (
822
+ a .logger , manifest .Apps , agentsdk .AppHealthPoster (proto .NewDRPCAgentClient (conn )),
823
+ )(ctx )
824
+ return nil
827
825
})
828
826
829
827
connMan .start ("create or update network" , gracefulShutdownBehaviorStop ,
830
828
a .createOrUpdateNetwork (manifestOK , networkOK ))
831
829
832
830
connMan .start ("coordination" , gracefulShutdownBehaviorStop ,
833
831
func (ctx context.Context , conn drpc.Conn ) error {
834
- select {
835
- case <- ctx .Done ():
836
- return nil
837
- case <- networkOK :
832
+ if err := networkOK .waitCtx (ctx ); err != nil {
833
+ return xerrors .Errorf ("no network: %w" , err )
838
834
}
839
835
return a .runCoordinator (ctx , conn , a .network )
840
836
},
841
837
)
842
838
843
839
connMan .start ("derp map subscriber" , gracefulShutdownBehaviorStop ,
844
840
func (ctx context.Context , conn drpc.Conn ) error {
845
- select {
846
- case <- ctx .Done ():
847
- return nil
848
- case <- networkOK :
841
+ if err := networkOK .waitCtx (ctx ); err != nil {
842
+ return xerrors .Errorf ("no network: %w" , err )
849
843
}
850
844
return a .runDERPMapSubscriber (ctx , conn , a .network )
851
845
})
852
846
853
847
connMan .start ("fetch service banner loop" , gracefulShutdownBehaviorStop , a .fetchServiceBannerLoop )
854
848
855
849
connMan .start ("stats report loop" , gracefulShutdownBehaviorStop , func (ctx context.Context , conn drpc.Conn ) error {
856
- select {
857
- case <- ctx .Done ():
858
- return nil
859
- case <- networkOK :
850
+ if networkOK .waitCtx (ctx ); err != nil {
851
+ return xerrors .Errorf ("no network: %w" , err )
860
852
}
861
853
return a .statsReporter .reportLoop (ctx , proto .NewDRPCAgentClient (conn ))
862
854
})
@@ -865,8 +857,17 @@ func (a *agent) run() (retErr error) {
865
857
}
866
858
867
859
// handleManifest returns a function that fetches and processes the manifest
868
- func (a * agent ) handleManifest (manifestOK chan <- struct {} ) func (ctx context.Context , conn drpc.Conn ) error {
860
+ func (a * agent ) handleManifest (manifestOK * checkpoint ) func (ctx context.Context , conn drpc.Conn ) error {
869
861
return func (ctx context.Context , conn drpc.Conn ) error {
862
+ var (
863
+ sentResult = false
864
+ err error
865
+ )
866
+ defer func () {
867
+ if ! sentResult {
868
+ manifestOK .complete (err )
869
+ }
870
+ }()
870
871
aAPI := proto .NewDRPCAgentClient (conn )
871
872
mp , err := aAPI .GetManifest (ctx , & proto.GetManifestRequest {})
872
873
if err != nil {
@@ -907,7 +908,8 @@ func (a *agent) handleManifest(manifestOK chan<- struct{}) func(ctx context.Cont
907
908
}
908
909
909
910
oldManifest := a .manifest .Swap (& manifest )
910
- close (manifestOK )
911
+ manifestOK .complete (nil )
912
+ sentResult = true
911
913
912
914
// The startup script should only execute on the first run!
913
915
if oldManifest == nil {
@@ -968,14 +970,15 @@ func (a *agent) handleManifest(manifestOK chan<- struct{}) func(ctx context.Cont
968
970
969
971
// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
970
972
// the tailnet using the information in the manifest
971
- func (a * agent ) createOrUpdateNetwork (manifestOK <- chan struct {}, networkOK chan <- struct {}) func (context.Context , drpc.Conn ) error {
972
- return func (ctx context.Context , _ drpc.Conn ) error {
973
- select {
974
- case <- ctx .Done ():
975
- return nil
976
- case <- manifestOK :
973
+ func (a * agent ) createOrUpdateNetwork (manifestOK , networkOK * checkpoint ) func (context.Context , drpc.Conn ) error {
974
+ return func (ctx context.Context , _ drpc.Conn ) (retErr error ) {
975
+ if err := manifestOK .waitCtx (ctx ); err != nil {
976
+ return xerrors .Errorf ("no manifest: %w" , err )
977
977
}
978
978
var err error
979
+ defer func () {
980
+ networkOK .complete (retErr )
981
+ }()
979
982
manifest := a .manifest .Load ()
980
983
a .closeMutex .Lock ()
981
984
network := a .network
@@ -1011,7 +1014,6 @@ func (a *agent) createOrUpdateNetwork(manifestOK <-chan struct{}, networkOK chan
1011
1014
network .SetDERPForceWebSockets (manifest .DERPForceWebSockets )
1012
1015
network .SetBlockEndpoints (manifest .DisableDirectConnections )
1013
1016
}
1014
- close (networkOK )
1015
1017
return nil
1016
1018
}
1017
1019
}
0 commit comments