
Commit 18f3c8a

Merge branch 'main' into dm-experiment-autostart
2 parents 23b2ec9 + 56eb722


49 files changed: +559, -1409 lines

agent/agent.go: +2, -266
@@ -12,8 +12,6 @@ import (
 	"os"
 	"os/user"
 	"path/filepath"
-	"runtime"
-	"runtime/debug"
 	"sort"
 	"strconv"
 	"strings"
@@ -35,7 +33,6 @@ import (
 	"tailscale.com/util/clientmetric"

 	"cdr.dev/slog"
-	"github.com/coder/coder/v2/agent/agentproc"
 	"github.com/coder/coder/v2/agent/agentscripts"
 	"github.com/coder/coder/v2/agent/agentssh"
 	"github.com/coder/coder/v2/agent/proto"
@@ -82,12 +79,7 @@ type Options struct {
 	PrometheusRegistry           *prometheus.Registry
 	ReportMetadataInterval       time.Duration
 	ServiceBannerRefreshInterval time.Duration
-	Syscaller                    agentproc.Syscaller
-	// ModifiedProcesses is used for testing process priority management.
-	ModifiedProcesses chan []*agentproc.Process
-	// ProcessManagementTick is used for testing process priority management.
-	ProcessManagementTick <-chan time.Time
-	BlockFileTransfer     bool
+	BlockFileTransfer            bool
 }

 type Client interface {
@@ -147,10 +139,6 @@ func New(options Options) Agent {
 		prometheusRegistry = prometheus.NewRegistry()
 	}

-	if options.Syscaller == nil {
-		options.Syscaller = agentproc.NewSyscaller()
-	}
-
 	hardCtx, hardCancel := context.WithCancel(context.Background())
 	gracefulCtx, gracefulCancel := context.WithCancel(hardCtx)
 	a := &agent{
@@ -178,9 +166,6 @@ func New(options Options) Agent {
 		announcementBannersRefreshInterval: options.ServiceBannerRefreshInterval,
 		sshMaxTimeout:                      options.SSHMaxTimeout,
 		subsystems:                         options.Subsystems,
-		syscaller:                          options.Syscaller,
-		modifiedProcs:                      options.ModifiedProcesses,
-		processManagementTick:              options.ProcessManagementTick,
 		logSender:                          agentsdk.NewLogSender(options.Logger),
 		blockFileTransfer:                  options.BlockFileTransfer,

@@ -253,13 +238,7 @@ type agent struct {
 	prometheusRegistry *prometheus.Registry
 	// metrics are prometheus registered metrics that will be collected and
 	// labeled in Coder with the agent + workspace.
-	metrics   *agentMetrics
-	syscaller agentproc.Syscaller
-
-	// modifiedProcs is used for testing process priority management.
-	modifiedProcs chan []*agentproc.Process
-	// processManagementTick is used for testing process priority management.
-	processManagementTick <-chan time.Time
+	metrics *agentMetrics
 }

 func (a *agent) TailnetConn() *tailnet.Conn {
@@ -308,8 +287,6 @@ func (a *agent) init() {
 // may be happening, but regardless after the intermittent
 // failure, you'll want the agent to reconnect.
 func (a *agent) runLoop() {
-	go a.manageProcessPriorityUntilGracefulShutdown()
-
 	// need to keep retrying up to the hardCtx so that we can send graceful shutdown-related
 	// messages.
 	ctx := a.hardCtx
@@ -1443,162 +1420,6 @@ func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connect
 	return stats
 }

-var prioritizedProcs = []string{"coder agent"}
-
-func (a *agent) manageProcessPriorityUntilGracefulShutdown() {
-	// process priority can stop as soon as we are gracefully shutting down
-	ctx := a.gracefulCtx
-	defer func() {
-		if r := recover(); r != nil {
-			a.logger.Critical(ctx, "recovered from panic",
-				slog.F("panic", r),
-				slog.F("stack", string(debug.Stack())),
-			)
-		}
-	}()
-
-	if val := a.environmentVariables[EnvProcPrioMgmt]; val == "" || runtime.GOOS != "linux" {
-		a.logger.Debug(ctx, "process priority not enabled, agent will not manage process niceness/oom_score_adj ",
-			slog.F("env_var", EnvProcPrioMgmt),
-			slog.F("value", val),
-			slog.F("goos", runtime.GOOS),
-		)
-		return
-	}
-
-	if a.processManagementTick == nil {
-		ticker := time.NewTicker(time.Second)
-		defer ticker.Stop()
-		a.processManagementTick = ticker.C
-	}
-
-	oomScore := unsetOOMScore
-	if scoreStr, ok := a.environmentVariables[EnvProcOOMScore]; ok {
-		score, err := strconv.Atoi(strings.TrimSpace(scoreStr))
-		if err == nil && score >= -1000 && score <= 1000 {
-			oomScore = score
-		} else {
-			a.logger.Error(ctx, "invalid oom score",
-				slog.F("min_value", -1000),
-				slog.F("max_value", 1000),
-				slog.F("value", scoreStr),
-			)
-		}
-	}
-
-	debouncer := &logDebouncer{
-		logger:   a.logger,
-		messages: map[string]time.Time{},
-		interval: time.Minute,
-	}
-
-	for {
-		procs, err := a.manageProcessPriority(ctx, debouncer, oomScore)
-		// Avoid spamming the logs too often.
-		if err != nil {
-			debouncer.Error(ctx, "manage process priority",
-				slog.Error(err),
-			)
-		}
-		if a.modifiedProcs != nil {
-			a.modifiedProcs <- procs
-		}
-
-		select {
-		case <-a.processManagementTick:
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-// unsetOOMScore is set to an invalid OOM score to imply an unset value.
-const unsetOOMScore = 1001
-
-func (a *agent) manageProcessPriority(ctx context.Context, debouncer *logDebouncer, oomScore int) ([]*agentproc.Process, error) {
-	const (
-		niceness = 10
-	)
-
-	// We fetch the agent score each time because it's possible someone updates the
-	// value after it is started.
-	agentScore, err := a.getAgentOOMScore()
-	if err != nil {
-		agentScore = unsetOOMScore
-	}
-	if oomScore == unsetOOMScore && agentScore != unsetOOMScore {
-		// If the child score has not been explicitly specified we should
-		// set it to a score relative to the agent score.
-		oomScore = childOOMScore(agentScore)
-	}
-
-	procs, err := agentproc.List(a.filesystem, a.syscaller)
-	if err != nil {
-		return nil, xerrors.Errorf("list: %w", err)
-	}
-
-	modProcs := []*agentproc.Process{}
-
-	for _, proc := range procs {
-		containsFn := func(e string) bool {
-			contains := strings.Contains(proc.Cmd(), e)
-			return contains
-		}
-
-		// If the process is prioritized we should adjust
-		// it's oom_score_adj and avoid lowering its niceness.
-		if slices.ContainsFunc(prioritizedProcs, containsFn) {
-			continue
-		}
-
-		score, niceErr := proc.Niceness(a.syscaller)
-		if niceErr != nil && !isBenignProcessErr(niceErr) {
-			debouncer.Warn(ctx, "unable to get proc niceness",
-				slog.F("cmd", proc.Cmd()),
-				slog.F("pid", proc.PID),
-				slog.Error(niceErr),
-			)
-		}
-
-		// We only want processes that don't have a nice value set
-		// so we don't override user nice values.
-		// Getpriority actually returns priority for the nice value
-		// which is niceness + 20, so here 20 = a niceness of 0 (aka unset).
-		if score != 20 {
-			// We don't log here since it can get spammy
-			continue
-		}
-
-		if niceErr == nil {
-			err := proc.SetNiceness(a.syscaller, niceness)
-			if err != nil && !isBenignProcessErr(err) {
-				debouncer.Warn(ctx, "unable to set proc niceness",
-					slog.F("cmd", proc.Cmd()),
-					slog.F("pid", proc.PID),
-					slog.F("niceness", niceness),
-					slog.Error(err),
-				)
-			}
-		}
-
-		// If the oom score is valid and it's not already set and isn't a custom value set by another process then it's ok to update it.
-		if oomScore != unsetOOMScore && oomScore != proc.OOMScoreAdj && !isCustomOOMScore(agentScore, proc) {
-			oomScoreStr := strconv.Itoa(oomScore)
-			err := afero.WriteFile(a.filesystem, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), []byte(oomScoreStr), 0o644)
-			if err != nil && !isBenignProcessErr(err) {
-				debouncer.Warn(ctx, "unable to set oom_score_adj",
-					slog.F("cmd", proc.Cmd()),
-					slog.F("pid", proc.PID),
-					slog.F("score", oomScoreStr),
-					slog.Error(err),
-				)
-			}
-		}
-		modProcs = append(modProcs, proc)
-	}
-	return modProcs, nil
-}
-
 // isClosed returns whether the API is closed or not.
 func (a *agent) isClosed() bool {
 	return a.hardCtx.Err() != nil
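
The loop removed above ran once per second (gated by the EnvProcPrioMgmt environment variable and Linux-only), reniced workspace processes whose nice value was still unset, and adjusted their oom_score_adj. As a rough sketch of the renice half, assuming Linux and golang.org/x/sys/unix (the agent itself routed these calls through its agentproc.Syscaller abstraction rather than calling the package directly):

// Sketch only, not the agent's implementation: renice a process when its nice
// value is still the default, mirroring the score != 20 check in the removed
// manageProcessPriority. Linux-only; assumes golang.org/x/sys/unix.
package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

func reniceIfDefault(pid, niceness int) error {
	// Per the comment in the removed code, a raw getpriority value of 20
	// corresponds to a nice value of 0, i.e. nobody has reniced this process.
	prio, err := unix.Getpriority(unix.PRIO_PROCESS, pid)
	if err != nil {
		return fmt.Errorf("getpriority: %w", err)
	}
	if prio != 20 {
		return nil // respect a nice value that was set elsewhere
	}
	if err := unix.Setpriority(unix.PRIO_PROCESS, pid, niceness); err != nil {
		return fmt.Errorf("setpriority: %w", err)
	}
	return nil
}

func main() {
	// Lowering our own priority to niceness 10 needs no extra privileges.
	if err := reniceIfDefault(os.Getpid(), 10); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}

The prioritizedProcs list in the removed code ("coder agent") is what kept the agent's own process from being deprioritized by this pass.
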
@@ -1992,88 +1813,3 @@ func PrometheusMetricsHandler(prometheusRegistry *prometheus.Registry, logger sl
 		}
 	})
 }
-
-// childOOMScore returns the oom_score_adj for a child process. It is based
-// on the oom_score_adj of the agent process.
-func childOOMScore(agentScore int) int {
-	// If the agent has a negative oom_score_adj, we set the child to 0
-	// so it's treated like every other process.
-	if agentScore < 0 {
-		return 0
-	}
-
-	// If the agent is already almost at the maximum then set it to the max.
-	if agentScore >= 998 {
-		return 1000
-	}
-
-	// If the agent oom_score_adj is >=0, we set the child to slightly
-	// less than the maximum. If users want a different score they set it
-	// directly.
-	return 998
-}
-
-func (a *agent) getAgentOOMScore() (int, error) {
-	scoreStr, err := afero.ReadFile(a.filesystem, "/proc/self/oom_score_adj")
-	if err != nil {
-		return 0, xerrors.Errorf("read file: %w", err)
-	}
-
-	score, err := strconv.Atoi(strings.TrimSpace(string(scoreStr)))
-	if err != nil {
-		return 0, xerrors.Errorf("parse int: %w", err)
-	}
-
-	return score, nil
-}
-
-// isCustomOOMScore checks to see if the oom_score_adj is not a value that would
-// originate from an agent-spawned process.
-func isCustomOOMScore(agentScore int, process *agentproc.Process) bool {
-	score := process.OOMScoreAdj
-	return agentScore != score && score != 1000 && score != 0 && score != 998
-}
-
-// logDebouncer skips writing a log for a particular message if
-// it's been emitted within the given interval duration.
-// It's a shoddy implementation used in one spot that should be replaced at
-// some point.
-type logDebouncer struct {
-	logger   slog.Logger
-	messages map[string]time.Time
-	interval time.Duration
-}
-
-func (l *logDebouncer) Warn(ctx context.Context, msg string, fields ...any) {
-	l.log(ctx, slog.LevelWarn, msg, fields...)
-}
-
-func (l *logDebouncer) Error(ctx context.Context, msg string, fields ...any) {
-	l.log(ctx, slog.LevelError, msg, fields...)
-}
-
-func (l *logDebouncer) log(ctx context.Context, level slog.Level, msg string, fields ...any) {
-	// This (bad) implementation assumes you wouldn't reuse the same msg
-	// for different levels.
-	if last, ok := l.messages[msg]; ok && time.Since(last) < l.interval {
-		return
-	}
-	switch level {
-	case slog.LevelWarn:
-		l.logger.Warn(ctx, msg, fields...)
-	case slog.LevelError:
-		l.logger.Error(ctx, msg, fields...)
-	}
-	l.messages[msg] = time.Now()
-}
-
-func isBenignProcessErr(err error) bool {
-	return err != nil &&
-		(xerrors.Is(err, os.ErrNotExist) ||
-			xerrors.Is(err, os.ErrPermission) ||
-			isNoSuchProcessErr(err))
-}
-
-func isNoSuchProcessErr(err error) bool {
-	return err != nil && strings.Contains(err.Error(), "no such process")
-}
