|
4 | 4 | "context"
|
5 | 5 | "database/sql"
|
6 | 6 | "encoding/json"
|
| 7 | + "fmt" |
| 8 | + "math/rand" //#nosec // this is only used for shuffling an array to pick random jobs to unhang |
7 | 9 | "time"
|
8 | 10 |
|
9 | 11 | "golang.org/x/xerrors"
|
@@ -56,6 +58,17 @@ func (acquireLockError) Error() string {
|
56 | 58 | return "lock is held by another client"
|
57 | 59 | }
|
58 | 60 |
|
| 61 | +// jobNotRunningError is returned when the detector attempts to terminate a job |
| 62 | +// that is not running. |
| 63 | +type jobNotRunningError struct { |
| 64 | + Status codersdk.ProvisionerJobStatus |
| 65 | +} |
| 66 | + |
| 67 | +// Error implements error. |
| 68 | +func (e jobNotRunningError) Error() string { |
| 69 | + return fmt.Sprintf("job is not running (status: %s)", e.Status) |
| 70 | +} |
| 71 | + |
59 | 72 | // Detector automatically detects hung provisioner jobs, sends messages into the
|
60 | 73 | // build log and terminates them as failed.
|
61 | 74 | type Detector struct {
|
@@ -163,182 +176,181 @@ func (d *Detector) run(t time.Time) Stats {
|
163 | 176 | Error: nil,
|
164 | 177 | }
|
165 | 178 |
|
166 |
| - err := d.db.InTx(func(db database.Store) error { |
167 |
| - locked, err := db.TryAcquireLock(ctx, database.LockIDHangDetector) |
168 |
| - if !locked { |
169 |
| - // If we can't acquire the lock, it means another instance of the |
170 |
| - // hang detector is already running in another coder replica. |
171 |
| - // There's no point in waiting to run it again, so we'll just retry |
172 |
| - // on the next tick. |
173 |
| - d.log.Info(ctx, "skipping workspace build hang detector run due to lock") |
174 |
| - // This error is ignored. |
175 |
| - return acquireLockError{} |
| 179 | + // Find all provisioner jobs that are currently running but have not |
| 180 | + // received an update in the last 5 minutes. |
| 181 | + jobs, err := d.db.GetHungProvisionerJobs(ctx, t.Add(-HungJobDuration)) |
| 182 | + if err != nil { |
| 183 | + stats.Error = xerrors.Errorf("get hung provisioner jobs: %w", err) |
| 184 | + return stats |
| 185 | + } |
| 186 | + |
| 187 | + // Limit the number of jobs we'll unhang in a single run to avoid |
| 188 | + // timing out. |
| 189 | + if len(jobs) > MaxJobsPerRun { |
| 190 | + // Pick a random subset of the jobs to unhang. |
| 191 | + rand.Shuffle(len(jobs), func(i, j int) { |
| 192 | + jobs[i], jobs[j] = jobs[j], jobs[i] |
| 193 | + }) |
| 194 | + jobs = jobs[:MaxJobsPerRun] |
| 195 | + } |
| 196 | + |
| 197 | + // Send a message into the build log for each hung job saying that it |
| 198 | + // has been detected and will be terminated, then mark the job as |
| 199 | + // failed. |
| 200 | + for _, job := range jobs { |
| 201 | + log := d.log.With(slog.F("job_id", job.ID)) |
| 202 | + |
| 203 | + err := unhangJob(ctx, log, d.db, d.pubsub, job.ID) |
| 204 | + if err != nil && !(xerrors.As(err, &acquireLockError{}) || xerrors.As(err, &jobNotRunningError{})) { |
| 205 | + log.Error(ctx, "error forcefully terminating hung provisioner job", slog.Error(err)) |
| 206 | + continue |
176 | 207 | }
|
| 208 | + |
| 209 | + stats.TerminatedJobIDs = append(stats.TerminatedJobIDs, job.ID) |
| 210 | + } |
| 211 | + |
| 212 | + return stats |
| 213 | +} |
| 214 | + |
// unhangJob forcefully terminates a single hung provisioner job. Inside one
// database transaction it:
//  1. takes a per-job advisory lock so only one replica acts on the job,
//  2. refetches the job and bails out (jobNotRunningError) if it is no
//     longer running,
//  3. appends HungJobLogMessages to the job's build log under the latest
//     known log stage, and
//  4. marks the job as failed; for workspace-build jobs with no provisioner
//     state of their own, the previous build's state is copied forward.
// After the transaction commits, a pubsub notification is published so log
// streamers fetch the newly inserted messages.
//
// It returns acquireLockError when another replica holds the lock and
// jobNotRunningError when the job is not running; callers detect both via
// xerrors.As and treat them as benign.
func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubsub.Pubsub, jobID uuid.UUID) error {
	// ID of the first log row inserted, captured inside the transaction so
	// the post-commit pubsub notification can reference everything after it.
	var lowestLogID int64

	err := db.InTx(func(db database.Store) error {
		// Advisory lock keyed on the job ID: prevents two replicas from
		// terminating the same job concurrently.
		locked, err := db.TryAcquireLock(ctx, database.GenLockID(fmt.Sprintf("hang-detector:%s", jobID)))
		if err != nil {
			return xerrors.Errorf("acquire lock: %w", err)
		}
		if !locked {
			// This error is ignored.
			return acquireLockError{}
		}

		// Refetch the job while we hold the lock.
		job, err := db.GetProvisionerJobByID(ctx, jobID)
		if err != nil {
			return xerrors.Errorf("get provisioner job: %w", err)
		}

		// The job may have completed (or been picked up again) between the
		// hung-jobs query and acquiring the lock; only act on running jobs.
		jobStatus := db2sdk.ProvisionerJobStatus(job)
		if jobStatus != codersdk.ProvisionerJobRunning {
			return jobNotRunningError{
				Status: jobStatus,
			}
		}

		log.Info(ctx, "detected hung (>5m) provisioner job, forcefully terminating")

		// First, get the latest logs from the build so we can make sure
		// our messages are in the latest stage.
		logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
			JobID:        job.ID,
			CreatedAfter: 0,
		})
		if err != nil {
			return xerrors.Errorf("get logs for hung job: %w", err)
		}
		logStage := ""
		if len(logs) != 0 {
			logStage = logs[len(logs)-1].Stage
		}
		if logStage == "" {
			logStage = "Unknown"
		}

		// Insert the messages into the build log.
		insertParams := database.InsertProvisionerJobLogsParams{
			JobID: job.ID,
		}
		now := database.Now()
		for i, msg := range HungJobLogMessages {
			// Set the created at in a way that ensures each message has
			// a unique timestamp so they will be sorted correctly.
			insertParams.CreatedAt = append(insertParams.CreatedAt, now.Add(time.Millisecond*time.Duration(i)))
			insertParams.Level = append(insertParams.Level, database.LogLevelError)
			insertParams.Stage = append(insertParams.Stage, logStage)
			insertParams.Source = append(insertParams.Source, database.LogSourceProvisionerDaemon)
			insertParams.Output = append(insertParams.Output, msg)
		}
		newLogs, err := db.InsertProvisionerJobLogs(ctx, insertParams)
		if err != nil {
			return xerrors.Errorf("insert logs for hung job: %w", err)
		}
		// NOTE(review): assumes HungJobLogMessages is non-empty — newLogs[0]
		// panics otherwise. Confirm at the declaration site.
		lowestLogID = newLogs[0].ID

		// Mark the job as failed.
		now = database.Now()
		err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
			ID:        job.ID,
			UpdatedAt: now,
			CompletedAt: sql.NullTime{
				Time:  now,
				Valid: true,
			},
			Error: sql.NullString{
				String: "Coder: Build has been detected as hung for 5 minutes and has been terminated by hang detector.",
				Valid:  true,
			},
			ErrorCode: sql.NullString{
				Valid: false,
			},
		})
		if err != nil {
			return xerrors.Errorf("mark job as failed: %w", err)
		}

		// If the provisioner job is a workspace build, copy the
		// provisioner state from the previous build to this workspace
		// build.
		if job.Type == database.ProvisionerJobTypeWorkspaceBuild {
			build, err := db.GetWorkspaceBuildByJobID(ctx, job.ID)
			if err != nil {
				return xerrors.Errorf("get workspace build for workspace build job by job id: %w", err)
			}

			// Only copy the provisioner state if there's no state in
			// the current build.
			if len(build.ProvisionerState) == 0 {
				// Get the previous build if it exists.
				prevBuild, err := db.GetWorkspaceBuildByWorkspaceIDAndBuildNumber(ctx, database.GetWorkspaceBuildByWorkspaceIDAndBuildNumberParams{
					WorkspaceID: build.WorkspaceID,
					BuildNumber: build.BuildNumber - 1,
				})
				// sql.ErrNoRows simply means there is no previous build;
				// any other error is fatal.
				if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
					return xerrors.Errorf("get previous workspace build: %w", err)
				}
				if err == nil {
					// NOTE(review): Deadline/MaxDeadline are zeroed here —
					// presumably so the failed build carries no stale
					// deadlines; confirm intent with the callers.
					_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
						ID:               build.ID,
						UpdatedAt:        database.Now(),
						ProvisionerState: prevBuild.ProvisionerState,
						Deadline:         time.Time{},
						MaxDeadline:      time.Time{},
					})
					if err != nil {
						return xerrors.Errorf("update workspace build by id: %w", err)
					}
				}
			}
		}

		return nil
	}, nil)
	if err != nil {
		return xerrors.Errorf("in tx: %w", err)
	}

	// Publish the new log notification to pubsub. Use the lowest log ID
	// inserted so the log stream will fetch everything after that point.
	data, err := json.Marshal(provisionersdk.ProvisionerJobLogsNotifyMessage{
		CreatedAfter: lowestLogID - 1,
		EndOfLogs:    true,
	})
	if err != nil {
		return xerrors.Errorf("marshal log notification: %w", err)
	}
	err = pub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(jobID), data)
	if err != nil {
		return xerrors.Errorf("publish log notification: %w", err)
	}

	return nil
}
|
0 commit comments