@@ -41,15 +41,18 @@ import (
41
41
"github.com/coder/coder/v2/agent/agentscripts"
42
42
"github.com/coder/coder/v2/agent/agentssh"
43
43
"github.com/coder/coder/v2/agent/proto"
44
+ "github.com/coder/coder/v2/agent/proto/resourcesmonitor"
44
45
"github.com/coder/coder/v2/agent/reconnectingpty"
45
46
"github.com/coder/coder/v2/buildinfo"
47
+ "github.com/coder/coder/v2/cli/clistat"
46
48
"github.com/coder/coder/v2/cli/gitauth"
47
49
"github.com/coder/coder/v2/coderd/database/dbtime"
48
50
"github.com/coder/coder/v2/codersdk"
49
51
"github.com/coder/coder/v2/codersdk/agentsdk"
50
52
"github.com/coder/coder/v2/codersdk/workspacesdk"
51
53
"github.com/coder/coder/v2/tailnet"
52
54
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
55
+ "github.com/coder/quartz"
53
56
)
54
57
55
58
const (
@@ -89,8 +92,8 @@ type Options struct {
89
92
}
90
93
91
94
type Client interface {
92
- ConnectRPC23 (ctx context.Context ) (
93
- proto.DRPCAgentClient23 , tailnetproto.DRPCTailnetClient23 , error ,
95
+ ConnectRPC24 (ctx context.Context ) (
96
+ proto.DRPCAgentClient24 , tailnetproto.DRPCTailnetClient24 , error ,
94
97
)
95
98
RewriteDERPMap (derpMap * tailcfg.DERPMap )
96
99
}
@@ -410,7 +413,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
410
413
fn ()
411
414
}
412
415
413
- func (a * agent ) reportMetadata (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
416
+ func (a * agent ) reportMetadata (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
414
417
tickerDone := make (chan struct {})
415
418
collectDone := make (chan struct {})
416
419
ctx , cancel := context .WithCancel (ctx )
@@ -626,7 +629,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23
626
629
627
630
// reportLifecycle reports the current lifecycle state once. All state
628
631
// changes are reported in order.
629
- func (a * agent ) reportLifecycle (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
632
+ func (a * agent ) reportLifecycle (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
630
633
for {
631
634
select {
632
635
case <- a .lifecycleUpdate :
@@ -708,7 +711,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
708
711
// fetchServiceBannerLoop fetches the service banner on an interval. It will
709
712
// not be fetched immediately; the expectation is that it is primed elsewhere
710
713
// (and must be done before the session actually starts).
711
- func (a * agent ) fetchServiceBannerLoop (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
714
+ func (a * agent ) fetchServiceBannerLoop (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
712
715
ticker := time .NewTicker (a .announcementBannersRefreshInterval )
713
716
defer ticker .Stop ()
714
717
for {
@@ -744,7 +747,7 @@ func (a *agent) run() (retErr error) {
744
747
a .sessionToken .Store (& sessionToken )
745
748
746
749
// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
747
- aAPI , tAPI , err := a .client .ConnectRPC23 (a .hardCtx )
750
+ aAPI , tAPI , err := a .client .ConnectRPC24 (a .hardCtx )
748
751
if err != nil {
749
752
return err
750
753
}
@@ -761,7 +764,7 @@ func (a *agent) run() (retErr error) {
761
764
connMan := newAPIConnRoutineManager (a .gracefulCtx , a .hardCtx , a .logger , aAPI , tAPI )
762
765
763
766
connMan .startAgentAPI ("init notification banners" , gracefulShutdownBehaviorStop ,
764
- func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
767
+ func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
765
768
bannersProto , err := aAPI .GetAnnouncementBanners (ctx , & proto.GetAnnouncementBannersRequest {})
766
769
if err != nil {
767
770
return xerrors .Errorf ("fetch service banner: %w" , err )
@@ -778,7 +781,7 @@ func (a *agent) run() (retErr error) {
778
781
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
779
782
// shutdown scripts.
780
783
connMan .startAgentAPI ("send logs" , gracefulShutdownBehaviorRemain ,
781
- func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
784
+ func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
782
785
err := a .logSender .SendLoop (ctx , aAPI )
783
786
if xerrors .Is (err , agentsdk .LogLimitExceededError ) {
784
787
// we don't want this error to tear down the API connection and propagate to the
@@ -796,6 +799,25 @@ func (a *agent) run() (retErr error) {
796
799
// metadata reporting can cease as soon as we start gracefully shutting down
797
800
connMan .startAgentAPI ("report metadata" , gracefulShutdownBehaviorStop , a .reportMetadata )
798
801
802
+ // resources monitor can cease as soon as we start gracefully shutting down. Its configuration is fetched once, when the routine starts.
803
+ connMan .startAgentAPI ("resources monitor" , gracefulShutdownBehaviorStop , func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
804
+ logger := a .logger .Named ("resources_monitor" )
805
+ clk := quartz .NewReal ()
806
+ config , err := aAPI .GetResourcesMonitoringConfiguration (ctx , & proto.GetResourcesMonitoringConfigurationRequest {}) // requested over the agent API before the monitor is built
807
+ if err != nil {
808
+ return xerrors .Errorf ("failed to get resources monitoring configuration: %w" , err )
809
+ }
810
+
811
+ statfetcher , err := clistat .New ()
812
+ if err != nil {
813
+ return xerrors .Errorf ("failed to create resources fetcher: %w" , err )
814
+ }
815
+ resourcesFetcher := resourcesmonitor .NewFetcher (statfetcher )
816
+
817
+ monitor := resourcesmonitor .NewResourcesMonitor (logger , clk , config , resourcesFetcher , aAPI ) // named monitor so the imported resourcesmonitor package is not shadowed
818
+ return monitor .Start (ctx ) // the routine's error result is whatever Start returns
819
+ })
820
+
799
821
// channels to sync goroutines below
800
822
// handle manifest
801
823
// |
@@ -818,7 +840,7 @@ func (a *agent) run() (retErr error) {
818
840
connMan .startAgentAPI ("handle manifest" , gracefulShutdownBehaviorStop , a .handleManifest (manifestOK ))
819
841
820
842
connMan .startAgentAPI ("app health reporter" , gracefulShutdownBehaviorStop ,
821
- func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
843
+ func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
822
844
if err := manifestOK .wait (ctx ); err != nil {
823
845
return xerrors .Errorf ("no manifest: %w" , err )
824
846
}
@@ -833,7 +855,7 @@ func (a *agent) run() (retErr error) {
833
855
a .createOrUpdateNetwork (manifestOK , networkOK ))
834
856
835
857
connMan .startTailnetAPI ("coordination" , gracefulShutdownBehaviorStop ,
836
- func (ctx context.Context , tAPI tailnetproto.DRPCTailnetClient23 ) error {
858
+ func (ctx context.Context , tAPI tailnetproto.DRPCTailnetClient24 ) error {
837
859
if err := networkOK .wait (ctx ); err != nil {
838
860
return xerrors .Errorf ("no network: %w" , err )
839
861
}
@@ -842,7 +864,7 @@ func (a *agent) run() (retErr error) {
842
864
)
843
865
844
866
connMan .startTailnetAPI ("derp map subscriber" , gracefulShutdownBehaviorStop ,
845
- func (ctx context.Context , tAPI tailnetproto.DRPCTailnetClient23 ) error {
867
+ func (ctx context.Context , tAPI tailnetproto.DRPCTailnetClient24 ) error {
846
868
if err := networkOK .wait (ctx ); err != nil {
847
869
return xerrors .Errorf ("no network: %w" , err )
848
870
}
@@ -851,7 +873,7 @@ func (a *agent) run() (retErr error) {
851
873
852
874
connMan .startAgentAPI ("fetch service banner loop" , gracefulShutdownBehaviorStop , a .fetchServiceBannerLoop )
853
875
854
- connMan .startAgentAPI ("stats report loop" , gracefulShutdownBehaviorStop , func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
876
+ connMan .startAgentAPI ("stats report loop" , gracefulShutdownBehaviorStop , func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
855
877
if err := networkOK .wait (ctx ); err != nil {
856
878
return xerrors .Errorf ("no network: %w" , err )
857
879
}
@@ -864,8 +886,8 @@ func (a *agent) run() (retErr error) {
864
886
}
865
887
866
888
// handleManifest returns a function that fetches and processes the manifest
867
- func (a * agent ) handleManifest (manifestOK * checkpoint ) func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
868
- return func (ctx context.Context , aAPI proto.DRPCAgentClient23 ) error {
889
+ func (a * agent ) handleManifest (manifestOK * checkpoint ) func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
890
+ return func (ctx context.Context , aAPI proto.DRPCAgentClient24 ) error {
869
891
var (
870
892
sentResult = false
871
893
err error
@@ -974,8 +996,8 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
974
996
975
997
// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
976
998
// the tailnet using the information in the manifest
977
- func (a * agent ) createOrUpdateNetwork (manifestOK , networkOK * checkpoint ) func (context.Context , proto.DRPCAgentClient23 ) error {
978
- return func (ctx context.Context , _ proto.DRPCAgentClient23 ) (retErr error ) {
999
+ func (a * agent ) createOrUpdateNetwork (manifestOK , networkOK * checkpoint ) func (context.Context , proto.DRPCAgentClient24 ) error {
1000
+ return func (ctx context.Context , _ proto.DRPCAgentClient24 ) (retErr error ) {
979
1001
if err := manifestOK .wait (ctx ); err != nil {
980
1002
return xerrors .Errorf ("no manifest: %w" , err )
981
1003
}
@@ -1279,7 +1301,7 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t
1279
1301
1280
1302
// runCoordinator runs a coordinator and returns whether a reconnect
1281
1303
// should occur.
1282
- func (a * agent ) runCoordinator (ctx context.Context , tClient tailnetproto.DRPCTailnetClient23 , network * tailnet.Conn ) error {
1304
+ func (a * agent ) runCoordinator (ctx context.Context , tClient tailnetproto.DRPCTailnetClient24 , network * tailnet.Conn ) error {
1283
1305
defer a .logger .Debug (ctx , "disconnected from coordination RPC" )
1284
1306
// we run the RPC on the hardCtx so that we have a chance to send the disconnect message if we
1285
1307
// gracefully shut down.
@@ -1326,7 +1348,7 @@ func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTai
1326
1348
}
1327
1349
1328
1350
// runDERPMapSubscriber runs the DERP map subscription RPC and returns if a reconnect should occur.
1329
- func (a * agent ) runDERPMapSubscriber (ctx context.Context , tClient tailnetproto.DRPCTailnetClient23 , network * tailnet.Conn ) error {
1351
+ func (a * agent ) runDERPMapSubscriber (ctx context.Context , tClient tailnetproto.DRPCTailnetClient24 , network * tailnet.Conn ) error {
1330
1352
defer a .logger .Debug (ctx , "disconnected from derp map RPC" )
1331
1353
ctx , cancel := context .WithCancel (ctx )
1332
1354
defer cancel ()
@@ -1696,16 +1718,16 @@ const (
1696
1718
1697
1719
type apiConnRoutineManager struct {
1698
1720
logger slog.Logger
1699
- aAPI proto.DRPCAgentClient23
1700
- tAPI tailnetproto.DRPCTailnetClient23
1721
+ aAPI proto.DRPCAgentClient24
1722
+ tAPI tailnetproto.DRPCTailnetClient24
1701
1723
eg * errgroup.Group
1702
1724
stopCtx context.Context
1703
1725
remainCtx context.Context
1704
1726
}
1705
1727
1706
1728
func newAPIConnRoutineManager (
1707
1729
gracefulCtx , hardCtx context.Context , logger slog.Logger ,
1708
- aAPI proto.DRPCAgentClient23 , tAPI tailnetproto.DRPCTailnetClient23 ,
1730
+ aAPI proto.DRPCAgentClient24 , tAPI tailnetproto.DRPCTailnetClient24 ,
1709
1731
) * apiConnRoutineManager {
1710
1732
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
1711
1733
// exit if the errgroup hits an error, which usually means a problem with the conn.
@@ -1738,7 +1760,7 @@ func newAPIConnRoutineManager(
1738
1760
// but for Tailnet.
1739
1761
func (a * apiConnRoutineManager ) startAgentAPI (
1740
1762
name string , behavior gracefulShutdownBehavior ,
1741
- f func (context.Context , proto.DRPCAgentClient23 ) error ,
1763
+ f func (context.Context , proto.DRPCAgentClient24 ) error ,
1742
1764
) {
1743
1765
logger := a .logger .With (slog .F ("name" , name ))
1744
1766
var ctx context.Context
@@ -1775,7 +1797,7 @@ func (a *apiConnRoutineManager) startAgentAPI(
1775
1797
// but for the Agent API.
1776
1798
func (a * apiConnRoutineManager ) startTailnetAPI (
1777
1799
name string , behavior gracefulShutdownBehavior ,
1778
- f func (context.Context , tailnetproto.DRPCTailnetClient23 ) error ,
1800
+ f func (context.Context , tailnetproto.DRPCTailnetClient24 ) error ,
1779
1801
) {
1780
1802
logger := a .logger .With (slog .F ("name" , name ))
1781
1803
var ctx context.Context
0 commit comments