Skip to content

Commit 0b9e78a

Browse files
committed
tx per unhang job
1 parent e284b47 commit 0b9e78a

File tree

4 files changed

+243
-168
lines changed

4 files changed

+243
-168
lines changed

coderd/database/dbfake/dbfake.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2090,7 +2090,7 @@ func (q *fakeQuerier) GetProvisionerLogsAfterID(_ context.Context, arg database.
20902090
if jobLog.JobID != arg.JobID {
20912091
continue
20922092
}
2093-
if arg.CreatedAfter != 0 && jobLog.ID < arg.CreatedAfter {
2093+
if jobLog.ID <= arg.CreatedAfter {
20942094
continue
20952095
}
20962096
logs = append(logs, jobLog)

coderd/database/lock.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
package database
22

3+
import "hash/fnv"
4+
35
// Well-known lock IDs for lock functions in the database. These should not
46
// change. If locks are deprecated, they should be kept in this list to avoid
57
// reusing the same ID.
68
const (
79
lockIDUnused = iota
810
LockIDDeploymentSetup
9-
LockIDHangDetector
1011
)
12+
13+
// GenLockID generates a unique and consistent lock ID from a given string.
14+
func GenLockID(name string) int64 {
15+
hash := fnv.New64()
16+
_, _ = hash.Write([]byte(name))
17+
return int64(hash.Sum64())
18+
}

coderd/unhanger/detector.go

Lines changed: 157 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"database/sql"
66
"encoding/json"
7+
"fmt"
8+
"math/rand" //#nosec // this is only used for shuffling an array to pick random jobs to unhang
79
"time"
810

911
"golang.org/x/xerrors"
@@ -56,6 +58,17 @@ func (acquireLockError) Error() string {
5658
return "lock is held by another client"
5759
}
5860

61+
// jobNotRunningError is returned when the detector attempts to terminate a job
62+
// that is not running.
63+
type jobNotRunningError struct {
64+
Status codersdk.ProvisionerJobStatus
65+
}
66+
67+
// Error implements error.
68+
func (e jobNotRunningError) Error() string {
69+
return fmt.Sprintf("job is not running (status: %s)", e.Status)
70+
}
71+
5972
// Detector automatically detects hung provisioner jobs, sends messages into the
6073
// build log and terminates them as failed.
6174
type Detector struct {
@@ -163,182 +176,181 @@ func (d *Detector) run(t time.Time) Stats {
163176
Error: nil,
164177
}
165178

166-
err := d.db.InTx(func(db database.Store) error {
167-
locked, err := db.TryAcquireLock(ctx, database.LockIDHangDetector)
168-
if !locked {
169-
// If we can't acquire the lock, it means another instance of the
170-
// hang detector is already running in another coder replica.
171-
// There's no point in waiting to run it again, so we'll just retry
172-
// on the next tick.
173-
d.log.Info(ctx, "skipping workspace build hang detector run due to lock")
174-
// This error is ignored.
175-
return acquireLockError{}
179+
// Find all provisioner jobs that are currently running but have not
180+
// received an update in the last 5 minutes.
181+
jobs, err := d.db.GetHungProvisionerJobs(ctx, t.Add(-HungJobDuration))
182+
if err != nil {
183+
stats.Error = xerrors.Errorf("get hung provisioner jobs: %w", err)
184+
return stats
185+
}
186+
187+
// Limit the number of jobs we'll unhang in a single run to avoid
188+
// timing out.
189+
if len(jobs) > MaxJobsPerRun {
190+
// Pick a random subset of the jobs to unhang.
191+
rand.Shuffle(len(jobs), func(i, j int) {
192+
jobs[i], jobs[j] = jobs[j], jobs[i]
193+
})
194+
jobs = jobs[:MaxJobsPerRun]
195+
}
196+
197+
// Send a message into the build log for each hung job saying that it
198+
// has been detected and will be terminated, then mark the job as
199+
// failed.
200+
for _, job := range jobs {
201+
log := d.log.With(slog.F("job_id", job.ID))
202+
203+
err := unhangJob(ctx, log, d.db, d.pubsub, job.ID)
204+
if err != nil && !(xerrors.As(err, &acquireLockError{}) || xerrors.As(err, &jobNotRunningError{})) {
205+
log.Error(ctx, "error forcefully terminating hung provisioner job", slog.Error(err))
206+
continue
176207
}
208+
209+
stats.TerminatedJobIDs = append(stats.TerminatedJobIDs, job.ID)
210+
}
211+
212+
return stats
213+
}
214+
215+
func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubsub.Pubsub, jobID uuid.UUID) error {
216+
var lowestLogID int64
217+
218+
err := db.InTx(func(db database.Store) error {
219+
locked, err := db.TryAcquireLock(ctx, database.GenLockID(fmt.Sprintf("hang-detector:%s", jobID)))
177220
if err != nil {
178-
d.log.Warn(ctx, "skipping workspace build hang detector run due to error acquiring lock", slog.Error(err))
179221
return xerrors.Errorf("acquire lock: %w", err)
180222
}
181-
d.log.Info(ctx, "running workspace build hang detector")
223+
if !locked {
224+
// This error is ignored.
225+
return acquireLockError{}
226+
}
182227

183-
// Find all provisioner jobs that are currently running but have not
184-
// received an update in the last 5 minutes.
185-
jobs, err := db.GetHungProvisionerJobs(ctx, t.Add(-HungJobDuration))
228+
// Refetch the job while we hold the lock.
229+
job, err := db.GetProvisionerJobByID(ctx, jobID)
186230
if err != nil {
187-
return xerrors.Errorf("get hung provisioner jobs: %w", err)
231+
return xerrors.Errorf("get provisioner job: %w", err)
188232
}
189-
190-
// Limit the number of jobs we'll unhang in a single run to avoid
191-
// timing out.
192-
if len(jobs) > MaxJobsPerRun {
193-
jobs = jobs[:MaxJobsPerRun]
233+
jobStatus := db2sdk.ProvisionerJobStatus(job)
234+
if jobStatus != codersdk.ProvisionerJobRunning {
235+
return jobNotRunningError{
236+
Status: jobStatus,
237+
}
194238
}
195239

196-
// Send a message into the build log for each hung job saying that it
197-
// has been detected and will be terminated, then mark the job as
198-
// failed.
199-
for _, job := range jobs {
200-
log := d.log.With(slog.F("job_id", job.ID))
240+
log.Info(ctx, "detected hung (>5m) provisioner job, forcefully terminating")
201241

202-
jobStatus := db2sdk.ProvisionerJobStatus(job)
203-
if jobStatus != codersdk.ProvisionerJobRunning {
204-
log.Error(ctx, "hang detector query discovered non-running job, this is a bug", slog.F("status", jobStatus))
205-
continue
206-
}
242+
// First, get the latest logs from the build so we can make sure
243+
// our messages are in the latest stage.
244+
logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
245+
JobID: job.ID,
246+
CreatedAfter: 0,
247+
})
248+
if err != nil {
249+
return xerrors.Errorf("get logs for hung job: %w", err)
250+
}
251+
logStage := ""
252+
if len(logs) != 0 {
253+
logStage = logs[len(logs)-1].Stage
254+
}
255+
if logStage == "" {
256+
logStage = "Unknown"
257+
}
207258

208-
log.Info(ctx, "detected hung (>5m) provisioner job, forcefully terminating")
259+
// Insert the messages into the build log.
260+
insertParams := database.InsertProvisionerJobLogsParams{
261+
JobID: job.ID,
262+
}
263+
now := database.Now()
264+
for i, msg := range HungJobLogMessages {
265+
// Set the created at in a way that ensures each message has
266+
// a unique timestamp so they will be sorted correctly.
267+
insertParams.CreatedAt = append(insertParams.CreatedAt, now.Add(time.Millisecond*time.Duration(i)))
268+
insertParams.Level = append(insertParams.Level, database.LogLevelError)
269+
insertParams.Stage = append(insertParams.Stage, logStage)
270+
insertParams.Source = append(insertParams.Source, database.LogSourceProvisionerDaemon)
271+
insertParams.Output = append(insertParams.Output, msg)
272+
}
273+
newLogs, err := db.InsertProvisionerJobLogs(ctx, insertParams)
274+
if err != nil {
275+
return xerrors.Errorf("insert logs for hung job: %w", err)
276+
}
277+
lowestLogID = newLogs[0].ID
278+
279+
// Mark the job as failed.
280+
now = database.Now()
281+
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
282+
ID: job.ID,
283+
UpdatedAt: now,
284+
CompletedAt: sql.NullTime{
285+
Time: now,
286+
Valid: true,
287+
},
288+
Error: sql.NullString{
289+
String: "Coder: Build has been detected as hung for 5 minutes and has been terminated by hang detector.",
290+
Valid: true,
291+
},
292+
ErrorCode: sql.NullString{
293+
Valid: false,
294+
},
295+
})
296+
if err != nil {
297+
return xerrors.Errorf("mark job as failed: %w", err)
298+
}
209299

210-
err := unhangJob(ctx, db, d.pubsub, job)
300+
// If the provisioner job is a workspace build, copy the
301+
// provisioner state from the previous build to this workspace
302+
// build.
303+
if job.Type == database.ProvisionerJobTypeWorkspaceBuild {
304+
build, err := db.GetWorkspaceBuildByJobID(ctx, job.ID)
211305
if err != nil {
212-
log.Error(ctx, "error forcefully terminating hung provisioner job", slog.Error(err))
213-
continue
306+
return xerrors.Errorf("get workspace build for workspace build job by job id: %w", err)
214307
}
215308

216-
stats.TerminatedJobIDs = append(stats.TerminatedJobIDs, job.ID)
309+
// Only copy the provisioner state if there's no state in
310+
// the current build.
311+
if len(build.ProvisionerState) == 0 {
312+
// Get the previous build if it exists.
313+
prevBuild, err := db.GetWorkspaceBuildByWorkspaceIDAndBuildNumber(ctx, database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams{
314+
WorkspaceID: build.WorkspaceID,
315+
BuildNumber: build.BuildNumber - 1,
316+
})
317+
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
318+
return xerrors.Errorf("get previous workspace build: %w", err)
319+
}
320+
if err == nil {
321+
_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
322+
ID: build.ID,
323+
UpdatedAt: database.Now(),
324+
ProvisionerState: prevBuild.ProvisionerState,
325+
Deadline: time.Time{},
326+
MaxDeadline: time.Time{},
327+
})
328+
if err != nil {
329+
return xerrors.Errorf("update workspace build by id: %w", err)
330+
}
331+
}
332+
}
217333
}
218334

219335
return nil
220336
}, nil)
221337
if err != nil {
222-
stats.Error = err
223-
return stats
338+
return xerrors.Errorf("in tx: %w", err)
224339
}
225340

