-
Notifications
You must be signed in to change notification settings - Fork 894
feat: implement agent process management #9461
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
4ed4069
7e59db6
8c65216
760cbb3
f4b864e
cbcb854
8230247
3e1defd
2fe9c70
ef41e9a
05baba0
478d57c
0ced5ce
8aaa6d5
cea4851
5020eb4
11ab047
46ef05a
d132480
04ee5cb
ffbeab9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ import ( | |
"os/exec" | ||
"os/user" | ||
"path/filepath" | ||
"runtime" | ||
"sort" | ||
"strconv" | ||
"strings" | ||
|
@@ -34,6 +35,7 @@ import ( | |
"tailscale.com/types/netlogtype" | ||
|
||
"cdr.dev/slog" | ||
"github.com/coder/coder/v2/agent/agentproc" | ||
"github.com/coder/coder/v2/agent/agentssh" | ||
"github.com/coder/coder/v2/agent/reconnectingpty" | ||
"github.com/coder/coder/v2/buildinfo" | ||
|
@@ -51,6 +53,10 @@ const ( | |
ProtocolDial = "dial" | ||
) | ||
|
||
// EnvProcMemNice is the name of the environment variable that determines | ||
// whether we attempt to manage process CPU and OOM Killer priority. | ||
// Management is skipped when the variable's value is empty or when the | ||
// agent is not running on Linux. | ||
const EnvProcMemNice = "CODER_PROC_MEMNICE_ENABLE" | ||
|
||
type Options struct { | ||
Filesystem afero.Fs | ||
LogDir string | ||
|
@@ -68,6 +74,11 @@ type Options struct { | |
PrometheusRegistry *prometheus.Registry | ||
ReportMetadataInterval time.Duration | ||
ServiceBannerRefreshInterval time.Duration | ||
Syscaller agentproc.Syscaller | ||
// ModifiedProcesses is used for testing process priority management. | ||
ModifiedProcesses chan []*agentproc.Process | ||
// ProcessManagementTick is used for testing process priority management. | ||
ProcessManagementTick <-chan time.Time | ||
} | ||
|
||
type Client interface { | ||
|
@@ -120,6 +131,10 @@ func New(options Options) Agent { | |
prometheusRegistry = prometheus.NewRegistry() | ||
} | ||
|
||
if options.Syscaller == nil { | ||
options.Syscaller = agentproc.NewSyscaller() | ||
} | ||
|
||
ctx, cancelFunc := context.WithCancel(context.Background()) | ||
a := &agent{ | ||
tailnetListenPort: options.TailnetListenPort, | ||
|
@@ -143,6 +158,9 @@ func New(options Options) Agent { | |
sshMaxTimeout: options.SSHMaxTimeout, | ||
subsystems: options.Subsystems, | ||
addresses: options.Addresses, | ||
syscaller: options.Syscaller, | ||
modifiedProcs: options.ModifiedProcesses, | ||
processManagementTick: options.ProcessManagementTick, | ||
|
||
prometheusRegistry: prometheusRegistry, | ||
metrics: newAgentMetrics(prometheusRegistry), | ||
|
@@ -197,6 +215,12 @@ type agent struct { | |
|
||
prometheusRegistry *prometheus.Registry | ||
metrics *agentMetrics | ||
syscaller agentproc.Syscaller | ||
|
||
// modifiedProcs is used for testing process priority management. | ||
sreya marked this conversation as resolved.
Show resolved
Hide resolved
|
||
modifiedProcs chan []*agentproc.Process | ||
// processManagementTick is used for testing process priority management. | ||
processManagementTick <-chan time.Time | ||
} | ||
|
||
func (a *agent) TailnetConn() *tailnet.Conn { | ||
|
@@ -225,6 +249,7 @@ func (a *agent) runLoop(ctx context.Context) { | |
go a.reportLifecycleLoop(ctx) | ||
go a.reportMetadataLoop(ctx) | ||
go a.fetchServiceBannerLoop(ctx) | ||
go a.manageProcessPriorityLoop(ctx) | ||
|
||
for retrier := retry.New(100*time.Millisecond, 10*time.Second); retrier.Wait(ctx); { | ||
a.logger.Info(ctx, "connecting to coderd") | ||
|
@@ -1253,6 +1278,133 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) { | |
} | ||
} | ||
|
||
// prioritizedProcs lists the base names (directory path stripped) of | ||
// processes that manageProcessPriority treats as prioritized: they get | ||
// their oom_score_adj adjusted and their niceness is left untouched. | ||
var prioritizedProcs = []string{"coder"} | ||
|
||
func (a *agent) manageProcessPriorityLoop(ctx context.Context) { | ||
if val := a.envVars[EnvProcMemNice]; val == "" || runtime.GOOS != "linux" { | ||
a.logger.Info(ctx, "process priority not enabled, agent will not manage process niceness/oom_score_adj ", | ||
sreya marked this conversation as resolved.
Show resolved
Hide resolved
|
||
slog.F("env_var", EnvProcMemNice), | ||
slog.F("value", val), | ||
slog.F("goos", runtime.GOOS), | ||
) | ||
return | ||
} | ||
|
||
manage := func() { | ||
procs, err := a.manageProcessPriority(ctx) | ||
if err != nil { | ||
a.logger.Error(ctx, "manage process priority", | ||
slog.F("dir", agentproc.DefaultProcDir), | ||
slog.Error(err), | ||
) | ||
} | ||
if a.modifiedProcs != nil { | ||
a.modifiedProcs <- procs | ||
} | ||
} | ||
|
||
// Do once before falling into loop. | ||
manage() | ||
sreya marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if a.processManagementTick == nil { | ||
ticker := time.NewTicker(time.Second) | ||
defer ticker.Stop() | ||
a.processManagementTick = ticker.C | ||
} | ||
|
||
for { | ||
select { | ||
case <-a.processManagementTick: | ||
manage() | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (a *agent) manageProcessPriority(ctx context.Context) ([]*agentproc.Process, error) { | ||
const ( | ||
procDir = agentproc.DefaultProcDir | ||
niceness = 10 | ||
oomScoreAdj = -500 | ||
) | ||
|
||
procs, err := agentproc.List(a.filesystem, a.syscaller, agentproc.DefaultProcDir) | ||
if err != nil { | ||
return nil, xerrors.Errorf("list: %w", err) | ||
} | ||
|
||
modProcs := []*agentproc.Process{} | ||
for _, proc := range procs { | ||
// Trim off the path e.g. "./coder" -> "coder" | ||
name := filepath.Base(proc.Name()) | ||
// If the process is prioritized we should adjust | ||
// it's oom_score_adj and avoid lowering its niceness. | ||
if slices.Contains(prioritizedProcs, name) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We want to specifically prioritize the agent and not other coder processes right? If I'm reading this code correctly it would treat There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good catch, I don't see that as being a big deal but we can be more discriminate about which processes we want to prioritize by also parsing command arguments. WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not just check if its the current process? |
||
err = proc.SetOOMAdj(oomScoreAdj) | ||
if err != nil { | ||
a.logger.Error(ctx, "unable to set proc oom_score_adj", | ||
slog.F("name", proc.Name()), | ||
slog.F("pid", proc.PID), | ||
slog.F("oom_score_adj", oomScoreAdj), | ||
slog.Error(err), | ||
) | ||
continue | ||
} | ||
modProcs = append(modProcs, proc) | ||
|
||
a.logger.Debug(ctx, "decreased process oom_score", | ||
slog.F("name", proc.Name()), | ||
slog.F("pid", proc.PID), | ||
slog.F("oom_score_adj", oomScoreAdj), | ||
) | ||
continue | ||
} | ||
|
||
score, err := proc.Niceness(a.syscaller) | ||
if err != nil { | ||
a.logger.Error(ctx, "unable to get proc niceness", | ||
slog.F("name", proc.Name()), | ||
slog.F("pid", proc.PID), | ||
slog.Error(err), | ||
) | ||
continue | ||
} | ||
|
||
// We only want processes that don't have a nice value set | ||
// so we don't override user nice values. | ||
// Getpriority actually returns priority for the nice value | ||
// which is niceness + 20, so here 20 = a niceness of 0 (aka unset). | ||
if score != 20 { | ||
a.logger.Error(ctx, "skipping process due to custom niceness", | ||
slog.F("name", proc.Name()), | ||
slog.F("pid", proc.PID), | ||
slog.F("niceness", score), | ||
) | ||
continue | ||
} | ||
|
||
err = proc.SetNiceness(a.syscaller, niceness) | ||
if err != nil { | ||
a.logger.Error(ctx, "unable to set proc niceness", | ||
slog.F("name", proc.Name()), | ||
slog.F("pid", proc.PID), | ||
slog.F("niceness", niceness), | ||
slog.Error(err), | ||
) | ||
continue | ||
} | ||
|
||
a.logger.Debug(ctx, "deprioritized process", | ||
slog.F("name", proc.Name()), | ||
slog.F("pid", proc.PID), | ||
sreya marked this conversation as resolved.
Show resolved
Hide resolved
|
||
slog.F("niceness", niceness), | ||
) | ||
modProcs = append(modProcs, proc) | ||
} | ||
return modProcs, nil | ||
} | ||
|
||
// isClosed returns whether the API is closed or not. | ||
func (a *agent) isClosed() bool { | ||
select { | ||
|
Uh oh!
There was an error while loading. Please reload this page.