Skip to content

feat: integrate agentAPI with resources monitoring logic #16438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9d42cad
work on new agent version
defelmnq Feb 4, 2025
7b2d19e
improve function for resources monitoring
defelmnq Feb 10, 2025
0124d60
add missing files
defelmnq Feb 10, 2025
3661e8c
work on resources monitor tests
defelmnq Feb 11, 2025
a5a788e
apply fmt and lint
defelmnq Feb 11, 2025
91d1515
work on dbauthz tests
defelmnq Feb 11, 2025
0bc7632
work on dbauthz
defelmnq Feb 11, 2025
3085041
work on rbac
defelmnq Feb 11, 2025
120a37b
continue to iterate
defelmnq Feb 11, 2025
dd8ed40
continue to iterate
defelmnq Feb 11, 2025
0a8941b
continue to iterate
defelmnq Feb 11, 2025
f3388b4
work on tests
defelmnq Feb 11, 2025
523f6fd
improve testing
defelmnq Feb 11, 2025
06adbf7
improve error messages
defelmnq Feb 11, 2025
c7b03d0
rework architecture of resources monitor
defelmnq Feb 12, 2025
2c3d171
improve resourcesmonitor struct
defelmnq Feb 12, 2025
18b65e0
improve resourcesmonitor struct
defelmnq Feb 12, 2025
c95b05a
change proto payload for get resources monitoring config
defelmnq Feb 12, 2025
c79b6cb
change proto payload for get resources monitoring config
defelmnq Feb 12, 2025
b28d4fa
rework fetcher and tests
defelmnq Feb 13, 2025
7701624
fix tests
defelmnq Feb 13, 2025
5fad903
fix tests
defelmnq Feb 13, 2025
b611ae5
fix tests
defelmnq Feb 13, 2025
3c65b8a
fix logic
defelmnq Feb 13, 2025
63c5869
improve testing fetcher and rename struct
defelmnq Feb 13, 2025
2d3eeb5
lint
defelmnq Feb 13, 2025
e17aafc
work on dbauthz
defelmnq Feb 13, 2025
c5a4201
improve dbauthz for fetching
defelmnq Feb 13, 2025
262a672
change dbauthz permissions
defelmnq Feb 13, 2025
dbca96e
finalise tests
defelmnq Feb 13, 2025
3145eab
fix comments from github
defelmnq Feb 13, 2025
3bec324
add collectedAt
defelmnq Feb 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rework architecture of resources monitor
  • Loading branch information
defelmnq committed Feb 12, 2025
commit c7b03d03011c305a3bb7234e8d444c1190e15985
16 changes: 15 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/coder/coder/v2/agent/agentscripts"
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
"github.com/coder/coder/v2/agent/reconnectingpty"
"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/cli/gitauth"
Expand All @@ -46,6 +47,7 @@ import (
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/tailnet"
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/quartz"
"github.com/coder/retry"
)