226-
return stats
227-
}
228-
229-
func unhangJob(ctx context.Context, db database.Store, pub pubsub.Pubsub, job database.ProvisionerJob) error {
230-
jobStatus := db2sdk.ProvisionerJobStatus(job)
231-
if jobStatus != codersdk.ProvisionerJobRunning {
232-
return xerrors.Errorf("hang detector query discovered non-running job, this is a bug: %s", jobStatus)
233-
}
234-
235-
// First, get the latest logs from the build so we can make sure
236-
// our messages are in the latest stage.
237-
logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
238-
JobID: job.ID,
239-
CreatedAfter: 0,
240-
})
241-
if err != nil {
242-
return xerrors.Errorf("get logs for hung job: %w", err)
243-
}
244-
logStage := ""
245-
if len(logs) != 0 {
246-
logStage = logs[len(logs)-1].Stage
247-
}
248-
if logStage == "" {
249-
logStage = "Unknown"
250-
}
251-
252-
// Insert the messages into the build log.
253-
insertParams := database.InsertProvisionerJobLogsParams{
254-
JobID: job.ID,
255-
}
256-
now := database.Now()
257-
for i, msg := range HungJobLogMessages {
258-
// Set the created at in a way that ensures each message has
259-
// a unique timestamp so they will be sorted correctly.
260-
insertParams.CreatedAt = append(insertParams.CreatedAt, now.Add(time.Millisecond*time.Duration(i)))
261-
insertParams.Level = append(insertParams.Level, database.LogLevelError)
262-
insertParams.Stage = append(insertParams.Stage, logStage)
263-
insertParams.Source = append(insertParams.Source, database.LogSourceProvisionerDaemon)
264-
insertParams.Output = append(insertParams.Output, msg)
265-
}
266-
newLogs, err := db.InsertProvisionerJobLogs(ctx, insertParams)
267-
if err != nil {
268-
return xerrors.Errorf("insert logs for hung job: %w", err)
269-
}
270-
271-
// Publish the new log notification to pubsub. Use the lowest
272-
// log ID inserted so the log stream will fetch everything after
273-
// that point.
274-
lowestID := newLogs[0].ID
341+
// Publish the new log notification to pubsub. Use the lowest log ID
342+
// inserted so the log stream will fetch everything after that point.
275343
data, err := json.Marshal(provisionersdk.ProvisionerJobLogsNotifyMessage{
276-
CreatedAfter: lowestID - 1,
344+
CreatedAfter: lowestLogID - 1,
277345
EndOfLogs: true,
278346
})
279347
if err != nil {
280348
return xerrors.Errorf("marshal log notification: %w", err)
281349
}
282-
err = pub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), data)
350+
err = pub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(jobID), data)
283351
if err != nil {
284352
return xerrors.Errorf("publish log notification: %w", err)
285353
}
286354

