feat: add provisioner job hang detector #7927

Merged
merged 13 commits into from Jun 25, 2023
Changes from 1 commit
small refactor
deansheather committed Jun 21, 2023
commit e284b472d67f9cf42ad688f3643adc7c12d7785b
274 changes: 143 additions & 131 deletions coderd/unhanger/detector.go
@@ -31,6 +31,10 @@ const (
// Provisioners should avoid keeping a job "running" for longer than this
// time after failing to send an update to the job.
HungJobExitTimeout = 3 * time.Minute

// MaxJobsPerRun is the maximum number of hung jobs that the detector will
// terminate in a single run.
MaxJobsPerRun = 10
)

// HungJobLogMessages are written to provisioner job logs when a job is hung and
@@ -45,18 +49,11 @@ var HungJobLogMessages = []string{

// acquireLockError is returned when the detector fails to acquire a lock and
// cancels the current run.
type acquireLockError struct {
err error
}
type acquireLockError struct{}

// Error implements error.
func (e *acquireLockError) Error() string {
return "acquire lock: " + e.err.Error()
}

// Unwrap implements xerrors.Wrapper.
func (e *acquireLockError) Unwrap() error {
return e.err
func (acquireLockError) Error() string {
return "lock is held by another client"
}

// Detector automatically detects hung provisioner jobs, sends messages into the
@@ -75,9 +72,9 @@ type Detector struct {

// Stats contains statistics about the last run of the detector.
type Stats struct {
// HungJobIDs contains the IDs of all jobs that were detected as hung and
// TerminatedJobIDs contains the IDs of all jobs that were detected as hung and
// terminated.
HungJobIDs []uuid.UUID
TerminatedJobIDs []uuid.UUID
// Error is the fatal error that occurred during the last run of the
// detector, if any. Error may be set to AcquireLockError if the detector
// failed to acquire a lock.
@@ -88,7 +85,7 @@ type Stats struct {
func New(ctx context.Context, db database.Store, pub pubsub.Pubsub, log slog.Logger, tick <-chan time.Time) *Detector {
//nolint:gocritic // Hang detector has a limited set of permissions.
ctx, cancel := context.WithCancel(dbauthz.AsHangDetector(ctx))
le := &Detector{
d := &Detector{
ctx: ctx,
cancel: cancel,
done: make(chan struct{}),
@@ -98,7 +95,7 @@ func New(ctx context.Context, db database.Store, pub pubsub.Pubsub, log slog.Log
tick: tick,
stats: nil,
}
return le
return d
}

// WithStatsChannel will cause Executor to push a RunStats to ch after
@@ -131,8 +128,8 @@ func (d *Detector) Start() {
if stats.Error != nil && !xerrors.As(stats.Error, &acquireLockError{}) {
d.log.Warn(d.ctx, "error running workspace build hang detector once", slog.Error(stats.Error))
}
if len(stats.HungJobIDs) != 0 {
d.log.Warn(d.ctx, "detected (and terminated) hung provisioner jobs", slog.F("job_ids", stats.HungJobIDs))
if len(stats.TerminatedJobIDs) != 0 {
d.log.Warn(d.ctx, "detected (and terminated) hung provisioner jobs", slog.F("job_ids", stats.TerminatedJobIDs))
}
if d.stats != nil {
select {
@@ -162,8 +159,8 @@ func (d *Detector) run(t time.Time) Stats {
defer cancel()

stats := Stats{
HungJobIDs: []uuid.UUID{},
Error: nil,
TerminatedJobIDs: []uuid.UUID{},
Error: nil,
}

err := d.db.InTx(func(db database.Store) error {
@@ -173,9 +170,9 @@ func (d *Detector) run(t time.Time) Stats {
// hang detector is already running in another coder replica.
// There's no point in waiting to run it again, so we'll just retry
// on the next tick.
d.log.Info(ctx, "skipping workspace build hang detector run due to lock", slog.Error(err))
d.log.Info(ctx, "skipping workspace build hang detector run due to lock")
// This error is ignored.
return &acquireLockError{err: err}
return acquireLockError{}
}
if err != nil {
d.log.Warn(ctx, "skipping workspace build hang detector run due to error acquiring lock", slog.Error(err))
@@ -190,6 +187,12 @@ func (d *Detector) run(t time.Time) Stats {
return xerrors.Errorf("get hung provisioner jobs: %w", err)
}

// Limit the number of jobs we'll unhang in a single run to avoid
// timing out.
if len(jobs) > MaxJobsPerRun {
jobs = jobs[:MaxJobsPerRun]
}

// Send a message into the build log for each hung job saying that it
// has been detected and will be terminated, then mark the job as
// failed.
@@ -204,121 +207,13 @@

log.Info(ctx, "detected hung (>5m) provisioner job, forcefully terminating")

// First, get the latest logs from the build so we can make sure
// our messages are in the latest stage.
logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
JobID: job.ID,
CreatedAfter: 0,
})
if err != nil {
log.Warn(ctx, "get logs for hung job", slog.Error(err))
continue
}
logStage := ""
if len(logs) != 0 {
logStage = logs[len(logs)-1].Stage
}
if logStage == "" {
logStage = "Unknown"
}

// Insert the messages into the build log.
insertParams := database.InsertProvisionerJobLogsParams{
JobID: job.ID,
}
now := database.Now()
for i, msg := range HungJobLogMessages {
// Set the created at in a way that ensures each message has
// a unique timestamp so they will be sorted correctly.
insertParams.CreatedAt = append(insertParams.CreatedAt, now.Add(time.Millisecond*time.Duration(i)))
insertParams.Level = append(insertParams.Level, database.LogLevelError)
insertParams.Stage = append(insertParams.Stage, logStage)
insertParams.Source = append(insertParams.Source, database.LogSourceProvisionerDaemon)
insertParams.Output = append(insertParams.Output, msg)
}
newLogs, err := db.InsertProvisionerJobLogs(ctx, insertParams)
err := unhangJob(ctx, db, d.pubsub, job)
if err != nil {
log.Warn(ctx, "insert logs for hung job", slog.Error(err))
log.Error(ctx, "error forcefully terminating hung provisioner job", slog.Error(err))
continue
}

// Publish the new log notification to pubsub. Use the lowest
// log ID inserted so the log stream will fetch everything after
// that point.
lowestID := newLogs[0].ID
data, err := json.Marshal(provisionersdk.ProvisionerJobLogsNotifyMessage{
CreatedAfter: lowestID - 1,
})
if err != nil {
log.Warn(ctx, "marshal log notification", slog.Error(err))
continue
}
err = d.pubsub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), data)
if err != nil {
log.Warn(ctx, "publish log notification", slog.Error(err))
continue
}

// Mark the job as failed.
now = database.Now()
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: job.ID,
UpdatedAt: now,
CompletedAt: sql.NullTime{
Time: now,
Valid: true,
},
Error: sql.NullString{
String: "Coder: Build has been detected as hung for 5 minutes and has been terminated.",
Valid: true,
},
ErrorCode: sql.NullString{
Valid: false,
},
})
if err != nil {
log.Warn(ctx, "mark job as failed", slog.Error(err))
continue
}

// If the provisioner job is a workspace build, copy the
// provisioner state from the previous build to this workspace
// build.
if job.Type == database.ProvisionerJobTypeWorkspaceBuild {
build, err := db.GetWorkspaceBuildByJobID(ctx, job.ID)
if err != nil {
log.Warn(ctx, "get workspace build for workspace build job by job id", slog.Error(err))
continue
}

// Only copy the provisioner state if there's no state in
// the current build.
if len(build.ProvisionerState) == 0 {
// Get the previous build if it exists.
prevBuild, err := db.GetWorkspaceBuildByWorkspaceIDAndBuildNumber(ctx, database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams{
WorkspaceID: build.WorkspaceID,
BuildNumber: build.BuildNumber - 1,
})
if err == nil {
_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: build.ID,
UpdatedAt: database.Now(),
ProvisionerState: prevBuild.ProvisionerState,
Deadline: time.Time{},
MaxDeadline: time.Time{},
})
if err != nil {
log.Warn(ctx, "update hung workspace build provisioner state to match previous build", slog.Error(err))
continue
}
} else if !xerrors.Is(err, sql.ErrNoRows) {
log.Warn(ctx, "get previous workspace build", slog.Error(err))
continue
}
}
}

stats.HungJobIDs = append(stats.HungJobIDs, job.ID)
stats.TerminatedJobIDs = append(stats.TerminatedJobIDs, job.ID)
}

return nil
Expand All @@ -330,3 +225,120 @@ func (d *Detector) run(t time.Time) Stats {

return stats
}

func unhangJob(ctx context.Context, db database.Store, pub pubsub.Pubsub, job database.ProvisionerJob) error {
jobStatus := db2sdk.ProvisionerJobStatus(job)
if jobStatus != codersdk.ProvisionerJobRunning {
return xerrors.Errorf("hang detector query discovered non-running job, this is a bug: %s", jobStatus)
}

// First, get the latest logs from the build so we can make sure
// our messages are in the latest stage.
logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
JobID: job.ID,
CreatedAfter: 0,
})
if err != nil {
return xerrors.Errorf("get logs for hung job: %w", err)
}
logStage := ""
if len(logs) != 0 {
logStage = logs[len(logs)-1].Stage
}
if logStage == "" {
logStage = "Unknown"
}

// Insert the messages into the build log.
insertParams := database.InsertProvisionerJobLogsParams{
JobID: job.ID,
}
now := database.Now()
for i, msg := range HungJobLogMessages {
// Set the created at in a way that ensures each message has
// a unique timestamp so they will be sorted correctly.
insertParams.CreatedAt = append(insertParams.CreatedAt, now.Add(time.Millisecond*time.Duration(i)))
insertParams.Level = append(insertParams.Level, database.LogLevelError)
insertParams.Stage = append(insertParams.Stage, logStage)
insertParams.Source = append(insertParams.Source, database.LogSourceProvisionerDaemon)
insertParams.Output = append(insertParams.Output, msg)
}
newLogs, err := db.InsertProvisionerJobLogs(ctx, insertParams)
if err != nil {
return xerrors.Errorf("insert logs for hung job: %w", err)
}

// Publish the new log notification to pubsub. Use the lowest
// log ID inserted so the log stream will fetch everything after
// that point.
lowestID := newLogs[0].ID
data, err := json.Marshal(provisionersdk.ProvisionerJobLogsNotifyMessage{
CreatedAfter: lowestID - 1,
EndOfLogs: true,
})
if err != nil {
return xerrors.Errorf("marshal log notification: %w", err)
}
err = pub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), data)
if err != nil {
return xerrors.Errorf("publish log notification: %w", err)
}

// Mark the job as failed.
now = database.Now()
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: job.ID,
UpdatedAt: now,
CompletedAt: sql.NullTime{
Time: now,
Valid: true,
},
Error: sql.NullString{
String: "Coder: Build has been detected as hung for 5 minutes and has been terminated by hang detector.",
Valid: true,
},
ErrorCode: sql.NullString{
Valid: false,
},
})
if err != nil {
return xerrors.Errorf("mark job as failed: %w", err)
}

// If the provisioner job is a workspace build, copy the
// provisioner state from the previous build to this workspace
// build.
if job.Type == database.ProvisionerJobTypeWorkspaceBuild {
build, err := db.GetWorkspaceBuildByJobID(ctx, job.ID)
if err != nil {
return xerrors.Errorf("get workspace build for workspace build job by job id: %w", err)
}

// Only copy the provisioner state if there's no state in
// the current build.
if len(build.ProvisionerState) == 0 {
// Get the previous build if it exists.
prevBuild, err := db.GetWorkspaceBuildByWorkspaceIDAndBuildNumber(ctx, database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams{
WorkspaceID: build.WorkspaceID,
BuildNumber: build.BuildNumber - 1,
})
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("get previous workspace build: %w", err)
}
if err == nil {
_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
ID: build.ID,
UpdatedAt: database.Now(),
ProvisionerState: prevBuild.ProvisionerState,
Deadline: time.Time{},
MaxDeadline: time.Time{},
})
if err != nil {
return xerrors.Errorf("update workspace build by id: %w", err)
}
}
}
}

return nil
}
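
For readers skimming the diff, here is a minimal sketch of how a caller might wire the detector up, based only on the API visible in this file (New, WithStatsChannel, Start, and the Stats fields renamed in this commit). The import paths, the tick interval, and the exact WithStatsChannel signature (assumed to accept a chan Stats and return the Detector for chaining) are assumptions, not facts from this PR.

// Hypothetical usage sketch; not part of this commit.
package example

import (
	"context"
	"time"

	"cdr.dev/slog"

	// NOTE: import paths assumed from the repo layout at the time of this PR.
	"github.com/coder/coder/coderd/database"
	"github.com/coder/coder/coderd/database/pubsub"
	"github.com/coder/coder/coderd/unhanger"
)

func startHangDetector(ctx context.Context, db database.Store, ps pubsub.Pubsub, log slog.Logger) *unhanger.Detector {
	statsCh := make(chan unhanger.Stats)
	ticker := time.NewTicker(time.Minute) // assumed run interval

	// Start launches the background loop that runs once per tick and
	// terminates at most MaxJobsPerRun hung jobs per run.
	d := unhanger.New(ctx, db, ps, log, ticker.C).WithStatsChannel(statsCh)
	d.Start()

	// Consume per-run statistics: TerminatedJobIDs lists the hung jobs that
	// were forcefully failed, and Error is set when a run could not complete
	// (including the benign lock-contention case handled in Start above).
	go func() {
		for stats := range statsCh {
			if stats.Error != nil {
				log.Warn(ctx, "hang detector run failed", slog.Error(stats.Error))
				continue
			}
			for _, id := range stats.TerminatedJobIDs {
				log.Info(ctx, "hang detector terminated job", slog.F("job_id", id))
			}
		}
	}()

	return d
}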