Skip to content

Commit af994c2

Browse files
committed
refactored to reaper & added tests
1 parent ca49519 commit af994c2

File tree

6 files changed

+294
-66
lines changed

6 files changed

+294
-66
lines changed

cli/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@ import (
9292
"github.com/coder/coder/v2/coderd/prometheusmetrics"
9393
"github.com/coder/coder/v2/coderd/prometheusmetrics/insights"
9494
"github.com/coder/coder/v2/coderd/promoauth"
95+
"github.com/coder/coder/v2/coderd/reaper"
9596
"github.com/coder/coder/v2/coderd/schedule"
9697
"github.com/coder/coder/v2/coderd/telemetry"
9798
"github.com/coder/coder/v2/coderd/tracing"
98-
"github.com/coder/coder/v2/coderd/unhanger"
9999
"github.com/coder/coder/v2/coderd/updatecheck"
100100
"github.com/coder/coder/v2/coderd/util/ptr"
101101
"github.com/coder/coder/v2/coderd/util/slice"
@@ -1129,7 +1129,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
11291129

11301130
hangDetectorTicker := time.NewTicker(vals.JobHangDetectorInterval.Value())
11311131
defer hangDetectorTicker.Stop()
1132-
hangDetector := unhanger.New(ctx, options.Database, options.Pubsub, logger, hangDetectorTicker.C)
1132+
hangDetector := reaper.New(ctx, options.Database, options.Pubsub, logger, hangDetectorTicker.C)
11331133
hangDetector.Start()
11341134
defer hangDetector.Close()
11351135

coderd/coderdtest/coderdtest.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ import (
7272
"github.com/coder/coder/v2/coderd/notifications/notificationstest"
7373
"github.com/coder/coder/v2/coderd/rbac"
7474
"github.com/coder/coder/v2/coderd/rbac/policy"
75+
"github.com/coder/coder/v2/coderd/reaper"
7576
"github.com/coder/coder/v2/coderd/runtimeconfig"
7677
"github.com/coder/coder/v2/coderd/schedule"
7778
"github.com/coder/coder/v2/coderd/telemetry"
78-
"github.com/coder/coder/v2/coderd/unhanger"
7979
"github.com/coder/coder/v2/coderd/updatecheck"
8080
"github.com/coder/coder/v2/coderd/util/ptr"
8181
"github.com/coder/coder/v2/coderd/webpush"
@@ -367,7 +367,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
367367

368368
hangDetectorTicker := time.NewTicker(options.DeploymentValues.JobHangDetectorInterval.Value())
369369
defer hangDetectorTicker.Stop()
370-
hangDetector := unhanger.New(ctx, options.Database, options.Pubsub, options.Logger.Named("unhanger.detector"), hangDetectorTicker.C)
370+
hangDetector := reaper.New(ctx, options.Database, options.Pubsub, options.Logger.Named("reaper.detector"), hangDetectorTicker.C)
371371
hangDetector.Start()
372372
t.Cleanup(hangDetector.Close)
373373

coderd/database/dbauthz/dbauthz.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ var (
219219
Scope: rbac.ScopeAll,
220220
}.WithCachedASTValue()
221221

222-
// See unhanger package.
222+
// See reaper package.
223223
subjectHangDetector = rbac.Subject{
224224
Type: rbac.SubjectTypeHangDetector,
225225
FriendlyName: "Hang Detector",
@@ -408,7 +408,7 @@ func AsAutostart(ctx context.Context) context.Context {
408408
}
409409

410410
// AsHangDetector returns a context with an actor that has permissions required
411-
// for unhanger.Detector to function.
411+
// for reaper.Detector to function.
412412
func AsHangDetector(ctx context.Context) context.Context {
413413
return As(ctx, subjectHangDetector)
414414
}

coderd/unhanger/detector.go renamed to coderd/reaper/detector.go

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package unhanger
1+
package reaper
22

33
import (
44
"context"
@@ -49,14 +49,15 @@ const (
4949
notStartedJobType jobType = "not started"
5050
)
5151

52-
// HungJobLogMessages are written to provisioner job logs when a job is hung and
53-
// terminated.
54-
var HungJobLogMessages = []string{
55-
"",
56-
"====================",
57-
"Coder: Build has been detected as hung for 5 minutes and will be terminated.",
58-
"====================",
59-
"",
52+
// jobLogMessages builds the lines that are written to the provisioner job logs
// when a job is reaped. The notice line names the job type (formatted with %s)
// and the threshold in minutes (formatted with %.0f, i.e. no decimal places);
// it is framed by separator lines and surrounded by empty lines.
//
// NOTE(review): the parameter name jobType shadows the jobType type inside the
// function body.
53+
func jobLogMessages(jobType jobType, threshold float64) []string {
54+
return []string{
55+
"",
56+
"====================",
57+
fmt.Sprintf("Coder: Build has been detected as %s for %.0f minutes and will be terminated.", jobType, threshold),
58+
"====================",
59+
"",
60+
}
6061
}
6162

6263
// acquireLockError is returned when the detector fails to acquire a lock and
@@ -209,18 +210,14 @@ func (d *Detector) run(t time.Time) Stats {
209210
jobs = jobs[:MaxJobsPerRun]
210211
}
211212

212-
// Send a message into the build log for each hung or not startedjob saying that it
213-
// has been detected and will be terminated, then mark the job as
214-
// failed.
213+
// Send a message into the build log for each hung or not started job saying that it
214+
// has been detected and will be terminated, then mark the job as failed.
215215
for _, job := range jobs {
216216
log := d.log.With(slog.F("job_id", job.ID))
217217

218-
err := unhangJob(ctx, log, d.db, d.pubsub, job.ID)
218+
err := reapJob(ctx, log, d.db, d.pubsub, job.ID)
219219
if err != nil {
220-
jobType := notStartedJobType
221-
if job.StartedAt.Valid {
222-
jobType = hungJobType
223-
}
220+
jobType, _ := reapParamsFromJob(job)
224221
if !(xerrors.As(err, &acquireLockError{}) || xerrors.As(err, &jobIneligibleError{})) {
225222
log.Error(ctx, fmt.Sprintf("error forcefully terminating %s provisioner job", jobType), slog.Error(err))
226223
}
@@ -233,11 +230,11 @@ func (d *Detector) run(t time.Time) Stats {
233230
return stats
234231
}
235232

236-
func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubsub.Pubsub, jobID uuid.UUID) error {
233+
func reapJob(ctx context.Context, log slog.Logger, db database.Store, pub pubsub.Pubsub, jobID uuid.UUID) error {
237234
var lowestLogID int64
238235

239236
err := db.InTx(func(db database.Store) error {
240-
locked, err := db.TryAcquireLock(ctx, database.GenLockID(fmt.Sprintf("unhanger:%s", jobID)))
237+
locked, err := db.TryAcquireLock(ctx, database.GenLockID(fmt.Sprintf("reaper:%s", jobID)))
241238
if err != nil {
242239
return xerrors.Errorf("acquire lock: %w", err)
243240
}
@@ -252,20 +249,14 @@ func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubs
252249
return xerrors.Errorf("get provisioner job: %w", err)
253250
}
254251

255-
jobType := hungJobType
256-
threshold := HungJobDuration.Minutes()
257-
258-
if !job.StartedAt.Valid {
259-
jobType = notStartedJobType
260-
threshold = NotStartedTimeElapsed.Minutes()
261-
}
252+
jobType, threshold := reapParamsFromJob(job)
262253

263254
if job.CompletedAt.Valid {
264255
return jobIneligibleError{
265256
Err: xerrors.Errorf("job is completed (status %s)", job.JobStatus),
266257
}
267258
}
268-
if job.UpdatedAt.After(time.Now().Add(-HungJobDuration)) {
259+
if job.UpdatedAt.After(time.Now().Add(-time.Duration(threshold) * time.Minute)) {
269260
return jobIneligibleError{
270261
Err: xerrors.New("job has been updated recently"),
271262
}
@@ -303,7 +294,7 @@ func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubs
303294
Output: nil,
304295
}
305296
now := dbtime.Now()
306-
for i, msg := range HungJobLogMessages {
297+
for i, msg := range jobLogMessages(jobType, threshold) {
307298
// Set the created at in a way that ensures each message has
308299
// a unique timestamp so they will be sorted correctly.
309300
insertParams.CreatedAt = append(insertParams.CreatedAt, now.Add(time.Millisecond*time.Duration(i)))
@@ -405,3 +396,13 @@ func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubs
405396

406397
return nil
407398
}
399+
400+
// reapParamsFromJob returns the job type label and the reaping threshold, in
// minutes, for the given provisioner job. It defaults to hungJobType with
// HungJobDuration; when the job's StartedAt is not valid it returns
// notStartedJobType with NotStartedTimeElapsed instead.
//
// NOTE(review): the threshold is returned as a float64 number of minutes;
// returning a time.Duration would spare callers a conversion — consider.
func reapParamsFromJob(job database.ProvisionerJob) (jobType, float64) {
401+
jobType := hungJobType
402+
threshold := HungJobDuration.Minutes()
403+
// StartedAt is not valid: classify the job as not started rather than hung.
if !job.StartedAt.Valid {
404+
jobType = notStartedJobType
405+
threshold = NotStartedTimeElapsed.Minutes()
406+
}
407+
return jobType, threshold
408+
}

0 commit comments

Comments
 (0)