1
- package unhanger
1
+ package reaper
2
2
3
3
import (
4
4
"context"
@@ -49,14 +49,15 @@ const (
49
49
notStartedJobType jobType = "not started"
50
50
)
51
51
52
- // HungJobLogMessages are written to provisioner job logs when a job is hung and
53
- // terminated.
54
- var HungJobLogMessages = []string {
55
- "" ,
56
- "====================" ,
57
- "Coder: Build has been detected as hung for 5 minutes and will be terminated." ,
58
- "====================" ,
59
- "" ,
52
+ // jobLogMessages are written to provisioner job logs when a job is reaped
53
+ func jobLogMessages (jobType jobType , threshold float64 ) []string {
54
+ return []string {
55
+ "" ,
56
+ "====================" ,
57
+ fmt .Sprintf ("Coder: Build has been detected as %s for %.0f minutes and will be terminated." , jobType , threshold ),
58
+ "====================" ,
59
+ "" ,
60
+ }
60
61
}
61
62
62
63
// acquireLockError is returned when the detector fails to acquire a lock and
@@ -209,18 +210,14 @@ func (d *Detector) run(t time.Time) Stats {
209
210
jobs = jobs [:MaxJobsPerRun ]
210
211
}
211
212
212
- // Send a message into the build log for each hung or not startedjob saying that it
213
- // has been detected and will be terminated, then mark the job as
214
- // failed.
213
+ // Send a message into the build log for each hung or not started job saying that it
214
+ // has been detected and will be terminated, then mark the job as failed.
215
215
for _ , job := range jobs {
216
216
log := d .log .With (slog .F ("job_id" , job .ID ))
217
217
218
- err := unhangJob (ctx , log , d .db , d .pubsub , job .ID )
218
+ err := reapJob (ctx , log , d .db , d .pubsub , job .ID )
219
219
if err != nil {
220
- jobType := notStartedJobType
221
- if job .StartedAt .Valid {
222
- jobType = hungJobType
223
- }
220
+ jobType , _ := reapParamsFromJob (job )
224
221
if ! (xerrors .As (err , & acquireLockError {}) || xerrors .As (err , & jobIneligibleError {})) {
225
222
log .Error (ctx , fmt .Sprintf ("error forcefully terminating %s provisioner job" , jobType ), slog .Error (err ))
226
223
}
@@ -233,11 +230,11 @@ func (d *Detector) run(t time.Time) Stats {
233
230
return stats
234
231
}
235
232
236
- func unhangJob (ctx context.Context , log slog.Logger , db database.Store , pub pubsub.Pubsub , jobID uuid.UUID ) error {
233
+ func reapJob (ctx context.Context , log slog.Logger , db database.Store , pub pubsub.Pubsub , jobID uuid.UUID ) error {
237
234
var lowestLogID int64
238
235
239
236
err := db .InTx (func (db database.Store ) error {
240
- locked , err := db .TryAcquireLock (ctx , database .GenLockID (fmt .Sprintf ("unhanger :%s" , jobID )))
237
+ locked , err := db .TryAcquireLock (ctx , database .GenLockID (fmt .Sprintf ("reaper :%s" , jobID )))
241
238
if err != nil {
242
239
return xerrors .Errorf ("acquire lock: %w" , err )
243
240
}
@@ -252,20 +249,14 @@ func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubs
252
249
return xerrors .Errorf ("get provisioner job: %w" , err )
253
250
}
254
251
255
- jobType := hungJobType
256
- threshold := HungJobDuration .Minutes ()
257
-
258
- if ! job .StartedAt .Valid {
259
- jobType = notStartedJobType
260
- threshold = NotStartedTimeElapsed .Minutes ()
261
- }
252
+ jobType , threshold := reapParamsFromJob (job )
262
253
263
254
if job .CompletedAt .Valid {
264
255
return jobIneligibleError {
265
256
Err : xerrors .Errorf ("job is completed (status %s)" , job .JobStatus ),
266
257
}
267
258
}
268
- if job .UpdatedAt .After (time .Now ().Add (- HungJobDuration )) {
259
+ if job .UpdatedAt .After (time .Now ().Add (- time . Duration ( threshold ) * time . Minute )) {
269
260
return jobIneligibleError {
270
261
Err : xerrors .New ("job has been updated recently" ),
271
262
}
@@ -303,7 +294,7 @@ func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubs
303
294
Output : nil ,
304
295
}
305
296
now := dbtime .Now ()
306
- for i , msg := range HungJobLogMessages {
297
+ for i , msg := range jobLogMessages ( jobType , threshold ) {
307
298
// Set the created at in a way that ensures each message has
308
299
// a unique timestamp so they will be sorted correctly.
309
300
insertParams .CreatedAt = append (insertParams .CreatedAt , now .Add (time .Millisecond * time .Duration (i )))
@@ -405,3 +396,13 @@ func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubs
405
396
406
397
return nil
407
398
}
399
+
400
+ func reapParamsFromJob (job database.ProvisionerJob ) (jobType , float64 ) {
401
+ jobType := hungJobType
402
+ threshold := HungJobDuration .Minutes ()
403
+ if ! job .StartedAt .Valid {
404
+ jobType = notStartedJobType
405
+ threshold = NotStartedTimeElapsed .Minutes ()
406
+ }
407
+ return jobType , threshold
408
+ }
0 commit comments