Skip to content

Commit bc8c0e0

Browse files
committed
Add support for completing a job
1 parent 1e8c421 commit bc8c0e0

File tree

6 files changed

+269
-14
lines changed

6 files changed

+269
-14
lines changed

coderd/provisionerd.go

Lines changed: 190 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"net/http"
10+
"reflect"
1011

1112
"golang.org/x/xerrors"
1213
"storj.io/drpc/drpcmux"
@@ -267,6 +268,194 @@ func (s *provisionerdServer) CancelJob(ctx context.Context, cancelJob *proto.Can
267268
return &proto.Empty{}, nil
268269
}
269270

271+
// CompleteJob is triggered by a provision daemon to mark a provisioner job as completed.
270272
func (s *provisionerdServer) CompleteJob(ctx context.Context, completed *proto.CompletedJob) (*proto.Empty, error) {
271-
return nil, nil
273+
jobID, err := uuid.Parse(completed.JobId)
274+
if err != nil {
275+
return nil, xerrors.Errorf("parse job id: %w", err)
276+
}
277+
job, err := s.Database.GetProvisionerJobByID(ctx, jobID)
278+
if err != nil {
279+
return nil, xerrors.Errorf("get job by id: %w", err)
280+
}
281+
// TODO: Check if the worker ID matches!
282+
// If it doesn't, a provisioner daemon could be impersonating another job!
283+
284+
switch jobType := completed.Type.(type) {
285+
case *proto.CompletedJob_ProjectImport_:
286+
var input projectImportJob
287+
err = json.Unmarshal(job.Input, &input)
288+
if err != nil {
289+
return nil, xerrors.Errorf("unmarshal job data: %w", err)
290+
}
291+
292+
// Validate that all parameters send from the provisioner daemon
293+
// follow the protocol.
294+
projectParameters := make([]database.InsertProjectParameterParams, 0, len(jobType.ProjectImport.ParameterSchemas))
295+
for _, protoParameter := range jobType.ProjectImport.ParameterSchemas {
296+
validationTypeSystem, err := convertValidationTypeSystem(protoParameter.ValidationTypeSystem)
297+
if err != nil {
298+
return nil, xerrors.Errorf("convert validation type system for %q: %w", protoParameter.Name, err)
299+
}
300+
301+
projectParameter := database.InsertProjectParameterParams{
302+
ID: uuid.New(),
303+
CreatedAt: database.Now(),
304+
ProjectHistoryID: input.ProjectHistoryID,
305+
Name: protoParameter.Name,
306+
Description: protoParameter.Description,
307+
RedisplayValue: protoParameter.RedisplayValue,
308+
ValidationError: protoParameter.ValidationError,
309+
ValidationCondition: protoParameter.ValidationCondition,
310+
ValidationValueType: protoParameter.ValidationValueType,
311+
ValidationTypeSystem: validationTypeSystem,
312+
313+
AllowOverrideDestination: protoParameter.AllowOverrideDestination,
314+
AllowOverrideSource: protoParameter.AllowOverrideSource,
315+
}
316+
317+
// It's possible a parameter doesn't define a default source!
318+
if protoParameter.DefaultSource != nil {
319+
parameterSourceScheme, err := convertParameterSourceScheme(protoParameter.DefaultSource.Scheme)
320+
if err != nil {
321+
return nil, xerrors.Errorf("convert parameter source scheme: %w", err)
322+
}
323+
projectParameter.DefaultSourceScheme = parameterSourceScheme
324+
projectParameter.DefaultSourceValue = sql.NullString{
325+
String: protoParameter.DefaultSource.Value,
326+
Valid: protoParameter.DefaultSource.Value != "",
327+
}
328+
}
329+
330+
// It's possible a parameter doesn't define a default destination!
331+
if protoParameter.DefaultDestination != nil {
332+
parameterDestinationScheme, err := convertParameterDestinationScheme(protoParameter.DefaultDestination.Scheme)
333+
if err != nil {
334+
return nil, xerrors.Errorf("convert parameter destination scheme: %w", err)
335+
}
336+
projectParameter.DefaultDestinationScheme = parameterDestinationScheme
337+
projectParameter.DefaultDestinationValue = sql.NullString{
338+
String: protoParameter.DefaultDestination.Value,
339+
Valid: protoParameter.DefaultDestination.Value != "",
340+
}
341+
}
342+
343+
projectParameters = append(projectParameters, projectParameter)
344+
}
345+
346+
// This must occur in a transaction in case of failure.
347+
err = s.Database.InTx(func(db database.Store) error {
348+
err = db.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
349+
ID: jobID,
350+
UpdatedAt: database.Now(),
351+
CompletedAt: sql.NullTime{
352+
Time: database.Now(),
353+
Valid: true,
354+
},
355+
})
356+
if err != nil {
357+
return xerrors.Errorf("update provisioner job: %w", err)
358+
}
359+
for _, projectParameter := range projectParameters {
360+
_, err = db.InsertProjectParameter(ctx, projectParameter)
361+
if err != nil {
362+
return xerrors.Errorf("insert project parameter %q: %w", projectParameter.Name, err)
363+
}
364+
}
365+
return nil
366+
})
367+
if err != nil {
368+
return nil, xerrors.Errorf("complete job: %w", err)
369+
}
370+
case *proto.CompletedJob_WorkspaceProvision_:
371+
var input workspaceProvisionJob
372+
err = json.Unmarshal(job.Input, &input)
373+
if err != nil {
374+
return nil, xerrors.Errorf("unmarshal job data: %w", err)
375+
}
376+
377+
workspaceHistory, err := s.Database.GetWorkspaceHistoryByID(ctx, input.WorkspaceHistoryID)
378+
if err != nil {
379+
return nil, xerrors.Errorf("get workspace history: %w", err)
380+
}
381+
382+
err = s.Database.InTx(func(db database.Store) error {
383+
err = db.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
384+
ID: jobID,
385+
UpdatedAt: database.Now(),
386+
CompletedAt: sql.NullTime{
387+
Time: database.Now(),
388+
Valid: true,
389+
},
390+
})
391+
if err != nil {
392+
return xerrors.Errorf("update provisioner job: %w", err)
393+
}
394+
err = db.UpdateWorkspaceHistoryByID(ctx, database.UpdateWorkspaceHistoryByIDParams{
395+
ID: workspaceHistory.ID,
396+
UpdatedAt: database.Now(),
397+
ProvisionerState: jobType.WorkspaceProvision.State,
398+
CompletedAt: sql.NullTime{
399+
Time: database.Now(),
400+
Valid: true,
401+
},
402+
})
403+
if err != nil {
404+
return xerrors.Errorf("update workspace history: %w", err)
405+
}
406+
for _, protoResource := range jobType.WorkspaceProvision.Resources {
407+
_, err = db.InsertWorkspaceResource(ctx, database.InsertWorkspaceResourceParams{
408+
ID: uuid.New(),
409+
CreatedAt: database.Now(),
410+
WorkspaceHistoryID: input.WorkspaceHistoryID,
411+
Type: protoResource.Type,
412+
Name: protoResource.Name,
413+
// TODO: Generate this at the variable validation phase.
414+
// Set the value in `default_source`, and disallow overwrite.
415+
WorkspaceAgentToken: uuid.NewString(),
416+
})
417+
if err != nil {
418+
return xerrors.Errorf("insert workspace resource %q: %w", protoResource.Name, err)
419+
}
420+
}
421+
return nil
422+
})
423+
if err != nil {
424+
return nil, xerrors.Errorf("complete job: %w", err)
425+
}
426+
default:
427+
return nil, xerrors.Errorf("unknown job type %q; ensure coderd and provisionerd versions match",
428+
reflect.TypeOf(completed.Type).String())
429+
}
430+
431+
return &proto.Empty{}, nil
432+
}
433+
434+
func convertValidationTypeSystem(typeSystem sdkproto.ParameterSchema_TypeSystem) (database.ParameterTypeSystem, error) {
435+
switch typeSystem {
436+
case sdkproto.ParameterSchema_HCL:
437+
return database.ParameterTypeSystemHCL, nil
438+
default:
439+
return database.ParameterTypeSystem(""), xerrors.Errorf("unknown type system: %d", typeSystem)
440+
}
441+
}
442+
443+
func convertParameterSourceScheme(sourceScheme sdkproto.ParameterSource_Scheme) (database.ParameterSourceScheme, error) {
444+
switch sourceScheme {
445+
case sdkproto.ParameterSource_DATA:
446+
return database.ParameterSourceSchemeData, nil
447+
default:
448+
return database.ParameterSourceScheme(""), xerrors.Errorf("unknown parameter source scheme: %d", sourceScheme)
449+
}
450+
}
451+
452+
func convertParameterDestinationScheme(destinationScheme sdkproto.ParameterDestination_Scheme) (database.ParameterDestinationScheme, error) {
453+
switch destinationScheme {
454+
case sdkproto.ParameterDestination_ENVIRONMENT_VARIABLE:
455+
return database.ParameterDestinationSchemeEnvironmentVariable, nil
456+
case sdkproto.ParameterDestination_PROVISIONER_VARIABLE:
457+
return database.ParameterDestinationSchemeProvisionerVariable, nil
458+
default:
459+
return database.ParameterDestinationScheme(""), xerrors.Errorf("unknown parameter destination scheme: %d", destinationScheme)
460+
}
272461
}