Expand Down Expand Up @@ -785,7 +787,19 @@ func (a *agent) run() (retErr error) {
// metadata reporting can cease as soon as we start gracefully shutting down
connMan.startAgentAPI("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata)

connMan.startAgentAPI("resources monitor", gracefulShutdownBehaviorStop, a.pushResourcesMonitoring)
// Resources monitoring can cease as soon as we start gracefully shutting down:
// it is only relevant while the workspace is running.
connMan.startAgentAPI("resources monitor", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
logger := a.logger.Named("resources_monitor")
clk := quartz.NewReal()
config, err := aAPI.GetResourcesMonitoringConfiguration(ctx, &proto.GetResourcesMonitoringConfigurationRequest{})
if err != nil {
return xerrors.Errorf("failed to get resources monitoring configuration: %w", err)
}

resourcesmonitor := resourcesmonitor.NewResourcesMonitor(logger, clk, config, aAPI)
return resourcesmonitor.Start(ctx)
})

// channels to sync goroutines below
// handle manifest
Expand Down
262 changes: 132 additions & 130 deletions agent/proto/agent.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion agent/proto/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ message GetResourcesMonitoringConfigurationResponse {
bool enabled = 1;
message Config {
int32 num_datapoints = 1;
int32 tick_interval = 2; // expressed in seconds
int32 collection_interval_seconds = 2;
}
Config config = 2;
repeated string monitored_volumes = 3;
Expand Down
2 changes: 2 additions & 0 deletions agent/proto/agent_drpc_old.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type DRPCAgentClient23 interface {
ScriptCompleted(ctx context.Context, in *WorkspaceAgentScriptCompletedRequest) (*WorkspaceAgentScriptCompletedResponse, error)
}

// DRPCAgentClient24 is the Agent API at v2.4. It adds the GetResourcesMonitoringConfiguration and
// PushResourcesMonitoringUsage RPCs. Compatible with Coder v2.19+
type DRPCAgentClient24 interface {
DRPCAgentClient23
GetResourcesMonitoringConfiguration(ctx context.Context, in *GetResourcesMonitoringConfigurationRequest) (*GetResourcesMonitoringConfigurationResponse, error)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,57 +1,57 @@
package agent
package resourcesmonitor

import (
"github.com/coder/coder/v2/agent/proto"
)

type ResourcesMonitorDatapoint struct {
Memory *ResourcesMonitorMemoryDatapoint
Volumes []*ResourcesMonitorVolumeDatapoint
type Datapoint struct {
Memory *MemoryDatapoint
Volumes []*VolumeDatapoint
}

type ResourcesMonitorMemoryDatapoint struct {
type MemoryDatapoint struct {
Total int64
Used int64
}

type ResourcesMonitorVolumeDatapoint struct {
type VolumeDatapoint struct {
Path string
Total int64
Used int64
}

// ResourcesMonitorQueue represents a FIFO queue with a fixed size
type ResourcesMonitorQueue struct {
items []ResourcesMonitorDatapoint
// Queue represents a FIFO queue with a fixed size
type Queue struct {
items []Datapoint
size int
}

// newResourcesMonitorQueue creates a new ResourcesMonitorQueue with the given size
func NewResourcesMonitorQueue(size int) *ResourcesMonitorQueue {
return &ResourcesMonitorQueue{
items: make([]ResourcesMonitorDatapoint, 0, size),
// NewQueue creates a new Queue with the given size
func NewQueue(size int) *Queue {
return &Queue{
items: make([]Datapoint, 0, size),
size: size,
}
}

// Push adds a new item to the queue
func (q *ResourcesMonitorQueue) Push(item ResourcesMonitorDatapoint) {
func (q *Queue) Push(item Datapoint) {
if len(q.items) >= q.size {
// Remove the first item (FIFO)
q.items = q.items[1:]
}
q.items = append(q.items, item)
}

func (q *ResourcesMonitorQueue) IsFull() bool {
func (q *Queue) IsFull() bool {
return len(q.items) == q.size
}

func (q *ResourcesMonitorQueue) Items() []ResourcesMonitorDatapoint {
func (q *Queue) Items() []Datapoint {
return q.items
}

func (q *ResourcesMonitorQueue) ItemsAsProto() []*proto.PushResourcesMonitoringUsageRequest_Datapoint {
func (q *Queue) ItemsAsProto() []*proto.PushResourcesMonitoringUsageRequest_Datapoint {
items := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint, 0, len(q.items))

for _, item := range q.items {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package agent_test
package resourcesmonitor_test

import (
"testing"

"github.com/coder/coder/v2/agent"
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
)

func TestResourceMonitorQueue(t *testing.T) {
Expand All @@ -12,30 +12,30 @@ func TestResourceMonitorQueue(t *testing.T) {
tests := []struct {
name string
pushCount int
expected []agent.ResourcesMonitorDatapoint
expected []resourcesmonitor.Datapoint
}{
{
name: "Push zero",
pushCount: 0,
expected: []agent.ResourcesMonitorDatapoint{},
expected: []resourcesmonitor.Datapoint{},
},
{
name: "Push less than capacity",
pushCount: 3,
expected: []agent.ResourcesMonitorDatapoint{
{Memory: &agent.ResourcesMonitorMemoryDatapoint{Total: 1, Used: 1}},
{Memory: &agent.ResourcesMonitorMemoryDatapoint{Total: 2, Used: 2}},
{Memory: &agent.ResourcesMonitorMemoryDatapoint{Total: 3, Used: 3}},
expected: []resourcesmonitor.Datapoint{
{Memory: &resourcesmonitor.MemoryDatapoint{Total: 1, Used: 1}},
{Memory: &resourcesmonitor.MemoryDatapoint{Total: 2, Used: 2}},
{Memory: &resourcesmonitor.MemoryDatapoint{Total: 3, Used: 3}},
},
},
{
name: "Push exactly capacity",
pushCount: 20,
expected: func() []agent.ResourcesMonitorDatapoint {
var result []agent.ResourcesMonitorDatapoint
expected: func() []resourcesmonitor.Datapoint {
var result []resourcesmonitor.Datapoint
for i := 1; i <= 20; i++ {
result = append(result, agent.ResourcesMonitorDatapoint{
Memory: &agent.ResourcesMonitorMemoryDatapoint{
result = append(result, resourcesmonitor.Datapoint{
Memory: &resourcesmonitor.MemoryDatapoint{
Total: int64(i),
Used: int64(i),
},
Expand All @@ -47,11 +47,11 @@ func TestResourceMonitorQueue(t *testing.T) {
{
name: "Push more than capacity",
pushCount: 25,
expected: func() []agent.ResourcesMonitorDatapoint {
var result []agent.ResourcesMonitorDatapoint
expected: func() []resourcesmonitor.Datapoint {
var result []resourcesmonitor.Datapoint
for i := 6; i <= 25; i++ {
result = append(result, agent.ResourcesMonitorDatapoint{
Memory: &agent.ResourcesMonitorMemoryDatapoint{
result = append(result, resourcesmonitor.Datapoint{
Memory: &resourcesmonitor.MemoryDatapoint{
Total: int64(i),
Used: int64(i),
},
Expand All @@ -67,10 +67,10 @@ func TestResourceMonitorQueue(t *testing.T) {

t.Run(tt.name, func(t *testing.T) {
t.Parallel()
queue := agent.NewResourcesMonitorQueue(20)
queue := resourcesmonitor.NewQueue(20)
for i := 1; i <= tt.pushCount; i++ {
queue.Push(agent.ResourcesMonitorDatapoint{
Memory: &agent.ResourcesMonitorMemoryDatapoint{
queue.Push(resourcesmonitor.Datapoint{
Memory: &resourcesmonitor.MemoryDatapoint{
Total: int64(i),
Used: int64(i),
},
Expand All @@ -89,7 +89,7 @@ func TestResourceMonitorQueue(t *testing.T) {
}
}

func equal(a, b []agent.ResourcesMonitorDatapoint) bool {
func equal(a, b []resourcesmonitor.Datapoint) bool {
if len(a) != len(b) {
return false
}
Expand Down
130 changes: 130 additions & 0 deletions agent/proto/resourcesmonitor/resources_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package resourcesmonitor

import (
"context"
"time"

"golang.org/x/xerrors"

"cdr.dev/slog"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/cli/clistat"
"github.com/coder/quartz"
)

type monitor struct {
logger slog.Logger
clock quartz.Clock
config *proto.GetResourcesMonitoringConfigurationResponse
datapointsPusher datapointsPusher
}

// NewResourcesMonitor builds a monitor from the given logger, clock,
// monitoring configuration and datapoints pusher. Nothing is collected until
// Start is called on the returned monitor.
//
//nolint:revive
func NewResourcesMonitor(logger slog.Logger, clock quartz.Clock, config *proto.GetResourcesMonitoringConfigurationResponse, datapointsPusher datapointsPusher) *monitor {
	mon := new(monitor)
	mon.logger = logger
	mon.clock = clock
	mon.config = config
	mon.datapointsPusher = datapointsPusher
	return mon
}

// datapointsPusher is the single capability the monitor needs from its
// backend: delivering a batch of collected datapoints.
type datapointsPusher interface {
	// PushResourcesMonitoringUsage sends the datapoints carried by request.
	PushResourcesMonitoringUsage(
		ctx context.Context,
		request *proto.PushResourcesMonitoringUsageRequest,
	) (*proto.PushResourcesMonitoringUsageResponse, error)
}

// Start schedules the periodic collection of memory and volume usage.
//
// When monitoring is disabled in the configuration it logs that fact and
// returns nil. Otherwise it validates the configuration, creates the resources
// fetcher and registers a ticker callback that, on every tick:
//
//   - reads host memory usage (a failure is logged and the tick is skipped),
//   - reads usage for each monitored volume (a failing volume is logged and
//     left out of the datapoint),
//   - pushes the resulting datapoint into a fixed-size FIFO queue, and
//   - once the queue is full, sends the whole queue to the datapoints pusher
//     (a push failure is logged, not returned).
//
// The callback always returns nil, so collection errors never stop the ticker.
//
// Start returns an error when monitoring is enabled but the configuration is
// missing, when the number of datapoints or the collection interval is not
// strictly positive, or when the resources fetcher cannot be created.
func (m *monitor) Start(ctx context.Context) error {
	if !m.config.Enabled {
		m.logger.Info(ctx, "resources monitoring is disabled - skipping")
		return nil
	}

	cfg := m.config.Config
	if cfg == nil {
		return xerrors.New("resources monitoring is enabled but its configuration is missing")
	}
	// A queue of size zero would make Push slice past the end of an empty
	// slice, and a negative size is rejected by make; refuse both up front.
	if cfg.NumDatapoints <= 0 {
		return xerrors.Errorf("invalid number of datapoints: %d", cfg.NumDatapoints)
	}
	// NOTE(review): a non-positive period is assumed to be unusable for the
	// ticker (time.NewTicker panics on it) - confirm against quartz.
	if cfg.CollectionIntervalSeconds <= 0 {
		return xerrors.Errorf("invalid collection interval: %d seconds", cfg.CollectionIntervalSeconds)
	}

	resourcesFetcher, err := clistat.New()
	if err != nil {
		return xerrors.Errorf("failed to create resources fetcher: %w", err)
	}

	datapointsQueue := NewQueue(int(cfg.NumDatapoints))

	// Convert to time.Duration (int64) before multiplying: doing the
	// multiplication in int32 wraps around for any interval of 3s or more.
	interval := time.Duration(cfg.CollectionIntervalSeconds) * time.Second

	// NOTE(review): the value returned by TickerFunc is discarded, so Start
	// does not wait for the ticker to finish - confirm callers expect this.
	m.clock.TickerFunc(ctx, interval, func() error {
		memTotal, memUsed, err := m.fetchResourceMonitoredMemory(resourcesFetcher)
		if err != nil {
			m.logger.Error(ctx, "failed to fetch memory", slog.Error(err))
			return nil
		}

		volumes := make([]*VolumeDatapoint, 0, len(m.config.MonitoredVolumes))
		for _, volume := range m.config.MonitoredVolumes {
			volTotal, volUsed, err := m.fetchResourceMonitoredVolume(resourcesFetcher, volume)
			if err != nil {
				m.logger.Error(ctx, "failed to fetch volume", slog.Error(err))
				continue
			}

			volumes = append(volumes, &VolumeDatapoint{
				Path:  volume,
				Total: volTotal,
				Used:  volUsed,
			})
		}

		datapointsQueue.Push(Datapoint{
			Memory: &MemoryDatapoint{
				Total: memTotal,
				Used:  memUsed,
			},
			Volumes: volumes,
		})

		// Only report once a full window of datapoints is available.
		if !datapointsQueue.IsFull() {
			return nil
		}

		_, err = m.datapointsPusher.PushResourcesMonitoringUsage(ctx, &proto.PushResourcesMonitoringUsageRequest{
			Datapoints: datapointsQueue.ItemsAsProto(),
		})
		if err != nil {
			m.logger.Error(ctx, "failed to push resources monitoring usage", slog.Error(err))
		}

		return nil
	}, "resources_monitor")

	return nil
}

// fetchResourceMonitoredMemory asks the fetcher for host memory statistics and
// returns the total and used amounts after passing each through
// bytesToMegabytes. It fails when the fetcher reports an error or when the
// reported total is nil.
func (m *monitor) fetchResourceMonitoredMemory(fetcher *clistat.Statter) (total int64, used int64, err error) {
	hostMem, err := fetcher.HostMemory(clistat.PrefixMebi)
	switch {
	case err != nil:
		return 0, 0, err
	case hostMem.Total == nil:
		return 0, 0, xerrors.New("memory total is nil - can not fetch memory")
	}

	totalMB := m.bytesToMegabytes(int64(*hostMem.Total))
	usedMB := m.bytesToMegabytes(int64(hostMem.Used))
	return totalMB, usedMB, nil
}

// fetchResourceMonitoredVolume asks the fetcher for disk statistics of the
// given volume path and returns the total and used amounts after passing each
// through bytesToMegabytes. It fails when the fetcher reports an error or when
// the reported total is nil.
func (m *monitor) fetchResourceMonitoredVolume(fetcher *clistat.Statter, volume string) (total int64, used int64, err error) {
	disk, err := fetcher.Disk(clistat.PrefixMebi, volume)
	if err != nil {
		return 0, 0, err
	}
	if disk.Total == nil {
		return 0, 0, xerrors.New("volume total is nil - can not fetch volume")
	}

	return m.bytesToMegabytes(int64(*disk.Total)), m.bytesToMegabytes(int64(disk.Used)), nil
}

// bytesToMegabytes converts a byte count into units of 1024*1024 bytes using
// integer division, so any remainder is discarded.
func (*monitor) bytesToMegabytes(bytes int64) int64 {
	const bytesPerUnit int64 = 1024 * 1024
	return bytes / bytesPerUnit
}
Loading
Loading