Skip to content

Commit e53f0be

Browse files
committed
Refactor job update for provisionerd
1 parent a6ce22d commit e53f0be

File tree

7 files changed

+244
-288
lines changed

7 files changed

+244
-288
lines changed

coderd/provisionerdaemons.go

Lines changed: 115 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,6 @@ type workspaceProvisionJob struct {
109109
DryRun bool `json:"dry_run"`
110110
}
111111

112-
// The input for a "project_import" job.
113-
type projectVersionImportJob struct {
114-
OrganizationID string `json:"organization_id"`
115-
ProjectID uuid.UUID `json:"project_id"`
116-
}
117-
118112
// Implementation of the provisioner daemon protobuf server.
119113
type provisionerdServer struct {
120114
ID uuid.UUID
@@ -242,39 +236,8 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
242236
},
243237
}
244238
case database.ProvisionerJobTypeProjectVersionImport:
245-
var input projectVersionImportJob
246-
err = json.Unmarshal(job.Input, &input)
247-
if err != nil {
248-
return nil, failJob(fmt.Sprintf("unmarshal job input %q: %s", job.Input, err))
249-
}
250-
251-
// Compute parameters for the workspace to consume.
252-
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
253-
ProjectImportJobID: job.ID,
254-
OrganizationID: input.OrganizationID,
255-
ProjectID: uuid.NullUUID{
256-
UUID: input.ProjectID,
257-
Valid: input.ProjectID.String() != uuid.Nil.String(),
258-
},
259-
UserID: user.ID,
260-
}, nil)
261-
if err != nil {
262-
return nil, failJob(fmt.Sprintf("compute parameters: %s", err))
263-
}
264-
// Convert parameters to the protobuf type.
265-
protoParameters := make([]*sdkproto.ParameterValue, 0, len(parameters))
266-
for _, parameter := range parameters {
267-
converted, err := convertComputedParameterValue(parameter)
268-
if err != nil {
269-
return nil, failJob(fmt.Sprintf("convert parameter: %s", err))
270-
}
271-
protoParameters = append(protoParameters, converted)
272-
}
273-
274239
protoJob.Type = &proto.AcquiredJob_ProjectImport_{
275-
ProjectImport: &proto.AcquiredJob_ProjectImport{
276-
ParameterValues: protoParameters,
277-
},
240+
ProjectImport: &proto.AcquiredJob_ProjectImport{},
278241
}
279242
}
280243
switch job.StorageMethod {
@@ -291,119 +254,137 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
291254
return protoJob, err
292255
}
293256