database/databasefake/databasefake.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,9 @@ func (q *fakeQuerier) UpdateWorkspaceHistoryByID(_ context.Context, arg database
659659
continue
660660
}
661661
workspaceHistory.UpdatedAt = arg.UpdatedAt
662+
workspaceHistory.CompletedAt = arg.CompletedAt
662663
workspaceHistory.AfterID = arg.AfterID
664+
workspaceHistory.ProvisionerState = arg.ProvisionerState
663665
q.workspaceHistory[index] = workspaceHistory
664666
return nil
665667
}

database/query.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,8 @@ UPDATE
543543
workspace_history
544544
SET
545545
updated_at = $2,
546-
after_id = $3
546+
completed_at = $3,
547+
after_id = $4,
548+
provisioner_state = $5
547549
WHERE
548550
id = $1;

database/query.sql.go

Lines changed: 15 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

provisionerd/provisionerd.go

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ func (a *API) acquireJob() {
168168
a.cancelActiveJob(fmt.Sprintf("remove all from %q directory: %s", a.opts.WorkDirectory, err))
169169
return
170170
}
171+
a.opts.Logger.Debug(context.Background(), "cleaned up work directory")
171172
}()
172173

173174
err = os.MkdirAll(a.opts.WorkDirectory, 0600)
@@ -233,20 +234,67 @@ func (a *API) acquireJob() {
233234
a.opts.Logger.Debug(context.Background(), "acquired job is project import",
234235
slog.F("project_history_name", jobType.ProjectImport.ProjectHistoryName),
235236
)
237+
238+
response, err := provisioner.Parse(a.closeContext, &provisionersdkproto.Parse_Request{
239+
Directory: a.opts.WorkDirectory,
240+
})
241+
if err != nil {
242+
a.cancelActiveJob(fmt.Sprintf("parse source: %s", err))
243+
return
244+
}
245+
_, err = a.client.CompleteJob(a.closeContext, &proto.CompletedJob{
246+
JobId: a.activeJob.JobId,
247+
Type: &proto.CompletedJob_ProjectImport_{
248+
ProjectImport: &proto.CompletedJob_ProjectImport{
249+
ParameterSchemas: response.ParameterSchemas,
250+
},
251+
},
252+
})
253+
if err != nil {
254+
a.cancelActiveJob(fmt.Sprintf("complete job: %s", err))
255+
return
256+
}
236257
case *proto.AcquiredJob_WorkspaceProvision_:
237258
a.opts.Logger.Debug(context.Background(), "acquired job is workspace provision",
238259
slog.F("workspace_name", jobType.WorkspaceProvision.WorkspaceName),
239260
slog.F("state_length", len(jobType.WorkspaceProvision.State)),
240261
slog.F("parameters", jobType.WorkspaceProvision.ParameterValues),
241262
)
242263

264+
response, err := provisioner.Provision(a.closeContext, &provisionersdkproto.Provision_Request{
265+
Directory: a.opts.WorkDirectory,
266+
ParameterValues: jobType.WorkspaceProvision.ParameterValues,
267+
State: jobType.WorkspaceProvision.State,
268+
})
269+
if err != nil {
270+
a.cancelActiveJob(fmt.Sprintf("provision: %s", err))
271+
return
272+
}
273+
a.opts.Logger.Debug(context.Background(), "provision successful; marking job as complete",
274+
slog.F("resource_count", len(response.Resources)),
275+
slog.F("resources", response.Resources),
276+
slog.F("state_length", len(response.State)),
277+
)
278+
279+
// Complete job may need to be async if we disconnected...
280+
// When we reconnect we can flush any of these cached values.
281+
_, err = a.client.CompleteJob(a.closeContext, &proto.CompletedJob{
282+
JobId: a.activeJob.JobId,
283+
Type: &proto.CompletedJob_WorkspaceProvision_{
284+
WorkspaceProvision: &proto.CompletedJob_WorkspaceProvision{
285+
State: response.State,
286+
Resources: response.Resources,
287+
},
288+
},
289+
})
290+
if err != nil {
291+
a.cancelActiveJob(fmt.Sprintf("complete job: %s", err))
292+
return
293+
}
243294
default:
244295
a.cancelActiveJob(fmt.Sprintf("unknown job type %q; ensure your provisioner daemon is up-to-date", reflect.TypeOf(a.activeJob.Type).String()))
245296
return
246297
}
247-
248-
fmt.Printf("Provisioner: %s\n", provisioner)
249-
// Work!
250298
}
251299

