
Commit e284b47
small refactor
1 parent 6f1e127 commit e284b47

3 files changed, +213 -144 lines

coderd/unhanger/detector.go

Lines changed: 143 additions & 131 deletions
@@ -31,6 +31,10 @@ const (
     // Provisioners should avoid keeping a job "running" for longer than this
     // time after failing to send an update to the job.
     HungJobExitTimeout = 3 * time.Minute
+
+    // MaxJobsPerRun is the maximum number of hung jobs that the detector will
+    // terminate in a single run.
+    MaxJobsPerRun = 10
 )

 // HungJobLogMessages are written to provisioner job logs when a job is hung and
@@ -45,18 +49,11 @@ var HungJobLogMessages = []string{

 // acquireLockError is returned when the detector fails to acquire a lock and
 // cancels the current run.
-type acquireLockError struct {
-    err error
-}
+type acquireLockError struct{}

 // Error implements error.
-func (e *acquireLockError) Error() string {
-    return "acquire lock: " + e.err.Error()
-}
-
-// Unwrap implements xerrors.Wrapper.
-func (e *acquireLockError) Unwrap() error {
-    return e.err
+func (acquireLockError) Error() string {
+    return "lock is held by another client"
 }

 // Detector automatically detects hung provisioner jobs, sends messages into the
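The refactored error is now a zero-field value type: it wraps no inner error, so the Unwrap method is gone, and callers such as Start below match it purely by concrete type with xerrors.As. A minimal standalone sketch of that matching behavior (the main package and the wrapping step are illustrative, not part of this commit):

package main

import (
    "fmt"

    "golang.org/x/xerrors"
)

type acquireLockError struct{}

func (acquireLockError) Error() string {
    return "lock is held by another client"
}

func main() {
    // The detector returns the sentinel from its transaction closure...
    err := xerrors.Errorf("run detector: %w", acquireLockError{})

    // ...and the caller recognizes it by type; no wrapped cause is needed.
    if xerrors.As(err, &acquireLockError{}) {
        fmt.Println("another replica holds the lock; skipping this run")
    }
}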
@@ -75,9 +72,9 @@ type Detector struct {

 // Stats contains statistics about the last run of the detector.
 type Stats struct {
-    // HungJobIDs contains the IDs of all jobs that were detected as hung and
+    // TerminatedJobIDs contains the IDs of all jobs that were detected as hung and
     // terminated.
-    HungJobIDs []uuid.UUID
+    TerminatedJobIDs []uuid.UUID
     // Error is the fatal error that occurred during the last run of the
     // detector, if any. Error may be set to AcquireLockError if the detector
     // failed to acquire a lock.
@@ -88,7 +85,7 @@ type Stats struct {
 func New(ctx context.Context, db database.Store, pub pubsub.Pubsub, log slog.Logger, tick <-chan time.Time) *Detector {
     //nolint:gocritic // Hang detector has a limited set of permissions.
     ctx, cancel := context.WithCancel(dbauthz.AsHangDetector(ctx))
-    le := &Detector{
+    d := &Detector{
         ctx:    ctx,
         cancel: cancel,
         done:   make(chan struct{}),
@@ -98,7 +95,7 @@ func New(ctx context.Context, db database.Store, pub pubsub.Pubsub, log slog.Log
         tick:   tick,
         stats:  nil,
     }
-    return le
+    return d
 }

 // WithStatsChannel will cause Executor to push a RunStats to ch after
@@ -131,8 +128,8 @@ func (d *Detector) Start() {
             if stats.Error != nil && !xerrors.As(stats.Error, &acquireLockError{}) {
                 d.log.Warn(d.ctx, "error running workspace build hang detector once", slog.Error(stats.Error))
             }
-            if len(stats.HungJobIDs) != 0 {
-                d.log.Warn(d.ctx, "detected (and terminated) hung provisioner jobs", slog.F("job_ids", stats.HungJobIDs))
+            if len(stats.TerminatedJobIDs) != 0 {
+                d.log.Warn(d.ctx, "detected (and terminated) hung provisioner jobs", slog.F("job_ids", stats.TerminatedJobIDs))
             }
             if d.stats != nil {
                 select {
@@ -162,8 +159,8 @@ func (d *Detector) run(t time.Time) Stats {
     defer cancel()

     stats := Stats{
-        HungJobIDs: []uuid.UUID{},
-        Error:      nil,
+        TerminatedJobIDs: []uuid.UUID{},
+        Error:            nil,
     }

     err := d.db.InTx(func(db database.Store) error {
@@ -173,9 +170,9 @@ func (d *Detector) run(t time.Time) Stats {
             // hang detector is already running in another coder replica.
             // There's no point in waiting to run it again, so we'll just retry
             // on the next tick.
-            d.log.Info(ctx, "skipping workspace build hang detector run due to lock", slog.Error(err))
+            d.log.Info(ctx, "skipping workspace build hang detector run due to lock")
             // This error is ignored.
-            return &acquireLockError{err: err}
+            return acquireLockError{}
         }
         if err != nil {
             d.log.Warn(ctx, "skipping workspace build hang detector run due to error acquiring lock", slog.Error(err))
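Returning the sentinel here aborts the InTx closure, so lock contention rolls the transaction back and the run is skipped until the next tick; this is a cheap form of leader election across coder replicas. A standalone sketch of the same idea using a raw Postgres advisory lock via database/sql rather than coder's database.Store wrapper (tryRunExclusive and lockID are hypothetical names, not from this commit):

// tryRunExclusive runs fn in a transaction only if this process wins the
// advisory lock; pg_try_advisory_xact_lock never blocks, and the lock is
// released automatically when the transaction ends.
func tryRunExclusive(ctx context.Context, sqlDB *sql.DB, lockID int64, fn func(*sql.Tx) error) error {
    tx, err := sqlDB.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() //nolint:errcheck // no-op after a successful Commit

    var acquired bool
    err = tx.QueryRowContext(ctx, `SELECT pg_try_advisory_xact_lock($1)`, lockID).Scan(&acquired)
    if err != nil {
        return err
    }
    if !acquired {
        // Another replica is already running the detector; skip this tick.
        return acquireLockError{}
    }
    if err := fn(tx); err != nil {
        return err
    }
    return tx.Commit()
}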
@@ -190,6 +187,12 @@ func (d *Detector) run(t time.Time) Stats {
             return xerrors.Errorf("get hung provisioner jobs: %w", err)
         }

+        // Limit the number of jobs we'll unhang in a single run to avoid
+        // timing out.
+        if len(jobs) > MaxJobsPerRun {
+            jobs = jobs[:MaxJobsPerRun]
+        }
+
         // Send a message into the build log for each hung job saying that it
         // has been detected and will be terminated, then mark the job as
         // failed.
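The new cap is a plain re-slice: it is O(1), and anything beyond the first MaxJobsPerRun jobs simply waits for a later tick (which ten survive depends on the ordering of the query's results). An equivalent generic helper, shown only for illustration:

// capSlice returns at most max elements of s; the remainder is left
// for a subsequent run rather than processed here.
func capSlice[T any](s []T, max int) []T {
    if len(s) > max {
        return s[:max]
    }
    return s
}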
@@ -204,121 +207,13 @@ func (d *Detector) run(t time.Time) Stats {

             log.Info(ctx, "detected hung (>5m) provisioner job, forcefully terminating")

-            // First, get the latest logs from the build so we can make sure
-            // our messages are in the latest stage.
-            logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
-                JobID:        job.ID,
-                CreatedAfter: 0,
-            })
-            if err != nil {
-                log.Warn(ctx, "get logs for hung job", slog.Error(err))
-                continue
-            }
-            logStage := ""
-            if len(logs) != 0 {
-                logStage = logs[len(logs)-1].Stage
-            }
-            if logStage == "" {
-                logStage = "Unknown"
-            }
-
-            // Insert the messages into the build log.
-            insertParams := database.InsertProvisionerJobLogsParams{
-                JobID: job.ID,
-            }
-            now := database.Now()
-            for i, msg := range HungJobLogMessages {
-                // Set the created at in a way that ensures each message has
-                // a unique timestamp so they will be sorted correctly.
-                insertParams.CreatedAt = append(insertParams.CreatedAt, now.Add(time.Millisecond*time.Duration(i)))
-                insertParams.Level = append(insertParams.Level, database.LogLevelError)
-                insertParams.Stage = append(insertParams.Stage, logStage)
-                insertParams.Source = append(insertParams.Source, database.LogSourceProvisionerDaemon)
-                insertParams.Output = append(insertParams.Output, msg)
-            }
-            newLogs, err := db.InsertProvisionerJobLogs(ctx, insertParams)
+            err := unhangJob(ctx, db, d.pubsub, job)
             if err != nil {
-                log.Warn(ctx, "insert logs for hung job", slog.Error(err))
+                log.Error(ctx, "error forcefully terminating hung provisioner job", slog.Error(err))
                 continue
             }

-            // Publish the new log notification to pubsub. Use the lowest
-            // log ID inserted so the log stream will fetch everything after
-            // that point.
-            lowestID := newLogs[0].ID
-            data, err := json.Marshal(provisionersdk.ProvisionerJobLogsNotifyMessage{
-                CreatedAfter: lowestID - 1,
-            })
-            if err != nil {
-                log.Warn(ctx, "marshal log notification", slog.Error(err))
-                continue
-            }
-            err = d.pubsub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), data)
-            if err != nil {
-                log.Warn(ctx, "publish log notification", slog.Error(err))
-                continue
-            }
-
-            // Mark the job as failed.
-            now = database.Now()
-            err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
-                ID:        job.ID,
-                UpdatedAt: now,
-                CompletedAt: sql.NullTime{
-                    Time:  now,
-                    Valid: true,
-                },
-                Error: sql.NullString{
-                    String: "Coder: Build has been detected as hung for 5 minutes and has been terminated.",
-                    Valid:  true,
-                },
-                ErrorCode: sql.NullString{
-                    Valid: false,
-                },
-            })
-            if err != nil {
-                log.Warn(ctx, "mark job as failed", slog.Error(err))
-                continue
-            }
-
-            // If the provisioner job is a workspace build, copy the
-            // provisioner state from the previous build to this workspace
-            // build.
-            if job.Type == database.ProvisionerJobTypeWorkspaceBuild {
-                build, err := db.GetWorkspaceBuildByJobID(ctx, job.ID)
-                if err != nil {
-                    log.Warn(ctx, "get workspace build for workspace build job by job id", slog.Error(err))
-                    continue
-                }
-
-                // Only copy the provisioner state if there's no state in
-                // the current build.
-                if len(build.ProvisionerState) == 0 {
-                    // Get the previous build if it exists.
-                    prevBuild, err := db.GetWorkspaceBuildByWorkspaceIDAndBuildNumber(ctx, database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams{
-                        WorkspaceID: build.WorkspaceID,
-                        BuildNumber: build.BuildNumber - 1,
-                    })
-                    if err == nil {
-                        _, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
-                            ID:               build.ID,
-                            UpdatedAt:        database.Now(),
-                            ProvisionerState: prevBuild.ProvisionerState,
-                            Deadline:         time.Time{},
-                            MaxDeadline:      time.Time{},
-                        })
-                        if err != nil {
-                            log.Warn(ctx, "update hung workspace build provisioner state to match previous build", slog.Error(err))
-                            continue
-                        }
-                    } else if !xerrors.Is(err, sql.ErrNoRows) {
-                        log.Warn(ctx, "get previous workspace build", slog.Error(err))
-                        continue
-                    }
-                }
-            }
-
-            stats.HungJobIDs = append(stats.HungJobIDs, job.ID)
+            stats.TerminatedJobIDs = append(stats.TerminatedJobIDs, job.ID)
         }

         return nil
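Reassembled from the surviving + and context lines, the per-job loop body now reduces to a log-and-continue pattern, so a failure on one job no longer prevents the remaining jobs from being terminated (the construction of the per-job log value sits outside the visible hunks):

for _, job := range jobs {
    log.Info(ctx, "detected hung (>5m) provisioner job, forcefully terminating")

    err := unhangJob(ctx, db, d.pubsub, job)
    if err != nil {
        log.Error(ctx, "error forcefully terminating hung provisioner job", slog.Error(err))
        continue
    }

    stats.TerminatedJobIDs = append(stats.TerminatedJobIDs, job.ID)
}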
@@ -330,3 +225,120 @@ func (d *Detector) run(t time.Time) Stats {

     return stats
 }
+
+func unhangJob(ctx context.Context, db database.Store, pub pubsub.Pubsub, job database.ProvisionerJob) error {
+    jobStatus := db2sdk.ProvisionerJobStatus(job)
+    if jobStatus != codersdk.ProvisionerJobRunning {
+        return xerrors.Errorf("hang detector query discovered non-running job, this is a bug: %s", jobStatus)
+    }
+
+    // First, get the latest logs from the build so we can make sure
+    // our messages are in the latest stage.
+    logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
+        JobID:        job.ID,
+        CreatedAfter: 0,
+    })
+    if err != nil {
+        return xerrors.Errorf("get logs for hung job: %w", err)
+    }
+    logStage := ""
+    if len(logs) != 0 {
+        logStage = logs[len(logs)-1].Stage
+    }
+    if logStage == "" {
+        logStage = "Unknown"
+    }
+
+    // Insert the messages into the build log.
+    insertParams := database.InsertProvisionerJobLogsParams{
+        JobID: job.ID,
+    }
+    now := database.Now()
+    for i, msg := range HungJobLogMessages {
+        // Set the created at in a way that ensures each message has
+        // a unique timestamp so they will be sorted correctly.
+        insertParams.CreatedAt = append(insertParams.CreatedAt, now.Add(time.Millisecond*time.Duration(i)))
+        insertParams.Level = append(insertParams.Level, database.LogLevelError)
+        insertParams.Stage = append(insertParams.Stage, logStage)
+        insertParams.Source = append(insertParams.Source, database.LogSourceProvisionerDaemon)
+        insertParams.Output = append(insertParams.Output, msg)
+    }
+    newLogs, err := db.InsertProvisionerJobLogs(ctx, insertParams)
+    if err != nil {
+        return xerrors.Errorf("insert logs for hung job: %w", err)
+    }
+
+    // Publish the new log notification to pubsub. Use the lowest
+    // log ID inserted so the log stream will fetch everything after
+    // that point.
+    lowestID := newLogs[0].ID
+    data, err := json.Marshal(provisionersdk.ProvisionerJobLogsNotifyMessage{
+        CreatedAfter: lowestID - 1,
+        EndOfLogs:    true,
+    })
+    if err != nil {
+        return xerrors.Errorf("marshal log notification: %w", err)
+    }
+    err = pub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), data)
+    if err != nil {
+        return xerrors.Errorf("publish log notification: %w", err)
+    }
+
+    // Mark the job as failed.
+    now = database.Now()
+    err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
+        ID:        job.ID,
+        UpdatedAt: now,
+        CompletedAt: sql.NullTime{
+            Time:  now,
+            Valid: true,
+        },
+        Error: sql.NullString{
+            String: "Coder: Build has been detected as hung for 5 minutes and has been terminated by hang detector.",
+            Valid:  true,
+        },
+        ErrorCode: sql.NullString{
+            Valid: false,
+        },
+    })
+    if err != nil {
+        return xerrors.Errorf("mark job as failed: %w", err)
+    }
+
+    // If the provisioner job is a workspace build, copy the
+    // provisioner state from the previous build to this workspace
+    // build.
+    if job.Type == database.ProvisionerJobTypeWorkspaceBuild {
+        build, err := db.GetWorkspaceBuildByJobID(ctx, job.ID)
+        if err != nil {
+            return xerrors.Errorf("get workspace build for workspace build job by job id: %w", err)
+        }
+
+        // Only copy the provisioner state if there's no state in
+        // the current build.
+        if len(build.ProvisionerState) == 0 {
+            // Get the previous build if it exists.
+            prevBuild, err := db.GetWorkspaceBuildByWorkspaceIDAndBuildNumber(ctx, database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams{
+                WorkspaceID: build.WorkspaceID,
+                BuildNumber: build.BuildNumber - 1,
+            })
+            if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
+                return xerrors.Errorf("get previous workspace build: %w", err)
+            }
+            if err == nil {
+                _, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
+                    ID:               build.ID,
+                    UpdatedAt:        database.Now(),
+                    ProvisionerState: prevBuild.ProvisionerState,
+                    Deadline:         time.Time{},
+                    MaxDeadline:      time.Time{},
+                })
+                if err != nil {
+                    return xerrors.Errorf("update workspace build by id: %w", err)
+                }
+            }
+        }
+    }
+
+    return nil
+}
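Two details of the extracted function are easy to miss. First, the notify message now sets EndOfLogs: true, signaling streaming clients that the job's log stream is complete. Second, the millisecond stagger on created_at is what keeps the injected messages in order once they are sorted by timestamp; a tiny standalone illustration (the message strings are placeholders, not the real HungJobLogMessages):

package main

import (
    "fmt"
    "time"
)

func main() {
    now := time.Now()
    msgs := []string{"terminating hung job", "marking build as failed", "done"}
    for i, msg := range msgs {
        // Offset each row by i milliseconds so ORDER BY created_at
        // reproduces the insertion order exactly.
        createdAt := now.Add(time.Millisecond * time.Duration(i))
        fmt.Println(createdAt.Format(time.RFC3339Nano), msg)
    }
}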