294-
func (server *provisionerdServer) UpdateJob(stream proto.DRPCProvisionerDaemon_UpdateJobStream) error {
295-
for {
296-
update, err := stream.Recv()
297-
if err != nil {
298-
return err
257+
func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
258+
parsedID, err := uuid.Parse(request.JobId)
259+
if err != nil {
260+
return nil, xerrors.Errorf("parse job id: %w", err)
261+
}
262+
job, err := server.Database.GetProvisionerJobByID(ctx, parsedID)
263+
if err != nil {
264+
return nil, xerrors.Errorf("get job: %w", err)
265+
}
266+
if !job.WorkerID.Valid {
267+
return nil, xerrors.New("job isn't running yet")
268+
}
269+
if job.WorkerID.UUID.String() != server.ID.String() {
270+
return nil, xerrors.New("you don't own this job")
271+
}
272+
err = server.Database.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
273+
ID: parsedID,
274+
UpdatedAt: database.Now(),
275+
})
276+
if err != nil {
277+
return nil, xerrors.Errorf("update job: %w", err)
278+
}
279+
280+
if len(request.Logs) > 0 {
281+
insertParams := database.InsertProvisionerJobLogsParams{
282+
JobID: parsedID,
299283
}
300-
parsedID, err := uuid.Parse(update.JobId)
301-
if err != nil {
302-
return xerrors.Errorf("parse job id: %w", err)
284+
for _, log := range request.Logs {
285+
logLevel, err := convertLogLevel(log.Level)
286+
if err != nil {
287+
return nil, xerrors.Errorf("convert log level: %w", err)
288+
}
289+
logSource, err := convertLogSource(log.Source)
290+
if err != nil {
291+
return nil, xerrors.Errorf("convert log source: %w", err)
292+
}
293+
insertParams.ID = append(insertParams.ID, uuid.New())
294+
insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt))
295+
insertParams.Level = append(insertParams.Level, logLevel)
296+
insertParams.Source = append(insertParams.Source, logSource)
297+
insertParams.Output = append(insertParams.Output, log.Output)
303298
}
304-
job, err := server.Database.GetProvisionerJobByID(stream.Context(), parsedID)
299+
logs, err := server.Database.InsertProvisionerJobLogs(context.Background(), insertParams)
305300
if err != nil {
306-
return xerrors.Errorf("get job: %w", err)
301+
return nil, xerrors.Errorf("insert job logs: %w", err)
307302
}
308-
if !job.WorkerID.Valid {
309-
return xerrors.New("job isn't running yet")
310-
}
311-
if job.WorkerID.UUID.String() != server.ID.String() {
312-
return xerrors.New("you don't own this job")
303+
data, err := json.Marshal(logs)
304+
if err != nil {
305+
return nil, xerrors.Errorf("marshal job log: %w", err)
313306
}
314-
315-
err = server.Database.UpdateProvisionerJobByID(stream.Context(), database.UpdateProvisionerJobByIDParams{
316-
ID: parsedID,
317-
UpdatedAt: database.Now(),
318-
})
307+
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), data)
319308
if err != nil {
320-
return xerrors.Errorf("update job: %w", err)
309+
return nil, xerrors.Errorf("publish job log: %w", err)
321310
}
322-
if len(update.Logs) > 0 {
323-
insertParams := database.InsertProvisionerJobLogsParams{
324-
JobID: parsedID,
325-
}
326-
for _, log := range update.Logs {
327-
logLevel, err := convertLogLevel(log.Level)
328-
if err != nil {
329-
return xerrors.Errorf("convert log level: %w", err)
330-
}
331-
logSource, err := convertLogSource(log.Source)
332-
if err != nil {
333-
return xerrors.Errorf("convert log source: %w", err)
334-
}
335-
insertParams.ID = append(insertParams.ID, uuid.New())
336-
insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt))
337-
insertParams.Level = append(insertParams.Level, logLevel)
338-
insertParams.Source = append(insertParams.Source, logSource)
339-
insertParams.Output = append(insertParams.Output, log.Output)
340-
}
341-
logs, err := server.Database.InsertProvisionerJobLogs(context.Background(), insertParams)
342-
if err != nil {
343-
return xerrors.Errorf("insert job logs: %w", err)
344-
}
345-
data, err := json.Marshal(logs)
311+
}
312+
313+
if len(request.ParameterSchemas) > 0 {
314+
for _, protoParameter := range request.ParameterSchemas {
315+
validationTypeSystem, err := convertValidationTypeSystem(protoParameter.ValidationTypeSystem)
346316
if err != nil {
347-
return xerrors.Errorf("marshal job log: %w", err)
317+
return nil, xerrors.Errorf("convert validation type system for %q: %w", protoParameter.Name, err)
348318
}
349-
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), data)
350-
if err != nil {
351-
return xerrors.Errorf("publish job log: %w", err)
319+
320+
parameterSchema := database.InsertParameterSchemaParams{
321+
ID: uuid.New(),
322+
CreatedAt: database.Now(),
323+
JobID: job.ID,
324+
Name: protoParameter.Name,
325+
Description: protoParameter.Description,
326+
RedisplayValue: protoParameter.RedisplayValue,
327+
ValidationError: protoParameter.ValidationError,
328+
ValidationCondition: protoParameter.ValidationCondition,
329+
ValidationValueType: protoParameter.ValidationValueType,
330+
ValidationTypeSystem: validationTypeSystem,
331+
332+
DefaultSourceScheme: database.ParameterSourceSchemeNone,
333+
DefaultDestinationScheme: database.ParameterDestinationSchemeNone,
334+
335+
AllowOverrideDestination: protoParameter.AllowOverrideDestination,
336+
AllowOverrideSource: protoParameter.AllowOverrideSource,
352337
}
353-
}
354338