252300
func (a *API) cancelActiveJob(errMsg string) {
@@ -257,6 +305,9 @@ func (a *API) cancelActiveJob(errMsg string) {
257305
a.activeJob = nil
258306
return
259307
}
308+
if a.activeJob == nil {
309+
return
310+
}
260311

261312
a.opts.Logger.Info(context.Background(), "canceling active job",
262313
slog.F("error_message", errMsg),

provisionerd/provisionerd_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,13 @@ func TestProvisionerd(t *testing.T) {
4242
setupProjectVersion := func(t *testing.T, client *codersdk.Client, user coderd.CreateInitialUserRequest, project coderd.Project) coderd.ProjectHistory {
4343
var buffer bytes.Buffer
4444
writer := tar.NewWriter(&buffer)
45+
content := `resource "null_resource" "hi" {}`
4546
err := writer.WriteHeader(&tar.Header{
46-
Name: "file",
47-
Size: 1 << 10,
47+
Name: "main.tf",
48+
Size: int64(len(content)),
4849
})
4950
require.NoError(t, err)
50-
_, err = writer.Write(make([]byte, 1<<10))
51+
_, err = writer.Write([]byte(content))
5152
require.NoError(t, err)
5253
projectHistory, err := client.CreateProjectHistory(context.Background(), user.Organization, project.Name, coderd.CreateProjectVersionRequest{
5354
StorageMethod: database.ProjectStorageMethodInlineArchive,
@@ -102,6 +103,6 @@ func TestProvisionerd(t *testing.T) {
102103
WorkDirectory: t.TempDir(),
103104
})
104105
defer api.Close()
105-
time.Sleep(time.Millisecond * 500)
106+
time.Sleep(time.Millisecond * 1500)
106107
})
107108
}

0 commit comments

Comments
 (0)