Skip to content

Commit 769c9ee

Browse files
authored
feat: cancel stuck pending jobs (#17803)
Closes: #16488
1 parent 613117b commit 769c9ee

File tree

23 files changed

+773
-291
lines changed

23 files changed

+773
-291
lines changed

cli/server.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ import (
8787
"github.com/coder/coder/v2/coderd/externalauth"
8888
"github.com/coder/coder/v2/coderd/gitsshkey"
8989
"github.com/coder/coder/v2/coderd/httpmw"
90+
"github.com/coder/coder/v2/coderd/jobreaper"
9091
"github.com/coder/coder/v2/coderd/notifications"
9192
"github.com/coder/coder/v2/coderd/oauthpki"
9293
"github.com/coder/coder/v2/coderd/prometheusmetrics"
@@ -95,7 +96,6 @@ import (
9596
"github.com/coder/coder/v2/coderd/schedule"
9697
"github.com/coder/coder/v2/coderd/telemetry"
9798
"github.com/coder/coder/v2/coderd/tracing"
98-
"github.com/coder/coder/v2/coderd/unhanger"
9999
"github.com/coder/coder/v2/coderd/updatecheck"
100100
"github.com/coder/coder/v2/coderd/util/ptr"
101101
"github.com/coder/coder/v2/coderd/util/slice"
@@ -1127,11 +1127,11 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
11271127
ctx, options.Database, options.Pubsub, options.PrometheusRegistry, coderAPI.TemplateScheduleStore, &coderAPI.Auditor, coderAPI.AccessControlStore, logger, autobuildTicker.C, options.NotificationsEnqueuer)
11281128
autobuildExecutor.Run()
11291129

1130-
hangDetectorTicker := time.NewTicker(vals.JobHangDetectorInterval.Value())
1131-
defer hangDetectorTicker.Stop()
1132-
hangDetector := unhanger.New(ctx, options.Database, options.Pubsub, logger, hangDetectorTicker.C)
1133-
hangDetector.Start()
1134-
defer hangDetector.Close()
1130+
jobReaperTicker := time.NewTicker(vals.JobReaperDetectorInterval.Value())
1131+
defer jobReaperTicker.Stop()
1132+
jobReaper := jobreaper.New(ctx, options.Database, options.Pubsub, logger, jobReaperTicker.C)
1133+
jobReaper.Start()
1134+
defer jobReaper.Close()
11351135

11361136
waitForProvisionerJobs := false
11371137
// Currently there is no way to ask the server to shut

cli/testdata/server-config.yaml.golden

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ networking:
183183
# Interval to poll for scheduled workspace builds.
184184
# (default: 1m0s, type: duration)
185185
autobuildPollInterval: 1m0s
186-
# Interval to poll for hung jobs and automatically terminate them.
186+
# Interval to poll for hung and pending jobs and automatically terminate them.
187187
# (default: 1m0s, type: duration)
188188
jobHangDetectorInterval: 1m0s
189189
introspection:

coderd/coderdtest/coderdtest.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,14 @@ import (
6868
"github.com/coder/coder/v2/coderd/externalauth"
6969
"github.com/coder/coder/v2/coderd/gitsshkey"
7070
"github.com/coder/coder/v2/coderd/httpmw"
71+
"github.com/coder/coder/v2/coderd/jobreaper"
7172
"github.com/coder/coder/v2/coderd/notifications"
7273
"github.com/coder/coder/v2/coderd/notifications/notificationstest"
7374
"github.com/coder/coder/v2/coderd/rbac"
7475
"github.com/coder/coder/v2/coderd/rbac/policy"
7576
"github.com/coder/coder/v2/coderd/runtimeconfig"
7677
"github.com/coder/coder/v2/coderd/schedule"
7778
"github.com/coder/coder/v2/coderd/telemetry"
78-
"github.com/coder/coder/v2/coderd/unhanger"
7979
"github.com/coder/coder/v2/coderd/updatecheck"
8080
"github.com/coder/coder/v2/coderd/util/ptr"
8181
"github.com/coder/coder/v2/coderd/webpush"
@@ -368,11 +368,11 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
368368
).WithStatsChannel(options.AutobuildStats)
369369
lifecycleExecutor.Run()
370370

371-
hangDetectorTicker := time.NewTicker(options.DeploymentValues.JobHangDetectorInterval.Value())
372-
defer hangDetectorTicker.Stop()
373-
hangDetector := unhanger.New(ctx, options.Database, options.Pubsub, options.Logger.Named("unhanger.detector"), hangDetectorTicker.C)
374-
hangDetector.Start()
375-
t.Cleanup(hangDetector.Close)
371+
jobReaperTicker := time.NewTicker(options.DeploymentValues.JobReaperDetectorInterval.Value())
372+
defer jobReaperTicker.Stop()
373+
jobReaper := jobreaper.New(ctx, options.Database, options.Pubsub, options.Logger.Named("reaper.detector"), jobReaperTicker.C)
374+
jobReaper.Start()
375+
t.Cleanup(jobReaper.Close)
376376

377377
if options.TelemetryReporter == nil {
378378
options.TelemetryReporter = telemetry.NewNoop()

coderd/database/dbauthz/dbauthz.go

Lines changed: 80 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,10 @@ var (
170170
Identifier: rbac.RoleIdentifier{Name: "provisionerd"},
171171
DisplayName: "Provisioner Daemon",
172172
Site: rbac.Permissions(map[string][]policy.Action{
173-
// TODO: Add ProvisionerJob resource type.
174-
rbac.ResourceFile.Type: {policy.ActionRead},
175-
rbac.ResourceSystem.Type: {policy.WildcardSymbol},
176-
rbac.ResourceTemplate.Type: {policy.ActionRead, policy.ActionUpdate},
173+
rbac.ResourceProvisionerJobs.Type: {policy.ActionRead, policy.ActionUpdate, policy.ActionCreate},
174+
rbac.ResourceFile.Type: {policy.ActionRead},
175+
rbac.ResourceSystem.Type: {policy.WildcardSymbol},
176+
rbac.ResourceTemplate.Type: {policy.ActionRead, policy.ActionUpdate},
177177
// Unsure why provisionerd needs update and read personal
178178
rbac.ResourceUser.Type: {policy.ActionRead, policy.ActionReadPersonal, policy.ActionUpdatePersonal},
179179
rbac.ResourceWorkspaceDormant.Type: {policy.ActionDelete, policy.ActionRead, policy.ActionUpdate, policy.ActionWorkspaceStop},
@@ -219,19 +219,20 @@ var (
219219
Scope: rbac.ScopeAll,
220220
}.WithCachedASTValue()
221221

222-
// See unhanger package.
223-
subjectHangDetector = rbac.Subject{
224-
Type: rbac.SubjectTypeHangDetector,
225-
FriendlyName: "Hang Detector",
222+
// See reaper package.
223+
subjectJobReaper = rbac.Subject{
224+
Type: rbac.SubjectTypeJobReaper,
225+
FriendlyName: "Job Reaper",
226226
ID: uuid.Nil.String(),
227227
Roles: rbac.Roles([]rbac.Role{
228228
{
229-
Identifier: rbac.RoleIdentifier{Name: "hangdetector"},
230-
DisplayName: "Hang Detector Daemon",
229+
Identifier: rbac.RoleIdentifier{Name: "jobreaper"},
230+
DisplayName: "Job Reaper Daemon",
231231
Site: rbac.Permissions(map[string][]policy.Action{
232-
rbac.ResourceSystem.Type: {policy.WildcardSymbol},
233-
rbac.ResourceTemplate.Type: {policy.ActionRead},
234-
rbac.ResourceWorkspace.Type: {policy.ActionRead, policy.ActionUpdate},
232+
rbac.ResourceSystem.Type: {policy.WildcardSymbol},
233+
rbac.ResourceTemplate.Type: {policy.ActionRead},
234+
rbac.ResourceWorkspace.Type: {policy.ActionRead, policy.ActionUpdate},
235+
rbac.ResourceProvisionerJobs.Type: {policy.ActionRead, policy.ActionUpdate},
235236
}),
236237
Org: map[string][]rbac.Permission{},
237238
User: []rbac.Permission{},
@@ -346,6 +347,7 @@ var (
346347
rbac.ResourceNotificationTemplate.Type: {policy.ActionCreate, policy.ActionUpdate, policy.ActionDelete},
347348
rbac.ResourceCryptoKey.Type: {policy.ActionCreate, policy.ActionUpdate, policy.ActionDelete},
348349
rbac.ResourceFile.Type: {policy.ActionCreate, policy.ActionRead},
350+
rbac.ResourceProvisionerJobs.Type: {policy.ActionRead, policy.ActionUpdate, policy.ActionCreate},
349351
}),
350352
Org: map[string][]rbac.Permission{},
351353
User: []rbac.Permission{},
@@ -407,10 +409,10 @@ func AsAutostart(ctx context.Context) context.Context {
407409
return As(ctx, subjectAutostart)
408410
}
409411

410-
// AsHangDetector returns a context with an actor that has permissions required
411-
// for unhanger.Detector to function.
412-
func AsHangDetector(ctx context.Context) context.Context {
413-
return As(ctx, subjectHangDetector)
412+
// AsJobReaper returns a context with an actor that has permissions required
413+
// for reaper.Detector to function.
414+
func AsJobReaper(ctx context.Context) context.Context {
415+
return As(ctx, subjectJobReaper)
414416
}
415417

416418
// AsKeyRotator returns a context with an actor that has permissions required for rotating crypto keys.
@@ -1085,11 +1087,10 @@ func (q *querier) AcquireNotificationMessages(ctx context.Context, arg database.
10851087
return q.db.AcquireNotificationMessages(ctx, arg)
10861088
}
10871089

1088-
// TODO: We need to create a ProvisionerJob resource type
10891090
func (q *querier) AcquireProvisionerJob(ctx context.Context, arg database.AcquireProvisionerJobParams) (database.ProvisionerJob, error) {
1090-
// if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
1091-
// return database.ProvisionerJob{}, err
1092-
// }
1091+
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceProvisionerJobs); err != nil {
1092+
return database.ProvisionerJob{}, err
1093+
}
10931094
return q.db.AcquireProvisionerJob(ctx, arg)
10941095
}
10951096

@@ -1912,14 +1913,6 @@ func (q *querier) GetHealthSettings(ctx context.Context) (string, error) {
19121913
return q.db.GetHealthSettings(ctx)
19131914
}
19141915

1915-
// TODO: We need to create a ProvisionerJob resource type
1916-
func (q *querier) GetHungProvisionerJobs(ctx context.Context, hungSince time.Time) ([]database.ProvisionerJob, error) {
1917-
// if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
1918-
// return nil, err
1919-
// }
1920-
return q.db.GetHungProvisionerJobs(ctx, hungSince)
1921-
}
1922-
19231916
func (q *querier) GetInboxNotificationByID(ctx context.Context, id uuid.UUID) (database.InboxNotification, error) {
19241917
return fetchWithAction(q.log, q.auth, policy.ActionRead, q.db.GetInboxNotificationByID)(ctx, id)
19251918
}
@@ -2307,6 +2300,13 @@ func (q *querier) GetProvisionerJobByID(ctx context.Context, id uuid.UUID) (data
23072300
return job, nil
23082301
}
23092302

2303+
func (q *querier) GetProvisionerJobByIDForUpdate(ctx context.Context, id uuid.UUID) (database.ProvisionerJob, error) {
2304+
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceProvisionerJobs); err != nil {
2305+
return database.ProvisionerJob{}, err
2306+
}
2307+
return q.db.GetProvisionerJobByIDForUpdate(ctx, id)
2308+
}
2309+
23102310
func (q *querier) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]database.ProvisionerJobTiming, error) {
23112311
_, err := q.GetProvisionerJobByID(ctx, jobID)
23122312
if err != nil {
@@ -2315,31 +2315,49 @@ func (q *querier) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uui
23152315
return q.db.GetProvisionerJobTimingsByJobID(ctx, jobID)
23162316
}
23172317

2318-
// TODO: We have a ProvisionerJobs resource, but it hasn't been checked for this use-case.
23192318
func (q *querier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]database.ProvisionerJob, error) {
2320-
// if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem); err != nil {
2321-
// return nil, err
2322-
// }
2323-
return q.db.GetProvisionerJobsByIDs(ctx, ids)
2319+
provisionerJobs, err := q.db.GetProvisionerJobsByIDs(ctx, ids)
2320+
if err != nil {
2321+
return nil, err
2322+
}
2323+
orgIDs := make(map[uuid.UUID]struct{})
2324+
for _, job := range provisionerJobs {
2325+
orgIDs[job.OrganizationID] = struct{}{}
2326+
}
2327+
for orgID := range orgIDs {
2328+
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceProvisionerJobs.InOrg(orgID)); err != nil {
2329+
return nil, err
2330+
}
2331+
}
2332+
return provisionerJobs, nil
23242333
}
23252334

2326-
// TODO: We have a ProvisionerJobs resource, but it hasn't been checked for this use-case.
23272335
func (q *querier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
2336+
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
2337+
// Details in https://github.com/coder/coder/issues/16160
23282338
return q.db.GetProvisionerJobsByIDsWithQueuePosition(ctx, ids)
23292339
}
23302340

23312341
func (q *querier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) {
2342+
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
2343+
// Details in https://github.com/coder/coder/issues/16160
23322344
return fetchWithPostFilter(q.auth, policy.ActionRead, q.db.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner)(ctx, arg)
23332345
}
23342346

2335-
// TODO: We have a ProvisionerJobs resource, but it hasn't been checked for this use-case.
23362347
func (q *querier) GetProvisionerJobsCreatedAfter(ctx context.Context, createdAt time.Time) ([]database.ProvisionerJob, error) {
2337-
// if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem); err != nil {
2338-
// return nil, err
2339-
// }
2348+
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceProvisionerJobs); err != nil {
2349+
return nil, err
2350+
}
23402351
return q.db.GetProvisionerJobsCreatedAfter(ctx, createdAt)
23412352
}
23422353

2354+
func (q *querier) GetProvisionerJobsToBeReaped(ctx context.Context, arg database.GetProvisionerJobsToBeReapedParams) ([]database.ProvisionerJob, error) {
2355+
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceProvisionerJobs); err != nil {
2356+
return nil, err
2357+
}
2358+
return q.db.GetProvisionerJobsToBeReaped(ctx, arg)
2359+
}
2360+
23432361
func (q *querier) GetProvisionerKeyByHashedSecret(ctx context.Context, hashedSecret []byte) (database.ProvisionerKey, error) {
23442362
return fetch(q.log, q.auth, q.db.GetProvisionerKeyByHashedSecret)(ctx, hashedSecret)
23452363
}
@@ -3533,27 +3551,22 @@ func (q *querier) InsertPresetParameters(ctx context.Context, arg database.Inser
35333551
return q.db.InsertPresetParameters(ctx, arg)
35343552
}
35353553

3536-
// TODO: We need to create a ProvisionerJob resource type
35373554
func (q *querier) InsertProvisionerJob(ctx context.Context, arg database.InsertProvisionerJobParams) (database.ProvisionerJob, error) {
3538-
// if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
3539-
// return database.ProvisionerJob{}, err
3540-
// }
3555+
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
3556+
// Details in https://github.com/coder/coder/issues/16160
35413557
return q.db.InsertProvisionerJob(ctx, arg)
35423558
}
35433559

3544-
// TODO: We need to create a ProvisionerJob resource type
35453560
func (q *querier) InsertProvisionerJobLogs(ctx context.Context, arg database.InsertProvisionerJobLogsParams) ([]database.ProvisionerJobLog, error) {
3546-
// if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
3547-
// return nil, err
3548-
// }
3561+
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
3562+
// Details in https://github.com/coder/coder/issues/16160
35493563
return q.db.InsertProvisionerJobLogs(ctx, arg)
35503564
}
35513565

3552-
// TODO: We need to create a ProvisionerJob resource type
35533566
func (q *querier) InsertProvisionerJobTimings(ctx context.Context, arg database.InsertProvisionerJobTimingsParams) ([]database.ProvisionerJobTiming, error) {
3554-
// if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
3555-
// return nil, err
3556-
// }
3567+
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceProvisionerJobs); err != nil {
3568+
return nil, err
3569+
}
35573570
return q.db.InsertProvisionerJobTimings(ctx, arg)
35583571
}
35593572

@@ -4176,15 +4189,17 @@ func (q *querier) UpdateProvisionerDaemonLastSeenAt(ctx context.Context, arg dat
41764189
return q.db.UpdateProvisionerDaemonLastSeenAt(ctx, arg)
41774190
}
41784191

4179-
// TODO: We need to create a ProvisionerJob resource type
41804192
func (q *querier) UpdateProvisionerJobByID(ctx context.Context, arg database.UpdateProvisionerJobByIDParams) error {
4181-
// if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
4182-
// return err
4183-
// }
4193+
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceProvisionerJobs); err != nil {
4194+
return err
4195+
}
41844196
return q.db.UpdateProvisionerJobByID(ctx, arg)
41854197
}
41864198

41874199
func (q *querier) UpdateProvisionerJobWithCancelByID(ctx context.Context, arg database.UpdateProvisionerJobWithCancelByIDParams) error {
4200+
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
4201+
// Details in https://github.com/coder/coder/issues/16160
4202+
41884203
job, err := q.db.GetProvisionerJobByID(ctx, arg.ID)
41894204
if err != nil {
41904205
return err
@@ -4251,14 +4266,20 @@ func (q *querier) UpdateProvisionerJobWithCancelByID(ctx context.Context, arg da
42514266
return q.db.UpdateProvisionerJobWithCancelByID(ctx, arg)
42524267
}
42534268

4254-
// TODO: We need to create a ProvisionerJob resource type
42554269
func (q *querier) UpdateProvisionerJobWithCompleteByID(ctx context.Context, arg database.UpdateProvisionerJobWithCompleteByIDParams) error {
4256-
// if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
4257-
// return err
4258-
// }
4270+
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceProvisionerJobs); err != nil {
4271+
return err
4272+
}
42594273
return q.db.UpdateProvisionerJobWithCompleteByID(ctx, arg)
42604274
}
42614275

4276+
func (q *querier) UpdateProvisionerJobWithCompleteWithStartedAtByID(ctx context.Context, arg database.UpdateProvisionerJobWithCompleteWithStartedAtByIDParams) error {
4277+
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceProvisionerJobs); err != nil {
4278+
return err
4279+
}
4280+
return q.db.UpdateProvisionerJobWithCompleteWithStartedAtByID(ctx, arg)
4281+
}
4282+
42624283
func (q *querier) UpdateReplica(ctx context.Context, arg database.UpdateReplicaParams) (database.Replica, error) {
42634284
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
42644285
return database.Replica{}, err

0 commit comments

Comments
 (0)