@@ -38,15 +38,18 @@ import (
38
38
"github.com/coder/coder/v2/agent/agentscripts"
39
39
"github.com/coder/coder/v2/agent/agentssh"
40
40
"github.com/coder/coder/v2/agent/proto"
41
+ "github.com/coder/coder/v2/agent/proto/resourcesmonitor"
41
42
"github.com/coder/coder/v2/agent/reconnectingpty"
42
43
"github.com/coder/coder/v2/buildinfo"
44
+ "github.com/coder/coder/v2/cli/clistat"
43
45
"github.com/coder/coder/v2/cli/gitauth"
44
46
"github.com/coder/coder/v2/coderd/database/dbtime"
45
47
"github.com/coder/coder/v2/codersdk"
46
48
"github.com/coder/coder/v2/codersdk/agentsdk"
47
49
"github.com/coder/coder/v2/codersdk/workspacesdk"
48
50
"github.com/coder/coder/v2/tailnet"
49
51
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
52
+ "github.com/coder/quartz"
50
53
"github.com/coder/retry"
51
54
)
52
55
@@ -87,8 +90,8 @@ type Options struct {
87
90
}
88
91
89
92
type Client interface {
90
- ConnectRPC23 (ctx context.Context ) (
91
- proto.DRPCAgentClient23 , tailnetproto.DRPCTailnetClient23 , error ,
93
+ ConnectRPC24 (ctx context.Context ) (
94
+ proto.DRPCAgentClient24 , tailnetproto.DRPCTailnetClient24 , error ,
92
95
)
93
96
RewriteDERPMap (derpMap * tailcfg.DERPMap )
94
97
}
@@ -406,7 +409,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
406
409
fn ()
407
410
}
408
411
409
- func (a * agent ) reportMetadata (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
412
+ func (a * agent ) reportMetadata (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
410
413
tickerDone := make (chan struct {})
411
414
collectDone := make (chan struct {})
412
415
ctx , cancel := context .WithCancel (ctx )
@@ -622,7 +625,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23
622
625
623
626
// reportLifecycle reports the current lifecycle state once. All state
624
627
// changes are reported in order.
625
- func (a * agent ) reportLifecycle (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
628
+ func (a * agent ) reportLifecycle (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
626
629
for {
627
630
select {
628
631
case <- a .lifecycleUpdate :
@@ -704,7 +707,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
704
707
// fetchServiceBannerLoop fetches the service banner on an interval. It will
705
708
// not be fetched immediately; the expectation is that it is primed elsewhere
706
709
// (and must be done before the session actually starts).
707
- func (a * agent ) fetchServiceBannerLoop (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
710
+ func (a * agent ) fetchServiceBannerLoop (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
708
711
ticker := time .NewTicker (a .announcementBannersRefreshInterval )
709
712
defer ticker .Stop ()
710
713
for {
@@ -740,7 +743,7 @@ func (a *agent) run() (retErr error) {
740
743
a .sessionToken .Store (& sessionToken )
741
744
742
745
// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
743
- aAPI , tAPI , err := a .client .ConnectRPC23 (a .hardCtx )
746
+ aAPI , tAPI , err := a .client .ConnectRPC24 (a .hardCtx )
744
747
if err != nil {
745
748
return err
746
749
}
@@ -757,7 +760,7 @@ func (a *agent) run() (retErr error) {
757
760
connMan := newAPIConnRoutineManager (a .gracefulCtx , a .hardCtx , a .logger , aAPI , tAPI )
758
761
759
762
connMan .startAgentAPI ("init notification banners" , gracefulShutdownBehaviorStop ,
760
- func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
763
+ func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
761
764
bannersProto , err := aAPI .GetAnnouncementBanners (ctx , & proto.GetAnnouncementBannersRequest {})
762
765
if err != nil {
763
766
return xerrors .Errorf ("fetch service banner: %w" , err )
@@ -774,7 +777,7 @@ func (a *agent) run() (retErr error) {
774
777
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
775
778
// shutdown scripts.
776
779
connMan .startAgentAPI ("send logs" , gracefulShutdownBehaviorRemain ,
777
- func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
780
+ func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
778
781
err := a .logSender .SendLoop (ctx , aAPI )
779
782
if xerrors .Is (err , agentsdk .LogLimitExceededError ) {
780
783
// we don't want this error to tear down the API connection and propagate to the
@@ -792,6 +795,25 @@ func (a *agent) run() (retErr error) {
792
795
// metadata reporting can cease as soon as we start gracefully shutting down
793
796
connMan .startAgentAPI ("report metadata" , gracefulShutdownBehaviorStop , a .reportMetadata )
794
797
798
+ // resources monitor can cease as soon as we start gracefully shutting down.
799
+ connMan .startAgentAPI ("resources monitor" , gracefulShutdownBehaviorStop , func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
800
+ logger := a .logger .Named ("resources_monitor" )
801
+ clk := quartz .NewReal ()
802
+ config , err := aAPI .GetResourcesMonitoringConfiguration (ctx , & proto.GetResourcesMonitoringConfigurationRequest {})
803
+ if err != nil {
804
+ return xerrors .Errorf ("failed to get resources monitoring configuration: %w" , err )
805
+ }
806
+
807
+ statfetcher , err := clistat .New ()
808
+ if err != nil {
809
+ return xerrors .Errorf ("failed to create resources fetcher: %w" , err )
810
+ }
811
+ resourcesFetcher := resourcesmonitor .NewFetcher (statfetcher )
812
+
813
+ resourcesmonitor := resourcesmonitor .NewResourcesMonitor (logger , clk , config , resourcesFetcher , aAPI )
814
+ return resourcesmonitor .Start (ctx )
815
+ })
816
+
795
817
// channels to sync goroutines below
796
818
// handle manifest
797
819
// |
@@ -814,7 +836,7 @@ func (a *agent) run() (retErr error) {
814
836
connMan .startAgentAPI ("handle manifest" , gracefulShutdownBehaviorStop , a .handleManifest (manifestOK ))
815
837
816
838
connMan .startAgentAPI ("app health reporter" , gracefulShutdownBehaviorStop ,
817
- func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
839
+ func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
818
840
if err := manifestOK .wait (ctx ); err != nil {
819
841
return xerrors .Errorf ("no manifest: %w" , err )
820
842
}
@@ -829,7 +851,7 @@ func (a *agent) run() (retErr error) {
829
851
a .createOrUpdateNetwork (manifestOK , networkOK ))
830
852
831
853
connMan .startTailnetAPI ("coordination" , gracefulShutdownBehaviorStop ,
832
- func (ctx context.Context , tAPI tailnetproto.DRPCTailnetClient23 ) error {
854
+ func (ctx context.Context , tAPI tailnetproto.DRPCTailnetClient24 ) error {
833
855
if err := networkOK .wait (ctx ); err != nil {
834
856
return xerrors .Errorf ("no network: %w" , err )
835
857
}
@@ -838,7 +860,7 @@ func (a *agent) run() (retErr error) {
838
860
)
839
861
840
862
connMan .startTailnetAPI ("derp map subscriber" , gracefulShutdownBehaviorStop ,
841
- func (ctx context.Context , tAPI tailnetproto.DRPCTailnetClient23 ) error {
863
+ func (ctx context.Context , tAPI tailnetproto.DRPCTailnetClient24 ) error {
842
864
if err := networkOK .wait (ctx ); err != nil {
843
865
return xerrors .Errorf ("no network: %w" , err )
844
866
}
@@ -847,7 +869,7 @@ func (a *agent) run() (retErr error) {
847
869
848
870
connMan .startAgentAPI ("fetch service banner loop" , gracefulShutdownBehaviorStop , a .fetchServiceBannerLoop )
849
871
850
- connMan .startAgentAPI ("stats report loop" , gracefulShutdownBehaviorStop , func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
872
+ connMan .startAgentAPI ("stats report loop" , gracefulShutdownBehaviorStop , func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
851
873
if err := networkOK .wait (ctx ); err != nil {
852
874
return xerrors .Errorf ("no network: %w" , err )
853
875
}
@@ -858,8 +880,8 @@ func (a *agent) run() (retErr error) {
858
880
}
859
881
860
882
// handleManifest returns a function that fetches and processes the manifest
861
- func (a * agent ) handleManifest (manifestOK * checkpoint ) func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
862
- return func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
883
+ func (a * agent ) handleManifest (manifestOK * checkpoint ) func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
884
+ return func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
863
885
var (
864
886
sentResult = false
865
887
err error
@@ -968,8 +990,8 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
968
990
969
991
// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
970
992
// the tailnet using the information in the manifest
971
- func (a * agent ) createOrUpdateNetwork (manifestOK , networkOK * checkpoint ) func (context.Context , proto.DRPCAgentClient23 ) error {
972
- return func (ctx context.Context , _ proto.DRPCAgentClient23 ) (retErr error ) {
993
+ func (a * agent ) createOrUpdateNetwork (manifestOK , networkOK * checkpoint ) func (context.Context , proto.DRPCAgentClient24 ) error {
994
+ return func (ctx context.Context , _ proto.DRPCAgentClient24 ) (retErr error ) {
973
995
if err := manifestOK .wait (ctx ); err != nil {
974
996
return xerrors .Errorf ("no manifest: %w" , err )
975
997
}
@@ -1273,7 +1295,7 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t
1273
1295
1274
1296
// runCoordinator runs a coordinator and returns whether a reconnect
1275
1297
// should occur.
1276
- func (a * agent ) runCoordinator (ctx context.Context , tClient tailnetproto.DRPCTailnetClient23 , network * tailnet.Conn ) error {
1298
+ func (a * agent ) runCoordinator (ctx context.Context , tClient tailnetproto.DRPCTailnetClient24 , network * tailnet.Conn ) error {
1277
1299
defer a .logger .Debug (ctx , "disconnected from coordination RPC" )
1278
1300
// we run the RPC on the hardCtx so that we have a chance to send the disconnect message if we
1279
1301
// gracefully shut down.
@@ -1320,7 +1342,7 @@ func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTai
1320
1342
}
1321
1343
1322
1344
// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
1323
- func (a * agent ) runDERPMapSubscriber (ctx context.Context , tClient tailnetproto.DRPCTailnetClient23 , network * tailnet.Conn ) error {
1345
+ func (a * agent ) runDERPMapSubscriber (ctx context.Context , tClient tailnetproto.DRPCTailnetClient24 , network * tailnet.Conn ) error {
1324
1346
defer a .logger .Debug (ctx , "disconnected from derp map RPC" )
1325
1347
ctx , cancel := context .WithCancel (ctx )
1326
1348
defer cancel ()
@@ -1690,16 +1712,16 @@ const (
1690
1712
1691
1713
type apiConnRoutineManager struct {
1692
1714
logger slog.Logger
1693
- aAPI proto.DRPCAgentClient23
1694
- tAPI tailnetproto.DRPCTailnetClient23
1715
+ aAPI proto.DRPCAgentClient24
1716
+ tAPI tailnetproto.DRPCTailnetClient24
1695
1717
eg * errgroup.Group
1696
1718
stopCtx context.Context
1697
1719
remainCtx context.Context
1698
1720
}
1699
1721
1700
1722
func newAPIConnRoutineManager (
1701
1723
gracefulCtx , hardCtx context.Context , logger slog.Logger ,
1702
- aAPI proto.DRPCAgentClient23 , tAPI tailnetproto.DRPCTailnetClient23 ,
1724
+ aAPI proto.DRPCAgentClient24 , tAPI tailnetproto.DRPCTailnetClient24 ,
1703
1725
) * apiConnRoutineManager {
1704
1726
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
1705
1727
// exit if the errgroup hits an error, which usually means a problem with the conn.
@@ -1732,7 +1754,7 @@ func newAPIConnRoutineManager(
1732
1754
// but for Tailnet.
1733
1755
func (a * apiConnRoutineManager ) startAgentAPI (
1734
1756
name string , behavior gracefulShutdownBehavior ,
1735
- f func (context.Context , proto.DRPCAgentClient23 ) error ,
1757
+ f func (context.Context , proto.DRPCAgentClient24 ) error ,
1736
1758
) {
1737
1759
logger := a .logger .With (slog .F ("name" , name ))
1738
1760
var ctx context.Context
@@ -1769,7 +1791,7 @@ func (a *apiConnRoutineManager) startAgentAPI(
1769
1791
// but for the Agent API.
1770
1792
func (a * apiConnRoutineManager ) startTailnetAPI (
1771
1793
name string , behavior gracefulShutdownBehavior ,
1772
- f func (context.Context , tailnetproto.DRPCTailnetClient23 ) error ,
1794
+ f func (context.Context , tailnetproto.DRPCTailnetClient24 ) error ,
1773
1795
) {
1774
1796
logger := a .logger .With (slog .F ("name" , name ))
1775
1797
var ctx context.Context
0 commit comments