Skip to content

Commit 7311ffb

Browse files
authored
feat: implement agent process management (#9461)
- An opt-in feature has been added to the agent to allow deprioritizing non coder-related processes for CPU by setting their niceness level to 10. - Opting in to the feature requires setting CODER_PROC_PRIO_MGMT to a non-empty value.
1 parent 79d4179 commit 7311ffb

13 files changed

+856
-1
lines changed

agent/agent.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
"os/exec"
1616
"os/user"
1717
"path/filepath"
18+
"runtime"
19+
"runtime/debug"
1820
"sort"
1921
"strconv"
2022
"strings"
@@ -34,6 +36,7 @@ import (
3436
"tailscale.com/types/netlogtype"
3537

3638
"cdr.dev/slog"
39+
"github.com/coder/coder/v2/agent/agentproc"
3740
"github.com/coder/coder/v2/agent/agentssh"
3841
"github.com/coder/coder/v2/agent/reconnectingpty"
3942
"github.com/coder/coder/v2/buildinfo"
@@ -51,6 +54,10 @@ const (
5154
ProtocolDial = "dial"
5255
)
5356

57+
// EnvProcPrioMgmt determines whether we attempt to manage
58+
// process CPU and OOM Killer priority.
59+
const EnvProcPrioMgmt = "CODER_PROC_PRIO_MGMT"
60+
5461
type Options struct {
5562
Filesystem afero.Fs
5663
LogDir string
@@ -68,6 +75,11 @@ type Options struct {
6875
PrometheusRegistry *prometheus.Registry
6976
ReportMetadataInterval time.Duration
7077
ServiceBannerRefreshInterval time.Duration
78+
Syscaller agentproc.Syscaller
79+
// ModifiedProcesses is used for testing process priority management.
80+
ModifiedProcesses chan []*agentproc.Process
81+
// ProcessManagementTick is used for testing process priority management.
82+
ProcessManagementTick <-chan time.Time
7183
}
7284

7385
type Client interface {
@@ -120,6 +132,10 @@ func New(options Options) Agent {
120132
prometheusRegistry = prometheus.NewRegistry()
121133
}
122134

135+
if options.Syscaller == nil {
136+
options.Syscaller = agentproc.NewSyscaller()
137+
}
138+
123139
ctx, cancelFunc := context.WithCancel(context.Background())
124140
a := &agent{
125141
tailnetListenPort: options.TailnetListenPort,
@@ -143,6 +159,9 @@ func New(options Options) Agent {
143159
sshMaxTimeout: options.SSHMaxTimeout,
144160
subsystems: options.Subsystems,
145161
addresses: options.Addresses,
162+
syscaller: options.Syscaller,
163+
modifiedProcs: options.ModifiedProcesses,
164+
processManagementTick: options.ProcessManagementTick,
146165

147166
prometheusRegistry: prometheusRegistry,
148167
metrics: newAgentMetrics(prometheusRegistry),
@@ -197,6 +216,12 @@ type agent struct {
197216

198217
prometheusRegistry *prometheus.Registry
199218
metrics *agentMetrics
219+
syscaller agentproc.Syscaller
220+
221+
// modifiedProcs is used for testing process priority management.
222+
modifiedProcs chan []*agentproc.Process
223+
// processManagementTick is used for testing process priority management.
224+
processManagementTick <-chan time.Time
200225
}
201226

202227
func (a *agent) TailnetConn() *tailnet.Conn {
@@ -225,6 +250,7 @@ func (a *agent) runLoop(ctx context.Context) {
225250
go a.reportLifecycleLoop(ctx)
226251
go a.reportMetadataLoop(ctx)
227252
go a.fetchServiceBannerLoop(ctx)
253+
go a.manageProcessPriorityLoop(ctx)
228254

229255
for retrier := retry.New(100*time.Millisecond, 10*time.Second); retrier.Wait(ctx); {
230256
a.logger.Info(ctx, "connecting to coderd")
@@ -1253,6 +1279,119 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
12531279
}
12541280
}
12551281

1282+
var prioritizedProcs = []string{"coder agent"}
1283+
1284+
func (a *agent) manageProcessPriorityLoop(ctx context.Context) {
1285+
defer func() {
1286+
if r := recover(); r != nil {
1287+
a.logger.Critical(ctx, "recovered from panic",
1288+
slog.F("panic", r),
1289+
slog.F("stack", string(debug.Stack())),
1290+
)
1291+
}
1292+
}()
1293+
1294+
if val := a.envVars[EnvProcPrioMgmt]; val == "" || runtime.GOOS != "linux" {
1295+
a.logger.Debug(ctx, "process priority not enabled, agent will not manage process niceness/oom_score_adj ",
1296+
slog.F("env_var", EnvProcPrioMgmt),
1297+
slog.F("value", val),
1298+
slog.F("goos", runtime.GOOS),
1299+
)
1300+
return
1301+
}
1302+
1303+
if a.processManagementTick == nil {
1304+
ticker := time.NewTicker(time.Second)
1305+
defer ticker.Stop()
1306+
a.processManagementTick = ticker.C
1307+
}
1308+
1309+
for {
1310+
procs, err := a.manageProcessPriority(ctx)
1311+
if err != nil {
1312+
a.logger.Error(ctx, "manage process priority",
1313+
slog.Error(err),
1314+
)
1315+
}
1316+
if a.modifiedProcs != nil {
1317+
a.modifiedProcs <- procs
1318+
}
1319+
1320+
select {
1321+
case <-a.processManagementTick:
1322+
case <-ctx.Done():
1323+
return
1324+
}
1325+
}
1326+
}
1327+
1328+
func (a *agent) manageProcessPriority(ctx context.Context) ([]*agentproc.Process, error) {
1329+
const (
1330+
niceness = 10
1331+
)
1332+
1333+
procs, err := agentproc.List(a.filesystem, a.syscaller)
1334+
if err != nil {
1335+
return nil, xerrors.Errorf("list: %w", err)
1336+
}
1337+
1338+
var (
1339+
modProcs = []*agentproc.Process{}
1340+
logger slog.Logger
1341+
)
1342+
1343+
for _, proc := range procs {
1344+
logger = a.logger.With(
1345+
slog.F("cmd", proc.Cmd()),
1346+
slog.F("pid", proc.PID),
1347+
)
1348+
1349+
containsFn := func(e string) bool {
1350+
contains := strings.Contains(proc.Cmd(), e)
1351+
return contains
1352+
}
1353+
1354+
// If the process is prioritized we should adjust
1355+
// it's oom_score_adj and avoid lowering its niceness.
1356+
if slices.ContainsFunc[[]string, string](prioritizedProcs, containsFn) {
1357+
continue
1358+
}
1359+
1360+
score, err := proc.Niceness(a.syscaller)
1361+
if err != nil {
1362+
logger.Warn(ctx, "unable to get proc niceness",
1363+
slog.Error(err),
1364+
)
1365+
continue
1366+
}
1367+
1368+
// We only want processes that don't have a nice value set
1369+
// so we don't override user nice values.
1370+
// Getpriority actually returns priority for the nice value
1371+
// which is niceness + 20, so here 20 = a niceness of 0 (aka unset).
1372+
if score != 20 {
1373+
if score != niceness {
1374+
logger.Debug(ctx, "skipping process due to custom niceness",
1375+
slog.F("niceness", score),
1376+
)
1377+
}
1378+
continue
1379+
}
1380+
1381+
err = proc.SetNiceness(a.syscaller, niceness)
1382+
if err != nil {
1383+
logger.Warn(ctx, "unable to set proc niceness",
1384+
slog.F("niceness", niceness),
1385+
slog.Error(err),
1386+
)
1387+
continue
1388+
}
1389+
1390+
modProcs = append(modProcs, proc)
1391+
}
1392+
return modProcs, nil
1393+
}
1394+
12561395
// isClosed returns whether the API is closed or not.
12571396
func (a *agent) isClosed() bool {
12581397
select {

0 commit comments

Comments
 (0)