@@ -31,6 +31,10 @@ const (
31
31
// Provisioners should avoid keeping a job "running" for longer than this
32
32
// time after failing to send an update to the job.
33
33
HungJobExitTimeout = 3 * time .Minute
34
+
35
+ // MaxJobsPerRun is the maximum number of hung jobs that the detector will
36
+ // terminate in a single run.
37
+ MaxJobsPerRun = 10
34
38
)
35
39
36
40
// HungJobLogMessages are written to provisioner job logs when a job is hung and
@@ -45,18 +49,11 @@ var HungJobLogMessages = []string{
45
49
46
50
// acquireLockError is returned when the detector fails to acquire a lock and
47
51
// cancels the current run.
48
- type acquireLockError struct {
49
- err error
50
- }
52
+ type acquireLockError struct {}
51
53
52
54
// Error implements error.
53
- func (e * acquireLockError ) Error () string {
54
- return "acquire lock: " + e .err .Error ()
55
- }
56
-
57
- // Unwrap implements xerrors.Wrapper.
58
- func (e * acquireLockError ) Unwrap () error {
59
- return e .err
55
+ func (acquireLockError ) Error () string {
56
+ return "lock is held by another client"
60
57
}
61
58
62
59
// Detector automatically detects hung provisioner jobs, sends messages into the
@@ -75,9 +72,9 @@ type Detector struct {
75
72
76
73
// Stats contains statistics about the last run of the detector.
77
74
type Stats struct {
78
- // HungJobIDs contains the IDs of all jobs that were detected as hung and
75
+ // TerminatedJobIDs contains the IDs of all jobs that were detected as hung and
79
76
// terminated.
80
- HungJobIDs []uuid.UUID
77
+ TerminatedJobIDs []uuid.UUID
81
78
// Error is the fatal error that occurred during the last run of the
82
79
// detector, if any. Error may be set to AcquireLockError if the detector
83
80
// failed to acquire a lock.
@@ -88,7 +85,7 @@ type Stats struct {
88
85
func New (ctx context.Context , db database.Store , pub pubsub.Pubsub , log slog.Logger , tick <- chan time.Time ) * Detector {
89
86
//nolint:gocritic // Hang detector has a limited set of permissions.
90
87
ctx , cancel := context .WithCancel (dbauthz .AsHangDetector (ctx ))
91
- le := & Detector {
88
+ d := & Detector {
92
89
ctx : ctx ,
93
90
cancel : cancel ,
94
91
done : make (chan struct {}),
@@ -98,7 +95,7 @@ func New(ctx context.Context, db database.Store, pub pubsub.Pubsub, log slog.Log
98
95
tick : tick ,
99
96
stats : nil ,
100
97
}
101
- return le
98
+ return d
102
99
}
103
100
104
101
// WithStatsChannel will cause Executor to push a RunStats to ch after
@@ -131,8 +128,8 @@ func (d *Detector) Start() {
131
128
if stats .Error != nil && ! xerrors .As (stats .Error , & acquireLockError {}) {
132
129
d .log .Warn (d .ctx , "error running workspace build hang detector once" , slog .Error (stats .Error ))
133
130
}
134
- if len (stats .HungJobIDs ) != 0 {
135
- d .log .Warn (d .ctx , "detected (and terminated) hung provisioner jobs" , slog .F ("job_ids" , stats .HungJobIDs ))
131
+ if len (stats .TerminatedJobIDs ) != 0 {
132
+ d .log .Warn (d .ctx , "detected (and terminated) hung provisioner jobs" , slog .F ("job_ids" , stats .TerminatedJobIDs ))
136
133
}
137
134
if d .stats != nil {
138
135
select {
@@ -162,8 +159,8 @@ func (d *Detector) run(t time.Time) Stats {
162
159
defer cancel ()
163
160
164
161
stats := Stats {
165
- HungJobIDs : []uuid.UUID {},
166
- Error : nil ,
162
+ TerminatedJobIDs : []uuid.UUID {},
163
+ Error : nil ,
167
164
}
168
165
169
166
err := d .db .InTx (func (db database.Store ) error {
@@ -173,9 +170,9 @@ func (d *Detector) run(t time.Time) Stats {
173
170
// hang detector is already running in another coder replica.
174
171
// There's no point in waiting to run it again, so we'll just retry
175
172
// on the next tick.
176
- d .log .Info (ctx , "skipping workspace build hang detector run due to lock" , slog . Error ( err ) )
173
+ d .log .Info (ctx , "skipping workspace build hang detector run due to lock" )
177
174
// This error is ignored.
178
- return & acquireLockError {err : err }
175
+ return acquireLockError {}
179
176
}
180
177
if err != nil {
181
178
d .log .Warn (ctx , "skipping workspace build hang detector run due to error acquiring lock" , slog .Error (err ))
@@ -190,6 +187,12 @@ func (d *Detector) run(t time.Time) Stats {
190
187
return xerrors .Errorf ("get hung provisioner jobs: %w" , err )
191
188
}
192
189
190
+ // Limit the number of jobs we'll unhang in a single run to avoid
191
+ // timing out.
192
+ if len (jobs ) > MaxJobsPerRun {
193
+ jobs = jobs [:MaxJobsPerRun ]
194
+ }
195
+
193
196
// Send a message into the build log for each hung job saying that it
194
197
// has been detected and will be terminated, then mark the job as
195
198
// failed.
@@ -204,121 +207,13 @@ func (d *Detector) run(t time.Time) Stats {
204
207
205
208
log .Info (ctx , "detected hung (>5m) provisioner job, forcefully terminating" )
206
209
207
- // First, get the latest logs from the build so we can make sure
208
- // our messages are in the latest stage.
209
- logs , err := db .GetProvisionerLogsAfterID (ctx , database.GetProvisionerLogsAfterIDParams {
210
- JobID : job .ID ,
211
- CreatedAfter : 0 ,
212
- })
213
- if err != nil {
214
- log .Warn (ctx , "get logs for hung job" , slog .Error (err ))
215
- continue
216
- }
217
- logStage := ""
218
- if len (logs ) != 0 {
219
- logStage = logs [len (logs )- 1 ].Stage
220
- }
221
- if logStage == "" {
222
- logStage = "Unknown"
223
- }
224
-
225
- // Insert the messages into the build log.
226
- insertParams := database.InsertProvisionerJobLogsParams {
227
- JobID : job .ID ,
228
- }
229
- now := database .Now ()
230
- for i , msg := range HungJobLogMessages {
231
- // Set the created at in a way that ensures each message has
232
- // a unique timestamp so they will be sorted correctly.
233
- insertParams .CreatedAt = append (insertParams .CreatedAt , now .Add (time .Millisecond * time .Duration (i )))
234
- insertParams .Level = append (insertParams .Level , database .LogLevelError )
235
- insertParams .Stage = append (insertParams .Stage , logStage )
236
- insertParams .Source = append (insertParams .Source , database .LogSourceProvisionerDaemon )
237
- insertParams .Output = append (insertParams .Output , msg )
238
- }
239
- newLogs , err := db .InsertProvisionerJobLogs (ctx , insertParams )
210
+ err := unhangJob (ctx , db , d .pubsub , job )
240
211
if err != nil {
241
- log .Warn (ctx , "insert logs for hung job" , slog .Error (err ))
212
+ log .Error (ctx , "error forcefully terminating hung provisioner job" , slog .Error (err ))
242
213
continue
243
214
}
244
215
245
- // Publish the new log notification to pubsub. Use the lowest
246
- // log ID inserted so the log stream will fetch everything after
247
- // that point.
248
- lowestID := newLogs [0 ].ID
249
- data , err := json .Marshal (provisionersdk.ProvisionerJobLogsNotifyMessage {
250
- CreatedAfter : lowestID - 1 ,
251
- })
252
- if err != nil {
253
- log .Warn (ctx , "marshal log notification" , slog .Error (err ))
254
- continue
255
- }
256
- err = d .pubsub .Publish (provisionersdk .ProvisionerJobLogsNotifyChannel (job .ID ), data )
257
- if err != nil {
258
- log .Warn (ctx , "publish log notification" , slog .Error (err ))
259
- continue
260
- }
261
-
262
- // Mark the job as failed.
263
- now = database .Now ()
264
- err = db .UpdateProvisionerJobWithCompleteByID (ctx , database.UpdateProvisionerJobWithCompleteByIDParams {
265
- ID : job .ID ,
266
- UpdatedAt : now ,
267
- CompletedAt : sql.NullTime {
268
- Time : now ,
269
- Valid : true ,
270
- },
271
- Error : sql.NullString {
272
- String : "Coder: Build has been detected as hung for 5 minutes and has been terminated." ,
273
- Valid : true ,
274
- },
275
- ErrorCode : sql.NullString {
276
- Valid : false ,
277
- },
278
- })
279
- if err != nil {
280
- log .Warn (ctx , "mark job as failed" , slog .Error (err ))
281
- continue
282
- }
283
-
284
- // If the provisioner job is a workspace build, copy the
285
- // provisioner state from the previous build to this workspace
286
- // build.
287
- if job .Type == database .ProvisionerJobTypeWorkspaceBuild {
288
- build , err := db .GetWorkspaceBuildByJobID (ctx , job .ID )
289
- if err != nil {
290
- log .Warn (ctx , "get workspace build for workspace build job by job id" , slog .Error (err ))
291
- continue
292
- }
293
-
294
- // Only copy the provisioner state if there's no state in
295
- // the current build.
296
- if len (build .ProvisionerState ) == 0 {
297
- // Get the previous build if it exists.
298
- prevBuild , err := db .GetWorkspaceBuildByWorkspaceIDAndBuildNumber (ctx , database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams {
299
- WorkspaceID : build .WorkspaceID ,
300
- BuildNumber : build .BuildNumber - 1 ,
301
- })
302
- if err == nil {
303
- _ , err = db .UpdateWorkspaceBuildByID (ctx , database.UpdateWorkspaceBuildByIDParams {
304
- ID : build .ID ,
305
- UpdatedAt : database .Now (),
306
- ProvisionerState : prevBuild .ProvisionerState ,
307
- Deadline : time.Time {},
308
- MaxDeadline : time.Time {},
309
- })
310
- if err != nil {
311
- log .Warn (ctx , "update hung workspace build provisioner state to match previous build" , slog .Error (err ))
312
- continue
313
- }
314
- } else if ! xerrors .Is (err , sql .ErrNoRows ) {
315
- log .Warn (ctx , "get previous workspace build" , slog .Error (err ))
316
- continue
317
- }
318
- }
319
- }
320
-
321
- stats .HungJobIDs = append (stats .HungJobIDs , job .ID )
216
+ stats .TerminatedJobIDs = append (stats .TerminatedJobIDs , job .ID )
322
217
}
323
218
324
219
return nil
@@ -330,3 +225,120 @@ func (d *Detector) run(t time.Time) Stats {
330
225
331
226
return stats
332
227
}
228
+
229
+ func unhangJob (ctx context.Context , db database.Store , pub pubsub.Pubsub , job database.ProvisionerJob ) error {
230
+ jobStatus := db2sdk .ProvisionerJobStatus (job )
231
+ if jobStatus != codersdk .ProvisionerJobRunning {
232
+ return xerrors .Errorf ("hang detector query discovered non-running job, this is a bug: %s" , jobStatus )
233
+ }
234
+
235
+ // First, get the latest logs from the build so we can make sure
236
+ // our messages are in the latest stage.
237
+ logs , err := db .GetProvisionerLogsAfterID (ctx , database.GetProvisionerLogsAfterIDParams {
238
+ JobID : job .ID ,
239
+ CreatedAfter : 0 ,
240
+ })
241
+ if err != nil {
242
+ return xerrors .Errorf ("get logs for hung job: %w" , err )
243
+ }
244
+ logStage := ""
245
+ if len (logs ) != 0 {
246
+ logStage = logs [len (logs )- 1 ].Stage
247
+ }
248
+ if logStage == "" {
249
+ logStage = "Unknown"
250
+ }
251
+
252
+ // Insert the messages into the build log.
253
+ insertParams := database.InsertProvisionerJobLogsParams {
254
+ JobID : job .ID ,
255
+ }
256
+ now := database .Now ()
257
+ for i , msg := range HungJobLogMessages {
258
+ // Set the created at in a way that ensures each message has
259
+ // a unique timestamp so they will be sorted correctly.
260
+ insertParams .CreatedAt = append (insertParams .CreatedAt , now .Add (time .Millisecond * time .Duration (i )))
261
+ insertParams .Level = append (insertParams .Level , database .LogLevelError )
262
+ insertParams .Stage = append (insertParams .Stage , logStage )
263
+ insertParams .Source = append (insertParams .Source , database .LogSourceProvisionerDaemon )
264
+ insertParams .Output = append (insertParams .Output , msg )
265
+ }
266
+ newLogs , err := db .InsertProvisionerJobLogs (ctx , insertParams )
267
+ if err != nil {
268
+ return xerrors .Errorf ("insert logs for hung job: %w" , err )
269
+ }
270
+
271
+ // Publish the new log notification to pubsub. Use the lowest
272
+ // log ID inserted so the log stream will fetch everything after
273
+ // that point.
274
+ lowestID := newLogs [0 ].ID
275
+ data , err := json .Marshal (provisionersdk.ProvisionerJobLogsNotifyMessage {
276
+ CreatedAfter : lowestID - 1 ,
277
+ EndOfLogs : true ,
278
+ })
279
+ if err != nil {
280
+ return xerrors .Errorf ("marshal log notification: %w" , err )
281
+ }
282
+ err = pub .Publish (provisionersdk .ProvisionerJobLogsNotifyChannel (job .ID ), data )
283
+ if err != nil {
284
+ return xerrors .Errorf ("publish log notification: %w" , err )
285
+ }
286
+
287
+ // Mark the job as failed.
288
+ now = database .Now ()
289
+ err = db .UpdateProvisionerJobWithCompleteByID (ctx , database.UpdateProvisionerJobWithCompleteByIDParams {
290
+ ID : job .ID ,
291
+ UpdatedAt : now ,
292
+ CompletedAt : sql.NullTime {
293
+ Time : now ,
294
+ Valid : true ,
295
+ },
296
+ Error : sql.NullString {
297
+ String : "Coder: Build has been detected as hung for 5 minutes and has been terminated by hang detector." ,
298
+ Valid : true ,
299
+ },
300
+ ErrorCode : sql.NullString {
301
+ Valid : false ,
302
+ },
303
+ })
304
+ if err != nil {
305
+ return xerrors .Errorf ("mark job as failed: %w" , err )
306
+ }
307
+
308
+ // If the provisioner job is a workspace build, copy the
309
+ // provisioner state from the previous build to this workspace
310
+ // build.
311
+ if job .Type == database .ProvisionerJobTypeWorkspaceBuild {
312
+ build , err := db .GetWorkspaceBuildByJobID (ctx , job .ID )
313
+ if err != nil {
314
+ return xerrors .Errorf ("get workspace build for workspace build job by job id: %w" , err )
315
+ }
316
+
317
+ // Only copy the provisioner state if there's no state in
318
+ // the current build.
319
+ if len (build .ProvisionerState ) == 0 {
320
+ // Get the previous build if it exists.
321
+ prevBuild , err := db .GetWorkspaceBuildByWorkspaceIDAndBuildNumber (ctx , database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams {
322
+ WorkspaceID : build .WorkspaceID ,
323
+ BuildNumber : build .BuildNumber - 1 ,
324
+ })
325
+ if err != nil && ! xerrors .Is (err , sql .ErrNoRows ) {
326
+ return xerrors .Errorf ("get previous workspace build: %w" , err )
327
+ }
328
+ if err == nil {
329
+ _ , err = db .UpdateWorkspaceBuildByID (ctx , database.UpdateWorkspaceBuildByIDParams {
330
+ ID : build .ID ,
331
+ UpdatedAt : database .Now (),
332
+ ProvisionerState : prevBuild .ProvisionerState ,
333
+ Deadline : time.Time {},
334
+ MaxDeadline : time.Time {},
335
+ })
336
+ if err != nil {
337
+ return xerrors .Errorf ("update workspace build by id: %w" , err )
338
+ }
339
+ }
340
+ }
341
+ }
342
+
343
+ return nil
344
+ }
0 commit comments