355-
if update.GetProjectImport() != nil {
356-
// Validate that all parameters send from the provisioner daemon
357-
// follow the protocol.
358-
parameterSchemas := make([]database.InsertParameterSchemaParams, 0, len(update.GetProjectImport().ParameterSchemas))
359-
for _, protoParameter := range update.GetProjectImport().ParameterSchemas {
360-
validationTypeSystem, err := convertValidationTypeSystem(protoParameter.ValidationTypeSystem)
339+
// It's possible a parameter doesn't define a default source!
340+
if protoParameter.DefaultSource != nil {
341+
parameterSourceScheme, err := convertParameterSourceScheme(protoParameter.DefaultSource.Scheme)
361342
if err != nil {
362-
return xerrors.Errorf("convert validation type system for %q: %w", protoParameter.Name, err)
363-
}
364-
365-
parameterSchema := database.InsertParameterSchemaParams{
366-
ID: uuid.New(),
367-
CreatedAt: database.Now(),
368-
JobID: job.ID,
369-
Name: protoParameter.Name,
370-
Description: protoParameter.Description,
371-
RedisplayValue: protoParameter.RedisplayValue,
372-
ValidationError: protoParameter.ValidationError,
373-
ValidationCondition: protoParameter.ValidationCondition,
374-
ValidationValueType: protoParameter.ValidationValueType,
375-
ValidationTypeSystem: validationTypeSystem,
376-
377-
DefaultSourceScheme: database.ParameterSourceSchemeNone,
378-
DefaultDestinationScheme: database.ParameterDestinationSchemeNone,
379-
380-
AllowOverrideDestination: protoParameter.AllowOverrideDestination,
381-
AllowOverrideSource: protoParameter.AllowOverrideSource,
343+
return nil, xerrors.Errorf("convert parameter source scheme: %w", err)
382344
}
345+
parameterSchema.DefaultSourceScheme = parameterSourceScheme
346+
parameterSchema.DefaultSourceValue = protoParameter.DefaultSource.Value
347+
}
383348

384-
// It's possible a parameter doesn't define a default source!
385-
if protoParameter.DefaultSource != nil {
386-
parameterSourceScheme, err := convertParameterSourceScheme(protoParameter.DefaultSource.Scheme)
387-
if err != nil {
388-
return xerrors.Errorf("convert parameter source scheme: %w", err)
389-
}
390-
parameterSchema.DefaultSourceScheme = parameterSourceScheme
391-
parameterSchema.DefaultSourceValue = protoParameter.DefaultSource.Value
349+
// It's possible a parameter doesn't define a default destination!
350+
if protoParameter.DefaultDestination != nil {
351+
parameterDestinationScheme, err := convertParameterDestinationScheme(protoParameter.DefaultDestination.Scheme)
352+
if err != nil {
353+
return nil, xerrors.Errorf("convert parameter destination scheme: %w", err)
392354
}
355+
parameterSchema.DefaultDestinationScheme = parameterDestinationScheme
356+
}
393357

394-
// It's possible a parameter doesn't define a default destination!
395-
if protoParameter.DefaultDestination != nil {
396-
parameterDestinationScheme, err := convertParameterDestinationScheme(protoParameter.DefaultDestination.Scheme)
397-
if err != nil {
398-
return xerrors.Errorf("convert parameter destination scheme: %w", err)
399-
}
400-
parameterSchema.DefaultDestinationScheme = parameterDestinationScheme
401-
}
358+
_, err = server.Database.InsertParameterSchema(ctx, parameterSchema)
359+
if err != nil {
360+
return nil, xerrors.Errorf("insert parameter schema: %w", err)
361+
}
362+
}
402363

403-
parameterSchemas = append(parameterSchemas, parameterSchema)
364+
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
365+
ProjectImportJobID: job.ID,
366+
OrganizationID: job.OrganizationID,
367+
UserID: job.InitiatorID,
368+
}, nil)
369+
if err != nil {
370+
return nil, xerrors.Errorf("compute parameters: %w", err)
371+
}
372+
// Convert parameters to the protobuf type.
373+
protoParameters := make([]*sdkproto.ParameterValue, 0, len(parameters))
374+
for _, parameter := range parameters {
375+
converted, err := convertComputedParameterValue(parameter)
376+
if err != nil {
377+
return nil, xerrors.Errorf("convert parameter: %s", err)
404378
}
379+
protoParameters = append(protoParameters, converted)
405380
}
381+
382+
return &proto.UpdateJobResponse{
383+
ParameterValues: protoParameters,
384+
}, nil
406385
}
386+
387+
return &proto.UpdateJobResponse{}, nil
407388
}
408389

