Commit d6b9806

chore: implement oom/ood processing component (#16436)
Implements the processing logic as set out in the OOM/OOD RFC.
1 parent: b5329ae

26 files changed (+1823 -113 lines)

coderd/agentapi/api.go
+25 -3

@@ -17,10 +17,12 @@ import (
 
 	"cdr.dev/slog"
 	agentproto "github.com/coder/coder/v2/agent/proto"
+	"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
 	"github.com/coder/coder/v2/coderd/appearance"
 	"github.com/coder/coder/v2/coderd/database"
 	"github.com/coder/coder/v2/coderd/database/pubsub"
 	"github.com/coder/coder/v2/coderd/externalauth"
+	"github.com/coder/coder/v2/coderd/notifications"
 	"github.com/coder/coder/v2/coderd/prometheusmetrics"
 	"github.com/coder/coder/v2/coderd/tracing"
 	"github.com/coder/coder/v2/coderd/workspacestats"
@@ -29,6 +31,7 @@ import (
 	"github.com/coder/coder/v2/codersdk/agentsdk"
 	"github.com/coder/coder/v2/tailnet"
 	tailnetproto "github.com/coder/coder/v2/tailnet/proto"
+	"github.com/coder/quartz"
 )
 
 // API implements the DRPC agent API interface from agent/proto. This struct is
@@ -59,7 +62,9 @@ type Options struct {
 
 	Ctx                   context.Context
 	Log                   slog.Logger
+	Clock                 quartz.Clock
 	Database              database.Store
+	NotificationsEnqueuer notifications.Enqueuer
 	Pubsub                pubsub.Pubsub
 	DerpMapFn             func() *tailcfg.DERPMap
 	TailnetCoordinator    *atomic.Pointer[tailnet.Coordinator]
@@ -82,6 +87,10 @@ type Options struct {
 }
 
 func New(opts Options) *API {
+	if opts.Clock == nil {
+		opts.Clock = quartz.NewReal()
+	}
+
 	api := &API{
 		opts: opts,
 		mu:   sync.Mutex{},
@@ -104,9 +113,22 @@ func New(opts Options) *API {
 	}
 
 	api.ResourcesMonitoringAPI = &ResourcesMonitoringAPI{
-		Log:      opts.Log,
-		AgentID:  opts.AgentID,
-		Database: opts.Database,
+		AgentID:               opts.AgentID,
+		WorkspaceID:           opts.WorkspaceID,
+		Clock:                 opts.Clock,
+		Database:              opts.Database,
+		NotificationsEnqueuer: opts.NotificationsEnqueuer,
+		Debounce:              5 * time.Minute,
+
+		Config: resourcesmonitor.Config{
+			NumDatapoints:      20,
+			CollectionInterval: 10 * time.Second,
+
+			Alert: resourcesmonitor.AlertConfig{
+				MinimumNOKsPercent:     20,
+				ConsecutiveNOKsPercent: 50,
+			},
+		},
 	}
 
 	api.StatsAPI = &StatsAPI{
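
The wiring above is the only place these knobs are set: 20 datapoints collected at a 10-second interval, a 5-minute notification debounce, and the 20%/50% alert thresholds. The resourcesmonitor package that defines Config and AlertConfig is added elsewhere in this commit and is not reproduced in this excerpt, so the sketch below only mirrors the field names used at this call site; the types and the comments describing their meaning are inferences, not the package's own documentation.

// Sketch of the configuration consumed by New() above. Field names
// match the call site; types and semantics are inferred, not copied
// from the resourcesmonitor package added by this commit.
package resourcesmonitor

import "time"

type Config struct {
	// NumDatapoints is how many usage samples the agent is asked to
	// buffer per push (passed through to the agent verbatim in
	// GetResourcesMonitoringConfiguration).
	NumDatapoints int32

	// CollectionInterval is how often the agent samples usage, so a
	// single push covers roughly NumDatapoints * CollectionInterval
	// of history (20 * 10s above).
	CollectionInterval time.Duration

	Alert AlertConfig
}

type AlertConfig struct {
	// MinimumNOKsPercent and ConsecutiveNOKsPercent are the
	// thresholds consumed by NextState: how much of the pushed
	// window must be "not OK" overall, and how much in a row, for
	// the monitor to be considered in an alert state.
	MinimumNOKsPercent     int
	ConsecutiveNOKsPercent int
}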

coderd/agentapi/resources_monitoring.go
+198 -9

@@ -4,20 +4,35 @@ import (
 	"context"
 	"database/sql"
 	"errors"
+	"fmt"
+	"time"
 
 	"golang.org/x/xerrors"
 
+	"cdr.dev/slog"
+
 	"github.com/google/uuid"
 
-	"cdr.dev/slog"
 	"github.com/coder/coder/v2/agent/proto"
+	"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
 	"github.com/coder/coder/v2/coderd/database"
+	"github.com/coder/coder/v2/coderd/database/dbauthz"
+	"github.com/coder/coder/v2/coderd/database/dbtime"
+	"github.com/coder/coder/v2/coderd/notifications"
+	"github.com/coder/quartz"
 )
 
 type ResourcesMonitoringAPI struct {
-	AgentID  uuid.UUID
-	Database database.Store
-	Log      slog.Logger
+	AgentID     uuid.UUID
+	WorkspaceID uuid.UUID
+
+	Log                   slog.Logger
+	Clock                 quartz.Clock
+	Database              database.Store
+	NotificationsEnqueuer notifications.Enqueuer
+
+	Debounce time.Duration
+	Config   resourcesmonitor.Config
 }
 
 func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
@@ -33,8 +48,8 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
 
 	return &proto.GetResourcesMonitoringConfigurationResponse{
 		Config: &proto.GetResourcesMonitoringConfigurationResponse_Config{
-			CollectionIntervalSeconds: 10,
-			NumDatapoints:             20,
+			CollectionIntervalSeconds: int32(a.Config.CollectionInterval.Seconds()),
+			NumDatapoints:             a.Config.NumDatapoints,
 		},
 		Memory: func() *proto.GetResourcesMonitoringConfigurationResponse_Memory {
 			if memoryErr != nil {
@@ -60,8 +75,182 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
 }
 
 func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
-	a.Log.Info(ctx, "resources monitoring usage received",
-		slog.F("request", req))
+	var err error
+
+	if memoryErr := a.monitorMemory(ctx, req.Datapoints); memoryErr != nil {
+		err = errors.Join(err, xerrors.Errorf("monitor memory: %w", memoryErr))
+	}
+
+	if volumeErr := a.monitorVolumes(ctx, req.Datapoints); volumeErr != nil {
+		err = errors.Join(err, xerrors.Errorf("monitor volume: %w", volumeErr))
+	}
+
+	return &proto.PushResourcesMonitoringUsageResponse{}, err
+}
+
+func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
+	monitor, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
+	if err != nil {
+		// It is valid for an agent to not have a memory monitor, so we
+		// do not want to treat it as an error.
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil
+		}
+
+		return xerrors.Errorf("fetch memory resource monitor: %w", err)
+	}
+
+	if !monitor.Enabled {
+		return nil
+	}
+
+	usageDatapoints := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint_MemoryUsage, 0, len(datapoints))
+	for _, datapoint := range datapoints {
+		usageDatapoints = append(usageDatapoints, datapoint.Memory)
+	}
+
+	usageStates := resourcesmonitor.CalculateMemoryUsageStates(monitor, usageDatapoints)
+
+	oldState := monitor.State
+	newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)
+
+	debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
+
+	//nolint:gocritic // We need to be able to update the resource monitor here.
+	err = a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
+		AgentID:        a.AgentID,
+		State:          newState,
+		UpdatedAt:      dbtime.Time(a.Clock.Now()),
+		DebouncedUntil: dbtime.Time(debouncedUntil),
+	})
+	if err != nil {
+		return xerrors.Errorf("update workspace monitor: %w", err)
+	}
+
+	if !shouldNotify {
+		return nil
+	}
+
+	workspace, err := a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
+	if err != nil {
+		return xerrors.Errorf("get workspace by id: %w", err)
+	}
+
+	_, err = a.NotificationsEnqueuer.EnqueueWithData(
+		// nolint:gocritic // We need to be able to send the notification.
+		dbauthz.AsNotifier(ctx),
+		workspace.OwnerID,
+		notifications.TemplateWorkspaceOutOfMemory,
+		map[string]string{
+			"workspace": workspace.Name,
+			"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
+		},
+		map[string]any{
+			// NOTE(DanielleMaywood):
+			// When notifications are enqueued, they are checked to be
+			// unique within a single day. This means that if we attempt
+			// to send two OOM notifications for the same workspace on
+			// the same day, the enqueuer will prevent us from sending
+			// a second one. We inject a timestamp to make the
+			// notifications appear different enough to circumvent this
+			// deduplication logic.
+			"timestamp": a.Clock.Now(),
+		},
+		"workspace-monitor-memory",
+	)
+	if err != nil {
+		return xerrors.Errorf("notify workspace OOM: %w", err)
+	}
+
+	return nil
+}
+
+func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
+	volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
+	if err != nil {
+		return xerrors.Errorf("get or insert volume monitor: %w", err)
+	}
+
+	outOfDiskVolumes := make([]map[string]any, 0)
+
+	for _, monitor := range volumeMonitors {
+		if !monitor.Enabled {
+			continue
+		}
+
+		usageDatapoints := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage, 0, len(datapoints))
+		for _, datapoint := range datapoints {
+			var usage *proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage
+
+			for _, volume := range datapoint.Volumes {
+				if volume.Volume == monitor.Path {
+					usage = volume
+					break
+				}
+			}
+
+			usageDatapoints = append(usageDatapoints, usage)
+		}
+
+		usageStates := resourcesmonitor.CalculateVolumeUsageStates(monitor, usageDatapoints)
+
+		oldState := monitor.State
+		newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)
+
+		debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
+
+		if shouldNotify {
+			outOfDiskVolumes = append(outOfDiskVolumes, map[string]any{
+				"path":      monitor.Path,
+				"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
+			})
+		}
+
+		//nolint:gocritic // We need to be able to update the resource monitor here.
+		if err := a.Database.UpdateVolumeResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateVolumeResourceMonitorParams{
+			AgentID:        a.AgentID,
+			Path:           monitor.Path,
+			State:          newState,
+			UpdatedAt:      dbtime.Time(a.Clock.Now()),
+			DebouncedUntil: dbtime.Time(debouncedUntil),
+		}); err != nil {
+			return xerrors.Errorf("update workspace monitor: %w", err)
+		}
+	}
+
+	if len(outOfDiskVolumes) == 0 {
+		return nil
+	}
+
+	workspace, err := a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
+	if err != nil {
+		return xerrors.Errorf("get workspace by id: %w", err)
+	}
+
+	if _, err := a.NotificationsEnqueuer.EnqueueWithData(
+		// nolint:gocritic // We need to be able to send the notification.
+		dbauthz.AsNotifier(ctx),
+		workspace.OwnerID,
+		notifications.TemplateWorkspaceOutOfDisk,
+		map[string]string{
+			"workspace": workspace.Name,
+		},
+		map[string]any{
+			"volumes": outOfDiskVolumes,
+			// NOTE(DanielleMaywood):
+			// When notifications are enqueued, they are checked to be
+			// unique within a single day. This means that if we attempt
+			// to send two OOM notifications for the same workspace on
+			// the same day, the enqueuer will prevent us from sending
+			// a second one. We inject a timestamp to make the
+			// notifications appear different enough to circumvent this
+			// deduplication logic.
+			"timestamp": a.Clock.Now(),
+		},
+		"workspace-monitor-volumes",
+	); err != nil {
+		return xerrors.Errorf("notify workspace OOD: %w", err)
+	}
 
-	return &proto.PushResourcesMonitoringUsageResponse{}, nil
+	return nil
 }
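
Memory and volume monitoring above share one shape: fold the pushed datapoints into per-sample OK/NOK usage states, let resourcesmonitor.NextState derive the monitor's next state from those states and the alert thresholds, persist the new state and debounce deadline, and only enqueue a notification when the monitor's Debounce method says the transition warrants one. Neither NextState nor Debounce is shown in this excerpt (both live in the resourcesmonitor package added by this commit), so the sketch below is an interpretation of the Debounce contract as it is called here, not the committed implementation; the state names, the Monitor struct, and the transition rule are assumptions.

// Sketch of the contract implied by the call
//   debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
// in monitorMemory and monitorVolumes above. An interpretation only.
package resourcesmonitor

import "time"

// State stands in for whatever OK/NOK state type the commit's schema
// uses for monitor rows; the values here are placeholders.
type State string

const (
	StateOK  State = "OK"
	StateNOK State = "NOK"
)

// Monitor stands in for the database monitor row. DebouncedUntil is
// persisted via the Update*ResourceMonitorParams calls above, so the
// notification window survives across pushes.
type Monitor struct {
	DebouncedUntil time.Time
}

// Debounce returns the next notification deadline and whether this
// transition should notify. Assumed rule: only an OK -> NOK
// transition notifies, and only once the previously persisted
// debounce window has expired; every other case keeps the existing
// deadline and stays silent.
func (m Monitor) Debounce(dur time.Duration, now time.Time, oldState, newState State) (time.Time, bool) {
	if oldState == StateOK && newState == StateNOK && !now.Before(m.DebouncedUntil) {
		return now.Add(dur), true
	}
	return m.DebouncedUntil, false
}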
