@@ -15,6 +15,8 @@ import (
15
15
"os/exec"
16
16
"os/user"
17
17
"path/filepath"
18
+ "runtime"
19
+ "runtime/debug"
18
20
"sort"
19
21
"strconv"
20
22
"strings"
@@ -34,6 +36,7 @@ import (
34
36
"tailscale.com/types/netlogtype"
35
37
36
38
"cdr.dev/slog"
39
+ "github.com/coder/coder/v2/agent/agentproc"
37
40
"github.com/coder/coder/v2/agent/agentssh"
38
41
"github.com/coder/coder/v2/agent/reconnectingpty"
39
42
"github.com/coder/coder/v2/buildinfo"
@@ -51,6 +54,10 @@ const (
51
54
ProtocolDial = "dial"
52
55
)
53
56
57
+ // EnvProcPrioMgmt determines whether we attempt to manage
58
+ // process CPU and OOM Killer priority.
59
+ const EnvProcPrioMgmt = "CODER_PROC_PRIO_MGMT"
60
+
54
61
type Options struct {
55
62
Filesystem afero.Fs
56
63
LogDir string
@@ -68,6 +75,11 @@ type Options struct {
68
75
PrometheusRegistry * prometheus.Registry
69
76
ReportMetadataInterval time.Duration
70
77
ServiceBannerRefreshInterval time.Duration
78
+ Syscaller agentproc.Syscaller
79
+ // ModifiedProcesses is used for testing process priority management.
80
+ ModifiedProcesses chan []* agentproc.Process
81
+ // ProcessManagementTick is used for testing process priority management.
82
+ ProcessManagementTick <- chan time.Time
71
83
}
72
84
73
85
type Client interface {
@@ -120,6 +132,10 @@ func New(options Options) Agent {
120
132
prometheusRegistry = prometheus .NewRegistry ()
121
133
}
122
134
135
+ if options .Syscaller == nil {
136
+ options .Syscaller = agentproc .NewSyscaller ()
137
+ }
138
+
123
139
ctx , cancelFunc := context .WithCancel (context .Background ())
124
140
a := & agent {
125
141
tailnetListenPort : options .TailnetListenPort ,
@@ -143,6 +159,9 @@ func New(options Options) Agent {
143
159
sshMaxTimeout : options .SSHMaxTimeout ,
144
160
subsystems : options .Subsystems ,
145
161
addresses : options .Addresses ,
162
+ syscaller : options .Syscaller ,
163
+ modifiedProcs : options .ModifiedProcesses ,
164
+ processManagementTick : options .ProcessManagementTick ,
146
165
147
166
prometheusRegistry : prometheusRegistry ,
148
167
metrics : newAgentMetrics (prometheusRegistry ),
@@ -197,6 +216,12 @@ type agent struct {
197
216
198
217
prometheusRegistry * prometheus.Registry
199
218
metrics * agentMetrics
219
+ syscaller agentproc.Syscaller
220
+
221
+ // modifiedProcs is used for testing process priority management.
222
+ modifiedProcs chan []* agentproc.Process
223
+ // processManagementTick is used for testing process priority management.
224
+ processManagementTick <- chan time.Time
200
225
}
201
226
202
227
func (a * agent ) TailnetConn () * tailnet.Conn {
@@ -225,6 +250,7 @@ func (a *agent) runLoop(ctx context.Context) {
225
250
go a .reportLifecycleLoop (ctx )
226
251
go a .reportMetadataLoop (ctx )
227
252
go a .fetchServiceBannerLoop (ctx )
253
+ go a .manageProcessPriorityLoop (ctx )
228
254
229
255
for retrier := retry .New (100 * time .Millisecond , 10 * time .Second ); retrier .Wait (ctx ); {
230
256
a .logger .Info (ctx , "connecting to coderd" )
@@ -1253,6 +1279,119 @@ func (a *agent) startReportingConnectionStats(ctx context.Context) {
1253
1279
}
1254
1280
}
1255
1281
1282
+ var prioritizedProcs = []string {"coder agent" }
1283
+
1284
+ func (a * agent ) manageProcessPriorityLoop (ctx context.Context ) {
1285
+ defer func () {
1286
+ if r := recover (); r != nil {
1287
+ a .logger .Critical (ctx , "recovered from panic" ,
1288
+ slog .F ("panic" , r ),
1289
+ slog .F ("stack" , string (debug .Stack ())),
1290
+ )
1291
+ }
1292
+ }()
1293
+
1294
+ if val := a .envVars [EnvProcPrioMgmt ]; val == "" || runtime .GOOS != "linux" {
1295
+ a .logger .Debug (ctx , "process priority not enabled, agent will not manage process niceness/oom_score_adj " ,
1296
+ slog .F ("env_var" , EnvProcPrioMgmt ),
1297
+ slog .F ("value" , val ),
1298
+ slog .F ("goos" , runtime .GOOS ),
1299
+ )
1300
+ return
1301
+ }
1302
+
1303
+ if a .processManagementTick == nil {
1304
+ ticker := time .NewTicker (time .Second )
1305
+ defer ticker .Stop ()
1306
+ a .processManagementTick = ticker .C
1307
+ }
1308
+
1309
+ for {
1310
+ procs , err := a .manageProcessPriority (ctx )
1311
+ if err != nil {
1312
+ a .logger .Error (ctx , "manage process priority" ,
1313
+ slog .Error (err ),
1314
+ )
1315
+ }
1316
+ if a .modifiedProcs != nil {
1317
+ a .modifiedProcs <- procs
1318
+ }
1319
+
1320
+ select {
1321
+ case <- a .processManagementTick :
1322
+ case <- ctx .Done ():
1323
+ return
1324
+ }
1325
+ }
1326
+ }
1327
+
1328
+ func (a * agent ) manageProcessPriority (ctx context.Context ) ([]* agentproc.Process , error ) {
1329
+ const (
1330
+ niceness = 10
1331
+ )
1332
+
1333
+ procs , err := agentproc .List (a .filesystem , a .syscaller )
1334
+ if err != nil {
1335
+ return nil , xerrors .Errorf ("list: %w" , err )
1336
+ }
1337
+
1338
+ var (
1339
+ modProcs = []* agentproc.Process {}
1340
+ logger slog.Logger
1341
+ )
1342
+
1343
+ for _ , proc := range procs {
1344
+ logger = a .logger .With (
1345
+ slog .F ("cmd" , proc .Cmd ()),
1346
+ slog .F ("pid" , proc .PID ),
1347
+ )
1348
+
1349
+ containsFn := func (e string ) bool {
1350
+ contains := strings .Contains (proc .Cmd (), e )
1351
+ return contains
1352
+ }
1353
+
1354
+ // If the process is prioritized we should adjust
1355
+ // it's oom_score_adj and avoid lowering its niceness.
1356
+ if slices .ContainsFunc [[]string , string ](prioritizedProcs , containsFn ) {
1357
+ continue
1358
+ }
1359
+
1360
+ score , err := proc .Niceness (a .syscaller )
1361
+ if err != nil {
1362
+ logger .Warn (ctx , "unable to get proc niceness" ,
1363
+ slog .Error (err ),
1364
+ )
1365
+ continue
1366
+ }
1367
+
1368
+ // We only want processes that don't have a nice value set
1369
+ // so we don't override user nice values.
1370
+ // Getpriority actually returns priority for the nice value
1371
+ // which is niceness + 20, so here 20 = a niceness of 0 (aka unset).
1372
+ if score != 20 {
1373
+ if score != niceness {
1374
+ logger .Debug (ctx , "skipping process due to custom niceness" ,
1375
+ slog .F ("niceness" , score ),
1376
+ )
1377
+ }
1378
+ continue
1379
+ }
1380
+
1381
+ err = proc .SetNiceness (a .syscaller , niceness )
1382
+ if err != nil {
1383
+ logger .Warn (ctx , "unable to set proc niceness" ,
1384
+ slog .F ("niceness" , niceness ),
1385
+ slog .Error (err ),
1386
+ )
1387
+ continue
1388
+ }
1389
+
1390
+ modProcs = append (modProcs , proc )
1391
+ }
1392
+ return modProcs , nil
1393
+ }
1394
+
1256
1395
// isClosed returns whether the API is closed or not.
1257
1396
func (a * agent ) isClosed () bool {
1258
1397
select {
0 commit comments