Skip to content

feat: support adjusting child proc oom scores #12655

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
debounce logs
  • Loading branch information
sreya committed Apr 2, 2024
commit 17714eda6fd9af4c57d8669d2bf2e2627af52cb4
81 changes: 66 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,15 +1581,28 @@ func (a *agent) manageProcessPriorityUntilGracefulShutdown() {
oomScore := unsetOOMScore
if scoreStr, ok := a.environmentVariables[EnvProcOOMScore]; ok {
score, err := strconv.Atoi(strings.TrimSpace(scoreStr))
if err == nil {
if err == nil && score >= -1000 && score <= 1000 {
oomScore = score
} else {
a.logger.Error(ctx, "invalid oom score",
slog.F("min_value", -1000),
slog.F("max_value", 1000),
slog.F("value", scoreStr),
)
}
}

debouncer := &logDebouncer{
logger: a.logger,
messages: map[string]time.Time{},
interval: time.Minute,
}

for {
procs, err := a.manageProcessPriority(ctx, oomScore)
procs, err := a.manageProcessPriority(ctx, debouncer, oomScore)
// Avoid spamming the logs too often.
if err != nil {
a.logger.Error(ctx, "manage process priority",
debouncer.Error(ctx, "manage process priority",
slog.Error(err),
)
}
Expand All @@ -1605,13 +1618,16 @@ func (a *agent) manageProcessPriorityUntilGracefulShutdown() {
}
}

// unsetOOMScore is set to an invalid OOM score to imply an unset value.
// Scores parsed from the environment are only accepted within
// [-1000, 1000], so 1001 cannot collide with a configured score.
const unsetOOMScore = 1001

func (a *agent) manageProcessPriority(ctx context.Context, oomScore int) ([]*agentproc.Process, error) {
func (a *agent) manageProcessPriority(ctx context.Context, debouncer *logDebouncer, oomScore int) ([]*agentproc.Process, error) {
const (
niceness = 10
)

// We fetch the agent score each time because it's possible someone updates the
// value after it is started.
agentScore, err := a.getAgentOOMScore()
if err != nil {
agentScore = unsetOOMScore
Expand All @@ -1629,14 +1645,9 @@ func (a *agent) manageProcessPriority(ctx context.Context, oomScore int) ([]*age

var (
modProcs = []*agentproc.Process{}
logger slog.Logger
)

for _, proc := range procs {
logger = a.logger.With(
slog.F("cmd", proc.Cmd()),
slog.F("pid", proc.PID),
)

containsFn := func(e string) bool {
contains := strings.Contains(proc.Cmd(), e)
Expand All @@ -1651,7 +1662,9 @@ func (a *agent) manageProcessPriority(ctx context.Context, oomScore int) ([]*age

score, niceErr := proc.Niceness(a.syscaller)
if niceErr != nil && !xerrors.Is(niceErr, os.ErrPermission) {
logger.Warn(ctx, "unable to get proc niceness",
debouncer.Warn(ctx, "unable to get proc niceness",
slog.F("cmd", proc.Cmd()),
slog.F("pid", proc.PID),
slog.Error(niceErr),
)
continue
Expand All @@ -1662,27 +1675,31 @@ func (a *agent) manageProcessPriority(ctx context.Context, oomScore int) ([]*age
// Getpriority actually returns priority for the nice value
// which is niceness + 20, so here 20 = a niceness of 0 (aka unset).
if score != 20 {
// We don't log here since it can get spammy
continue
}

if niceErr == nil {
err := proc.SetNiceness(a.syscaller, niceness)
if err != nil && !xerrors.Is(err, os.ErrPermission) {
logger.Warn(ctx, "unable to set proc niceness",
debouncer.Warn(ctx, "unable to set proc niceness",
slog.F("cmd", proc.Cmd()),
slog.F("pid", proc.PID),
slog.F("niceness", niceness),
slog.Error(err),
)
}
}

// If the oom score is valid and it's not already set and isn't a custom value set by another process
// then it's ok to update it.
// If the oom score is valid and it's not already set and isn't a custom value set by another process then it's ok to update it.
if oomScore != unsetOOMScore && oomScore != proc.OOMScoreAdj && !isCustomOOMScore(agentScore, proc) {
oomScoreStr := strconv.Itoa(oomScore)
err := afero.WriteFile(a.filesystem, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), []byte(oomScoreStr), 0o644)
if err != nil && !xerrors.Is(err, os.ErrPermission) {
logger.Warn(ctx, "unable to set oom_score_adj",
slog.F("score", "0"),
debouncer.Warn(ctx, "unable to set oom_score_adj",
slog.F("cmd", proc.Cmd()),
slog.F("pid", proc.PID),
slog.F("score", oomScoreStr),
slog.Error(err),
)
}
Expand Down Expand Up @@ -2081,3 +2098,37 @@ func isCustomOOMScore(agentScore int, process *agentproc.Process) bool {
score := process.OOMScoreAdj
return agentScore != score && score != 1000 && score != 0 && score != 998
}

// logDebouncer prevents generating a log for a particular message if
// it has already been emitted within the given interval duration.
// It's a shoddy implementation used in one spot that should be replaced at
// some point.
type logDebouncer struct {
	// logger is the underlying logger that non-suppressed entries are
	// written to.
	logger slog.Logger
	// messages records, per message string, the last time that message was
	// emitted. The key is the message alone; level and fields are not part
	// of it.
	messages map[string]time.Time
	// interval is the minimum time that must elapse before the same message
	// is emitted again.
	interval time.Duration
}

// Warn logs msg with the given fields at warn level, subject to debouncing:
// it forwards to l.log with slog.LevelWarn, which drops the entry if the same
// msg was emitted within the configured interval.
func (l *logDebouncer) Warn(ctx context.Context, msg string, fields ...any) {
	l.log(ctx, slog.LevelWarn, msg, fields...)
}

// Error logs msg with the given fields at error level, subject to debouncing:
// it forwards to l.log with slog.LevelError, which drops the entry if the same
// msg was emitted within the configured interval.
func (l *logDebouncer) Error(ctx context.Context, msg string, fields ...any) {
	l.log(ctx, slog.LevelError, msg, fields...)
}

// log emits msg at the given level through the wrapped logger unless the same
// msg was already emitted less than l.interval ago, in which case the call is
// dropped.
//
// Only slog.LevelWarn and slog.LevelError are supported. Any other level is
// ignored and does not start a debounce window for msg.
func (l *logDebouncer) log(ctx context.Context, level slog.Level, msg string, fields ...any) {
	// This (bad) implementation assumes you wouldn't reuse the same msg
	// for different levels: the debounce key is the message alone.
	if last, ok := l.messages[msg]; ok && time.Since(last) < l.interval {
		return
	}

	switch level {
	case slog.LevelWarn:
		l.logger.Warn(ctx, msg, fields...)
	case slog.LevelError:
		l.logger.Error(ctx, msg, fields...)
	default:
		// Nothing was written, so don't record a timestamp that would
		// suppress a later Warn/Error carrying the same msg.
		return
	}

	// Writing to a nil map panics; allocate lazily so a logDebouncer built
	// without an explicit messages map still works.
	if l.messages == nil {
		l.messages = make(map[string]time.Time)
	}
	l.messages[msg] = time.Now()
}
59 changes: 59 additions & 0 deletions agent/stats_internal_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package agent

import (
"bytes"
"context"
"encoding/json"
"io"
"net/netip"
"sync"
"testing"
Expand All @@ -14,6 +17,7 @@ import (
"tailscale.com/types/netlogtype"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogjson"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/testutil"
Expand Down Expand Up @@ -210,3 +214,58 @@ func newFakeStatsDest() *fakeStatsDest {
resps: make(chan *proto.UpdateStatsResponse),
}
}

func Test_logDebouncer(t *testing.T) {
t.Parallel()

var (
buf bytes.Buffer
logger = slog.Make(slogjson.Sink(&buf))
ctx = context.Background()
)

debouncer := &logDebouncer{
logger: logger,
messages: map[string]time.Time{},
interval: time.Minute,
}

fields := map[string]interface{}{
"field_1": float64(1),
"field_2": "2",
}

debouncer.Error(ctx, "my message", "field_1", 1, "field_2", "2")
debouncer.Warn(ctx, "another message", "field_1", 1, "field_2", "2")
// Shouldn't log this.
debouncer.Warn(ctx, "another message", "field_1", 1, "field_2", "2")

require.Len(t, debouncer.messages, 2)

type entry struct {
Msg string `json:"msg"`
Level string `json:"level"`
Fields map[string]interface{} `json:"fields"`
}

assertLog := func(msg string, level string, fields map[string]interface{}) {
line, err := buf.ReadString('\n')
require.NoError(t, err)

var e entry
err = json.Unmarshal([]byte(line), &e)
require.NoError(t, err)
require.Equal(t, msg, e.Msg)
require.Equal(t, level, e.Level)
require.Equal(t, fields, e.Fields)
}
assertLog("my message", "ERROR", fields)
assertLog("another message", "WARN", fields)

debouncer.messages["another message"] = time.Now().Add(-2 * time.Minute)
debouncer.Warn(ctx, "another message", "field_1", 1, "field_2", "2")
assertLog("another message", "WARN", fields)
// Assert nothing else was written.
_, err := buf.ReadString('\n')
require.ErrorIs(t, err, io.EOF)
}