@@ -4,20 +4,35 @@ import (
 	"context"
 	"database/sql"
 	"errors"
+	"fmt"
+	"time"
 
 	"golang.org/x/xerrors"
 
+	"cdr.dev/slog"
+
 	"github.com/google/uuid"
 
-	"cdr.dev/slog"
 	"github.com/coder/coder/v2/agent/proto"
+	"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
 	"github.com/coder/coder/v2/coderd/database"
+	"github.com/coder/coder/v2/coderd/database/dbauthz"
+	"github.com/coder/coder/v2/coderd/database/dbtime"
+	"github.com/coder/coder/v2/coderd/notifications"
+	"github.com/coder/quartz"
 )
 
 type ResourcesMonitoringAPI struct {
-	AgentID  uuid.UUID
-	Database database.Store
-	Log      slog.Logger
+	AgentID     uuid.UUID
+	WorkspaceID uuid.UUID
+
+	Log                   slog.Logger
+	Clock                 quartz.Clock
+	Database              database.Store
+	NotificationsEnqueuer notifications.Enqueuer
+
+	Debounce time.Duration
+	Config   resourcesmonitor.Config
 }
 
 func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
@@ -33,8 +48,8 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
 
 	return &proto.GetResourcesMonitoringConfigurationResponse{
 		Config: &proto.GetResourcesMonitoringConfigurationResponse_Config{
-			CollectionIntervalSeconds: 10,
-			NumDatapoints:             20,
+			CollectionIntervalSeconds: int32(a.Config.CollectionInterval.Seconds()),
+			NumDatapoints:             a.Config.NumDatapoints,
 		},
 		Memory: func() *proto.GetResourcesMonitoringConfigurationResponse_Memory {
 			if memoryErr != nil {
@@ -60,8 +75,182 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
 }
 
 func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
-	a.Log.Info(ctx, "resources monitoring usage received",
-		slog.F("request", req))
+	var err error
+
+	if memoryErr := a.monitorMemory(ctx, req.Datapoints); memoryErr != nil {
+		err = errors.Join(err, xerrors.Errorf("monitor memory: %w", memoryErr))
+	}
+
+	if volumeErr := a.monitorVolumes(ctx, req.Datapoints); volumeErr != nil {
+		err = errors.Join(err, xerrors.Errorf("monitor volume: %w", volumeErr))
+	}
+
+	return &proto.PushResourcesMonitoringUsageResponse{}, err
+}
+
+func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
+	monitor, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
+	if err != nil {
+		// It is valid for an agent to not have a memory monitor, so we
+		// do not want to treat it as an error.
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil
+		}
+
+		return xerrors.Errorf("fetch memory resource monitor: %w", err)
+	}
+
+	if !monitor.Enabled {
+		return nil
+	}
+
+	usageDatapoints := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint_MemoryUsage, 0, len(datapoints))
+	for _, datapoint := range datapoints {
+		usageDatapoints = append(usageDatapoints, datapoint.Memory)
+	}
+
+	usageStates := resourcesmonitor.CalculateMemoryUsageStates(monitor, usageDatapoints)
+
+	oldState := monitor.State
+	newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)
+
+	debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
+
+	//nolint:gocritic // We need to be able to update the resource monitor here.
+	err = a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
+		AgentID:        a.AgentID,
+		State:          newState,
+		UpdatedAt:      dbtime.Time(a.Clock.Now()),
+		DebouncedUntil: dbtime.Time(debouncedUntil),
+	})
+	if err != nil {
+		return xerrors.Errorf("update workspace monitor: %w", err)
+	}
+
+	if !shouldNotify {
+		return nil
+	}
+
+	workspace, err := a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
+	if err != nil {
+		return xerrors.Errorf("get workspace by id: %w", err)
+	}
+
+	_, err = a.NotificationsEnqueuer.EnqueueWithData(
+		// nolint:gocritic // We need to be able to send the notification.
+		dbauthz.AsNotifier(ctx),
+		workspace.OwnerID,
+		notifications.TemplateWorkspaceOutOfMemory,
+		map[string]string{
+			"workspace": workspace.Name,
+			"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
+		},
+		map[string]any{
+			// NOTE(DanielleMaywood):
+			// When notifications are enqueued, they are checked to be
+			// unique within a single day. This means that if we attempt
+			// to send two OOM notifications for the same workspace on
+			// the same day, the enqueuer will prevent us from sending
+			// a second one. We inject a timestamp to make the
+			// notifications appear different enough to circumvent this
+			// deduplication logic.
+			"timestamp": a.Clock.Now(),
+		},
+		"workspace-monitor-memory",
+	)
+	if err != nil {
+		return xerrors.Errorf("notify workspace OOM: %w", err)
+	}
+
+	return nil
+}
+
+func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
+	volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
+	if err != nil {
+		return xerrors.Errorf("fetch volume resource monitors: %w", err)
+	}
+
+	outOfDiskVolumes := make([]map[string]any, 0)
+
+	for _, monitor := range volumeMonitors {
+		if !monitor.Enabled {
+			continue
+		}
+
+		usageDatapoints := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage, 0, len(datapoints))
+		for _, datapoint := range datapoints {
+			var usage *proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage
+
+			for _, volume := range datapoint.Volumes {
+				if volume.Volume == monitor.Path {
+					usage = volume
+					break
+				}
+			}
+
+			usageDatapoints = append(usageDatapoints, usage)
+		}
+
+		usageStates := resourcesmonitor.CalculateVolumeUsageStates(monitor, usageDatapoints)
+
+		oldState := monitor.State
+		newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)
+
+		debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
+
+		if shouldNotify {
+			outOfDiskVolumes = append(outOfDiskVolumes, map[string]any{
+				"path":      monitor.Path,
+				"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
+			})
+		}
+
+		//nolint:gocritic // We need to be able to update the resource monitor here.
+		if err := a.Database.UpdateVolumeResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateVolumeResourceMonitorParams{
+			AgentID:        a.AgentID,
+			Path:           monitor.Path,
+			State:          newState,
+			UpdatedAt:      dbtime.Time(a.Clock.Now()),
+			DebouncedUntil: dbtime.Time(debouncedUntil),
+		}); err != nil {
+			return xerrors.Errorf("update workspace monitor: %w", err)
+		}
+	}
+
+	if len(outOfDiskVolumes) == 0 {
+		return nil
+	}
+
+	workspace, err := a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
+	if err != nil {
+		return xerrors.Errorf("get workspace by id: %w", err)
+	}
+
+	if _, err := a.NotificationsEnqueuer.EnqueueWithData(
+		// nolint:gocritic // We need to be able to send the notification.
+		dbauthz.AsNotifier(ctx),
+		workspace.OwnerID,
+		notifications.TemplateWorkspaceOutOfDisk,
+		map[string]string{
+			"workspace": workspace.Name,
+		},
+		map[string]any{
+			"volumes": outOfDiskVolumes,
+			// NOTE(DanielleMaywood):
+			// When notifications are enqueued, they are checked to be
+			// unique within a single day. This means that if we attempt
+			// to send two out-of-disk notifications for the same
+			// workspace on the same day, the enqueuer will prevent us
+			// from sending a second one. We inject a timestamp to make
+			// the notifications appear different enough to circumvent
+			// this deduplication logic.
+			"timestamp": a.Clock.Now(),
+		},
+		"workspace-monitor-volumes",
+	); err != nil {
+		return xerrors.Errorf("notify workspace OOD: %w", err)
+	}
 
-	return &proto.PushResourcesMonitoringUsageResponse{}, nil
+	return nil
 }
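
Reviewer note: the sketch below isolates the rising-edge debounce that both `monitorMemory` and `monitorVolumes` lean on via `monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)`. It is a self-contained approximation for review purposes, assuming two states (`OK`/`NOK`) and a fixed window; the actual `Debounce` method lives on the database monitor types and may differ in detail.

```go
package main

import (
	"fmt"
	"time"
)

// monitorState mirrors (by assumption) the OK/NOK states used by the
// resource monitors in this diff.
type monitorState string

const (
	stateOK  monitorState = "OK"  // usage below threshold
	stateNOK monitorState = "NOK" // usage above threshold
)

// debounce returns the new debounce deadline and whether a notification
// should fire. Only a rising edge (OK -> NOK) that falls outside the
// previous window notifies; everything else stays quiet.
func debounce(window time.Duration, now, debouncedUntil time.Time, oldState, newState monitorState) (time.Time, bool) {
	if oldState == stateNOK || newState == stateOK {
		// Not a rising edge: keep the existing deadline.
		return debouncedUntil, false
	}
	if now.Before(debouncedUntil) {
		// Rising edge, but we notified recently: still debounced.
		return debouncedUntil, false
	}
	// Rising edge outside the window: notify and open a new window.
	return now.Add(window), true
}

func main() {
	now := time.Now()
	until, notify := debounce(5*time.Minute, now, time.Time{}, stateOK, stateNOK)
	fmt.Println(notify, until.Sub(now)) // true 5m0s

	// A second rising edge inside the window is suppressed.
	_, notify = debounce(5*time.Minute, now.Add(time.Minute), until, stateOK, stateNOK)
	fmt.Println(notify) // false
}
```

This is also why `UpdateMemoryResourceMonitor`/`UpdateVolumeResourceMonitor` persist `DebouncedUntil` on every push: the window has to survive across requests, not just within one.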