Skip to content

feat: implement agent process management #9461

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
some minor agent tests
  • Loading branch information
sreya committed Sep 8, 2023
commit 760cbb38d56bb7f76564cc4e42a6162eb8a14bd3
164 changes: 96 additions & 68 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Options struct {
ReportMetadataInterval time.Duration
ServiceBannerRefreshInterval time.Duration
Syscaller agentproc.Syscaller
ProcessManagementInterval time.Duration
}

type Client interface {
Expand Down Expand Up @@ -125,6 +126,14 @@ func New(options Options) Agent {
prometheusRegistry = prometheus.NewRegistry()
}

if options.Syscaller == nil {
options.Syscaller = agentproc.UnixSyscaller{}
}

if options.ProcessManagementInterval == 0 {
options.ProcessManagementInterval = time.Second
}

ctx, cancelFunc := context.WithCancel(context.Background())
a := &agent{
tailnetListenPort: options.TailnetListenPort,
Expand All @@ -148,6 +157,8 @@ func New(options Options) Agent {
sshMaxTimeout: options.SSHMaxTimeout,
subsystems: options.Subsystems,
addresses: options.Addresses,
syscaller: options.Syscaller,
processManagementInterval: options.ProcessManagementInterval,

prometheusRegistry: prometheusRegistry,
metrics: newAgentMetrics(prometheusRegistry),
Expand Down Expand Up @@ -200,9 +211,10 @@ type agent struct {

connCountReconnectingPTY atomic.Int64

prometheusRegistry *prometheus.Registry
metrics *agentMetrics
syscaller agentproc.Syscaller
prometheusRegistry *prometheus.Registry
metrics *agentMetrics
processManagementInterval time.Duration
syscaller agentproc.Syscaller
}

func (a *agent) TailnetConn() *tailnet.Conn {
Expand Down Expand Up @@ -1263,15 +1275,9 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
var prioritizedProcs = []string{"coder"}

func (a *agent) manageProcessPriorityLoop(ctx context.Context) {
ticker := time.NewTicker(time.Minute)
ticker := time.NewTicker(a.processManagementInterval)
defer ticker.Stop()

const (
procDir = agentproc.DefaultProcDir
niceness = 10
oomScoreAdj = 100
)

if val := a.envVars[EnvProcMemNice]; val == "" || runtime.GOOS != "linux" {
a.logger.Info(ctx, "process priority not enabled, agent will not manage process niceness/oom_score_adj ",
slog.F("env_var", EnvProcMemNice),
Expand All @@ -1281,81 +1287,103 @@ func (a *agent) manageProcessPriorityLoop(ctx context.Context) {
return
}

// Do once before falling into loop.
if err := a.manageProcessPriority(ctx); err != nil {
a.logger.Error(ctx, "manage process priority",
slog.F("dir", agentproc.DefaultProcDir),
slog.Error(err),
)
}

for {
select {
case <-ticker.C:
procs, err := agentproc.List(a.filesystem, a.syscaller, agentproc.DefaultProcDir)
if err != nil {
a.logger.Error(ctx, "failed to list procs",
if err := a.manageProcessPriority(ctx); err != nil {
a.logger.Error(ctx, "manage process priority",
slog.F("dir", agentproc.DefaultProcDir),
slog.Error(err),
)
continue
}
for _, proc := range procs {
// Trim off the path e.g. "./coder" -> "coder"
name := filepath.Base(proc.Name())
// If the process is prioritized we should adjust
// it's oom_score_adj and avoid lowering its niceness.
if slices.Contains(prioritizedProcs, name) {
err = proc.SetOOMAdj(oomScoreAdj)
if err != nil {
a.logger.Error(ctx, "unable to set proc oom_score_adj",
slog.F("name", proc.Name()),
slog.F("pid", proc.PID),
slog.F("oom_score_adj", oomScoreAdj),
slog.Error(err),
)
continue
}

a.logger.Debug(ctx, "decreased process oom_score",
slog.F("name", proc.Name()),
slog.F("pid", proc.PID),
slog.F("oom_score_adj", oomScoreAdj),
)
continue
}
case <-ctx.Done():
return
}
}
}

score, err := proc.Niceness(a.syscaller)
if err != nil {
a.logger.Error(ctx, "unable to get proc niceness",
slog.F("name", proc.Name()),
slog.F("pid", proc.PID),
slog.Error(err),
)
continue
}
if score != 20 {
a.logger.Error(ctx, "skipping process due to custom niceness",
slog.F("name", proc.Name()),
slog.F("pid", proc.PID),
slog.F("niceness", score),
)
continue
}
func (a *agent) manageProcessPriority(ctx context.Context) error {
const (
procDir = agentproc.DefaultProcDir
niceness = 10
oomScoreAdj = 100
)

err = proc.SetNiceness(a.syscaller, niceness)
if err != nil {
a.logger.Error(ctx, "unable to set proc niceness",
slog.F("name", proc.Name()),
slog.F("pid", proc.PID),
slog.F("niceness", niceness),
slog.Error(err),
)
continue
}
procs, err := agentproc.List(a.filesystem, a.syscaller, agentproc.DefaultProcDir)
if err != nil {
return xerrors.Errorf("list: %w", err)
}

a.logger.Debug(ctx, "deprioritized process",
for _, proc := range procs {
// Trim off the path e.g. "./coder" -> "coder"
name := filepath.Base(proc.Name())
// If the process is prioritized we should adjust
// it's oom_score_adj and avoid lowering its niceness.
if slices.Contains(prioritizedProcs, name) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to specifically prioritize the agent and not other coder processes right? If I'm reading this code correctly it would treat coder server and coder stat the same as the agent.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good catch, I don't see that as being a big deal but we can be more discriminate about which processes we want to prioritize by also parsing command arguments. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just check if its the current process?

err = proc.SetOOMAdj(oomScoreAdj)
if err != nil {
a.logger.Error(ctx, "unable to set proc oom_score_adj",
slog.F("name", proc.Name()),
slog.F("pid", proc.PID),
slog.F("niceness", niceness),
slog.F("oom_score_adj", oomScoreAdj),
slog.Error(err),
)
continue
}
case <-ctx.Done():
return

a.logger.Debug(ctx, "decreased process oom_score",
slog.F("name", proc.Name()),
slog.F("pid", proc.PID),
slog.F("oom_score_adj", oomScoreAdj),
)
continue
}

score, err := proc.Niceness(a.syscaller)
if err != nil {
a.logger.Error(ctx, "unable to get proc niceness",
slog.F("name", proc.Name()),
slog.F("pid", proc.PID),
slog.Error(err),
)
continue
}
if score != 20 {
a.logger.Error(ctx, "skipping process due to custom niceness",
slog.F("name", proc.Name()),
slog.F("pid", proc.PID),
slog.F("niceness", score),
)
continue
}

err = proc.SetNiceness(a.syscaller, niceness)
if err != nil {
a.logger.Error(ctx, "unable to set proc niceness",
slog.F("name", proc.Name()),
slog.F("pid", proc.PID),
slog.F("niceness", niceness),
slog.Error(err),
)
continue
}

a.logger.Debug(ctx, "deprioritized process",
slog.F("name", proc.Name()),
slog.F("pid", proc.PID),
slog.F("niceness", niceness),
)
}
return nil
}

// isClosed returns whether the API is closed or not.
Expand Down
43 changes: 43 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"tailscale.com/tailcfg"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/agent"
"github.com/coder/coder/v2/agent/agentssh"
Expand Down Expand Up @@ -2395,6 +2396,48 @@ func TestAgent_Metrics_SSH(t *testing.T) {
require.NoError(t, err)
}

func TestAgent_ManageProcessPriority(t *testing.T) {
t.Parallel()

t.Run("DisabledByDefault", func(t *testing.T) {
t.Parallel()

if runtime.GOOS != "linux" {
t.Skip("Skipping non-linux environment")
}

var buf bytes.Buffer
log := slog.Make(sloghuman.Sink(&buf))

_, _, _, _, _ = setupAgent(t, agentsdk.Manifest{}, 0, func(c *agenttest.Client, o *agent.Options) {
o.Logger = log
})

require.Eventually(t, func() bool {
return strings.Contains(buf.String(), "process priority not enabled")
}, testutil.WaitLong, testutil.IntervalFast)
})

t.Run("DisabledForNonLinux", func(t *testing.T) {
t.Parallel()

if runtime.GOOS == "linux" {
t.Skip("Skipping linux environment")
}

var buf bytes.Buffer
log := slog.Make(sloghuman.Sink(&buf))

_, _, _, _, _ = setupAgent(t, agentsdk.Manifest{}, 0, func(c *agenttest.Client, o *agent.Options) {
o.Logger = log
o.EnvironmentVariables = map[string]string{agent.EnvProcMemNice: "1"}
})
require.Eventually(t, func() bool {
return strings.Contains(buf.String(), "process priority not enabled")
}, testutil.WaitLong, testutil.IntervalFast)
})
}

func verifyCollectedMetrics(t *testing.T, expected []agentsdk.AgentMetric, actual []*promgo.MetricFamily) bool {
t.Helper()

Expand Down
1 change: 0 additions & 1 deletion agent/agentproc/agentproctest/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,4 @@ func GenerateProcess(t *testing.T, fs afero.Fs, dir string) agentproc.Process {
Dir: fmt.Sprintf("%s/%d", dir, pid),
FS: fs,
}

}
4 changes: 2 additions & 2 deletions agent/agentproc/syscaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func (UnixSyscaller) GetPriority(pid int32) (int, error) {
return nice, nil
}

func (UnixSyscaller) Kill(pid int, sig syscall.Signal) error {
err := syscall.Kill(pid, sig)
func (UnixSyscaller) Kill(pid int32, sig syscall.Signal) error {
err := syscall.Kill(int(pid), sig)
if err != nil {
return xerrors.Errorf("kill: %w", err)
}
Expand Down