409390
func (server *provisionerdServer) CancelJob(ctx context.Context, cancelJob *proto.CancelledJob) (*proto.Empty, error) {
@@ -450,17 +431,12 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
450431
if err != nil {
451432
return nil, xerrors.Errorf("get job by id: %w", err)
452433
}
453-
// TODO: Check if the worker ID matches!
454-
// If it doesn't, a provisioner daemon could be impersonating another job!
434+
if job.WorkerID.UUID.String() != server.ID.String() {
435+
return nil, xerrors.Errorf("you don't have permission to update this job")
436+
}
455437

456438
switch jobType := completed.Type.(type) {
457439
case *proto.CompletedJob_ProjectImport_:
458-
var input projectVersionImportJob
459-
err = json.Unmarshal(job.Input, &input)
460-
if err != nil {
461-
return nil, xerrors.Errorf("unmarshal job data: %w", err)
462-
}
463-
464440
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
465441
ID: jobID,
466442
UpdatedAt: database.Now(),

coderd/provisionerjobs.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package coderd
22

33
import (
44
"database/sql"
5-
"encoding/json"
65
"errors"
76
"fmt"
87
"net/http"
@@ -80,16 +79,6 @@ func (api *api) postProvisionerImportJobByOrganization(rw http.ResponseWriter, r
8079
return
8180
}
8281

83-
input, err := json.Marshal(projectVersionImportJob{
84-
OrganizationID: organization.ID,
85-
})
86-
if err != nil {
87-
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{
88-
Message: fmt.Sprintf("marshal job: %s", err),
89-
})
90-
return
91-
}
92-
9382
jobID := uuid.New()
9483
for _, parameterValue := range req.ParameterValues {
9584
_, err = api.Database.InsertParameterValue(r.Context(), database.InsertParameterValueParams{
@@ -121,7 +110,6 @@ func (api *api) postProvisionerImportJobByOrganization(rw http.ResponseWriter, r
121110
StorageMethod: database.ProvisionerStorageMethodFile,
122111
StorageSource: file.Hash,
123112
Type: database.ProvisionerJobTypeProjectVersionImport,
124-
Input: input,
125113
})
126114
if err != nil {
127115
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{

coderd/provisionerjobs_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package coderd_test
22

33
import (
44
"context"
5-
"fmt"
65
"net/http"
76
"testing"
87
"time"
@@ -89,7 +88,6 @@ func TestPostProvisionerImportJobByOrganization(t *testing.T) {
8988
})
9089
require.NoError(t, err)
9190
job = coderdtest.AwaitProvisionerJob(t, client, user.Organization, job.ID)
92-
fmt.Printf("Job %+v\n", job)
9391
values, err := client.ProvisionerJobParameterValues(context.Background(), user.Organization, job.ID)
9492
require.NoError(t, err)
9593
require.Equal(t, "somevalue", values[0].SourceValue)
@@ -120,16 +118,23 @@ func TestProvisionerJobParametersByID(t *testing.T) {
120118
Complete: &proto.Parse_Complete{
121119
ParameterSchemas: []*proto.ParameterSchema{{
122120
Name: "example",
121+
DefaultSource: &proto.ParameterSource{
122+
Scheme: proto.ParameterSource_DATA,
123+
Value: "hello",
124+
},
125+
DefaultDestination: &proto.ParameterDestination{
126+
Scheme: proto.ParameterDestination_PROVISIONER_VARIABLE,
127+
},
123128
}},
124129
},
125130
},
126131
}},
127132
Provision: echo.ProvisionComplete,
128133
})
129-
coderdtest.AwaitProvisionerJob(t, client, user.Organization, job.ID)
134+
job = coderdtest.AwaitProvisionerJob(t, client, user.Organization, job.ID)
130135
params, err := client.ProvisionerJobParameterValues(context.Background(), user.Organization, job.ID)
131136
require.NoError(t, err)
132-
require.Len(t, params, 0)
137+
require.Len(t, params, 1)
133138
})
134139

135140
t.Run("ListNoRedisplay", func(t *testing.T) {
@@ -149,7 +154,6 @@ func TestProvisionerJobParametersByID(t *testing.T) {
149154
},
150155
DefaultDestination: &proto.ParameterDestination{
151156
Scheme: proto.ParameterDestination_PROVISIONER_VARIABLE,
152-
Value: "example",
153157
},
154158
RedisplayValue: false,
155159
}},

provisioner/terraform/parse.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ func convertVariableToParameter(variable *tfconfig.Variable) (*proto.ParameterSc
5959
}
6060
schema.DefaultDestination = &proto.ParameterDestination{
6161
Scheme: proto.ParameterDestination_PROVISIONER_VARIABLE,
62-
Value: variable.Name,
6362
}
6463
}
6564

0 commit comments

Comments
 (0)