287-
// Mark the job as failed.
288-
now = database.Now()
289-
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
290-
ID: job.ID,
291-
UpdatedAt: now,
292-
CompletedAt: sql.NullTime{
293-
Time: now,
294-
Valid: true,
295-
},
296-
Error: sql.NullString{
297-
String: "Coder: Build has been detected as hung for 5 minutes and has been terminated by hang detector.",
298-
Valid: true,
299-
},
300-
ErrorCode: sql.NullString{
301-
Valid: false,
302-
},
303-
})
304-
if err != nil {
305-
return xerrors.Errorf("mark job as failed: %w", err)
306-
}
307-
308-
// If the provisioner job is a workspace build, copy the
309-
// provisioner state from the previous build to this workspace
310-
// build.
311-
if job.Type == database.ProvisionerJobTypeWorkspaceBuild {
312-
build, err := db.GetWorkspaceBuildByJobID(ctx, job.ID)
313-
if err != nil {
314-
return xerrors.Errorf("get workspace build for workspace build job by job id: %w", err)
315-
}
316-
317-
// Only copy the provisioner state if there's no state in
318-
// the current build.
319-
if len(build.ProvisionerState) == 0 {
320-
// Get the previous build if it exists.
321-
prevBuild, err := db.GetWorkspaceBuildByWorkspaceIDAndBuildNumber(ctx, database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams{
322-
WorkspaceID: build.WorkspaceID,
323-
BuildNumber: build.BuildNumber - 1,
324-
})
325-
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
326-
return xerrors.Errorf("get previous workspace build: %w", err)
327-
}
328-
if err == nil {
329-
_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
330-
ID: build.ID,
331-
UpdatedAt: database.Now(),
332-
ProvisionerState: prevBuild.ProvisionerState,
333-
Deadline: time.Time{},
334-
MaxDeadline: time.Time{},
335-
})
336-
if err != nil {
337-
return xerrors.Errorf("update workspace build by id: %w", err)
338-
}
339-
}
340-
}
341-
}
342-
343355
return nil
344356
}

0 commit comments

Comments
 (0)