@@ -41,6 +41,7 @@ import (
41
41
"github.com/coder/coder/v2/agent/agentproc"
42
42
"github.com/coder/coder/v2/agent/agentscripts"
43
43
"github.com/coder/coder/v2/agent/agentssh"
44
+ "github.com/coder/coder/v2/agent/proto"
44
45
"github.com/coder/coder/v2/agent/reconnectingpty"
45
46
"github.com/coder/coder/v2/buildinfo"
46
47
"github.com/coder/coder/v2/cli/gitauth"
@@ -87,15 +88,12 @@ type Options struct {
87
88
}
88
89
89
90
type Client interface {
90
- Manifest (ctx context.Context ) (agentsdk.Manifest , error )
91
91
Listen (ctx context.Context ) (drpc.Conn , error )
92
92
ReportStats (ctx context.Context , log slog.Logger , statsChan <- chan * agentsdk.Stats , setInterval func (time.Duration )) (io.Closer , error )
93
93
PostLifecycle (ctx context.Context , state agentsdk.PostLifecycleRequest ) error
94
- PostAppHealth (ctx context.Context , req agentsdk.PostAppHealthsRequest ) error
95
- PostStartup (ctx context.Context , req agentsdk.PostStartupRequest ) error
96
94
PostMetadata (ctx context.Context , req agentsdk.PostMetadataRequest ) error
97
95
PatchLogs (ctx context.Context , req agentsdk.PatchLogs ) error
98
- GetServiceBanner ( ctx context. Context ) (codersdk. ServiceBannerConfig , error )
96
+ RewriteDERPMap ( derpMap * tailcfg. DERPMap )
99
97
}
100
98
101
99
type Agent interface {
@@ -269,7 +267,6 @@ func (a *agent) init(ctx context.Context) {
269
267
func (a * agent ) runLoop (ctx context.Context ) {
270
268
go a .reportLifecycleLoop (ctx )
271
269
go a .reportMetadataLoop (ctx )
272
- go a .fetchServiceBannerLoop (ctx )
273
270
go a .manageProcessPriorityLoop (ctx )
274
271
275
272
for retrier := retry .New (100 * time .Millisecond , 10 * time .Second ); retrier .Wait (ctx ); {
@@ -662,22 +659,23 @@ func (a *agent) setLifecycle(ctx context.Context, state codersdk.WorkspaceAgentL
662
659
// fetchServiceBannerLoop fetches the service banner on an interval. It will
663
660
// not be fetched immediately; the expectation is that it is primed elsewhere
664
661
// (and must be done before the session actually starts).
665
- func (a * agent ) fetchServiceBannerLoop (ctx context.Context ) {
662
+ func (a * agent ) fetchServiceBannerLoop (ctx context.Context , aAPI proto. DRPCAgentClient ) error {
666
663
ticker := time .NewTicker (a .serviceBannerRefreshInterval )
667
664
defer ticker .Stop ()
668
665
for {
669
666
select {
670
667
case <- ctx .Done ():
671
- return
668
+ return ctx . Err ()
672
669
case <- ticker .C :
673
- serviceBanner , err := a . client . GetServiceBanner (ctx )
670
+ sbp , err := aAPI . GetServiceBanner (ctx , & proto. GetServiceBannerRequest {} )
674
671
if err != nil {
675
672
if ctx .Err () != nil {
676
- return
673
+ return ctx . Err ()
677
674
}
678
675
a .logger .Error (ctx , "failed to update service banner" , slog .Error (err ))
679
- continue
676
+ return err
680
677
}
678
+ serviceBanner := agentsdk .ServiceBannerFromProto (sbp )
681
679
a .serviceBanner .Store (& serviceBanner )
682
680
}
683
681
}
@@ -693,21 +691,40 @@ func (a *agent) run(ctx context.Context) error {
693
691
}
694
692
a .sessionToken .Store (& sessionToken )
695
693
696
- serviceBanner , err := a .client .GetServiceBanner (ctx )
694
+ // Listen returns the dRPC connection we use for the Agent v2+ API
695
+ conn , err := a .client .Listen (ctx )
696
+ if err != nil {
697
+ return err
698
+ }
699
+ defer func () {
700
+ cErr := conn .Close ()
701
+ if cErr != nil {
702
+ a .logger .Debug (ctx , "error closing drpc connection" , slog .Error (err ))
703
+ }
704
+ }()
705
+
706
+ aAPI := proto .NewDRPCAgentClient (conn )
707
+ sbp , err := aAPI .GetServiceBanner (ctx , & proto.GetServiceBannerRequest {})
697
708
if err != nil {
698
709
return xerrors .Errorf ("fetch service banner: %w" , err )
699
710
}
711
+ serviceBanner := agentsdk .ServiceBannerFromProto (sbp )
700
712
a .serviceBanner .Store (& serviceBanner )
701
713
702
- manifest , err := a . client . Manifest (ctx )
714
+ mp , err := aAPI . GetManifest (ctx , & proto. GetManifestRequest {} )
703
715
if err != nil {
704
716
return xerrors .Errorf ("fetch metadata: %w" , err )
705
717
}
706
- a .logger .Info (ctx , "fetched manifest" , slog .F ("manifest" , manifest ))
707
-
718
+ a .logger .Info (ctx , "fetched manifest" , slog .F ("manifest" , mp ))
719
+ manifest , err := agentsdk .ManifestFromProto (mp )
720
+ if err != nil {
721
+ a .logger .Critical (ctx , "failed to convert manifest" , slog .F ("manifest" , mp ), slog .Error (err ))
722
+ return xerrors .Errorf ("convert manifest: %w" , err )
723
+ }
708
724
if manifest .AgentID == uuid .Nil {
709
725
return xerrors .New ("nil agentID returned by manifest" )
710
726
}
727
+ a .client .RewriteDERPMap (manifest .DERPMap )
711
728
712
729
// Expand the directory and send it back to coderd so external
713
730
// applications that rely on the directory can use it.
@@ -718,13 +735,18 @@ func (a *agent) run(ctx context.Context) error {
718
735
if err != nil {
719
736
return xerrors .Errorf ("expand directory: %w" , err )
720
737
}
721
- err = a .client .PostStartup (ctx , agentsdk.PostStartupRequest {
738
+ subsys , err := agentsdk .ProtoFromSubsystems (a .subsystems )
739
+ if err != nil {
740
+ a .logger .Critical (ctx , "failed to convert subsystems" , slog .Error (err ))
741
+ return xerrors .Errorf ("failed to convert subsystems: %w" , err )
742
+ }
743
+ _ , err = aAPI .UpdateStartup (ctx , & proto.UpdateStartupRequest {Startup : & proto.Startup {
722
744
Version : buildinfo .Version (),
723
745
ExpandedDirectory : manifest .Directory ,
724
- Subsystems : a . subsystems ,
725
- })
746
+ Subsystems : subsys ,
747
+ }} )
726
748
if err != nil {
727
- return xerrors .Errorf ("update workspace agent version : %w" , err )
749
+ return xerrors .Errorf ("update workspace agent startup : %w" , err )
728
750
}
729
751
730
752
oldManifest := a .manifest .Swap (& manifest )
@@ -785,7 +807,7 @@ func (a *agent) run(ctx context.Context) error {
785
807
appReporterCtx , appReporterCtxCancel := context .WithCancel (ctx )
786
808
defer appReporterCtxCancel ()
787
809
go NewWorkspaceAppHealthReporter (
788
- a .logger , manifest .Apps , a . client . PostAppHealth )(appReporterCtx )
810
+ a .logger , manifest .Apps , agentsdk . AppHealthPoster ( aAPI ) )(appReporterCtx )
789
811
790
812
a .closeMutex .Lock ()
791
813
network := a .network
@@ -821,18 +843,6 @@ func (a *agent) run(ctx context.Context) error {
821
843
network .SetBlockEndpoints (manifest .DisableDirectConnections )
822
844
}
823
845
824
- // Listen returns the dRPC connection we use for both Coordinator and DERPMap updates
825
- conn , err := a .client .Listen (ctx )
826
- if err != nil {
827
- return err
828
- }
829
- defer func () {
830
- cErr := conn .Close ()
831
- if cErr != nil {
832
- a .logger .Debug (ctx , "error closing drpc connection" , slog .Error (err ))
833
- }
834
- }()
835
-
836
846
eg , egCtx := errgroup .WithContext (ctx )
837
847
eg .Go (func () error {
838
848
a .logger .Debug (egCtx , "running tailnet connection coordinator" )
@@ -852,6 +862,15 @@ func (a *agent) run(ctx context.Context) error {
852
862
return nil
853
863
})
854
864
865
+ eg .Go (func () error {
866
+ a .logger .Debug (egCtx , "running fetch server banner loop" )
867
+ err := a .fetchServiceBannerLoop (egCtx , aAPI )
868
+ if err != nil {
869
+ return xerrors .Errorf ("fetch server banner loop: %w" , err )
870
+ }
871
+ return nil
872
+ })
873
+
855
874
return eg .Wait ()
856
875
}
857
876
@@ -1113,6 +1132,7 @@ func (a *agent) runDERPMapSubscriber(ctx context.Context, conn drpc.Conn, networ
1113
1132
return xerrors .Errorf ("recv DERPMap error: %w" , err )
1114
1133
}
1115
1134
dm := tailnet .DERPMapFromProto (dmp )
1135
+ a .client .RewriteDERPMap (dm )
1116
1136
network .SetDERPMap (dm )
1117
1137
}
1118
1138
}
0 commit comments