@@ -109,12 +109,6 @@ type workspaceProvisionJob struct {
109
109
DryRun bool `json:"dry_run"`
110
110
}
111
111
112
- // The input for a "project_import" job.
113
- type projectVersionImportJob struct {
114
- OrganizationID string `json:"organization_id"`
115
- ProjectID uuid.UUID `json:"project_id"`
116
- }
117
-
118
112
// Implementation of the provisioner daemon protobuf server.
119
113
type provisionerdServer struct {
120
114
ID uuid.UUID
@@ -242,39 +236,8 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
242
236
},
243
237
}
244
238
case database .ProvisionerJobTypeProjectVersionImport :
245
- var input projectVersionImportJob
246
- err = json .Unmarshal (job .Input , & input )
247
- if err != nil {
248
- return nil , failJob (fmt .Sprintf ("unmarshal job input %q: %s" , job .Input , err ))
249
- }
250
-
251
- // Compute parameters for the workspace to consume.
252
- parameters , err := parameter .Compute (ctx , server .Database , parameter.ComputeScope {
253
- ProjectImportJobID : job .ID ,
254
- OrganizationID : input .OrganizationID ,
255
- ProjectID : uuid.NullUUID {
256
- UUID : input .ProjectID ,
257
- Valid : input .ProjectID .String () != uuid .Nil .String (),
258
- },
259
- UserID : user .ID ,
260
- }, nil )
261
- if err != nil {
262
- return nil , failJob (fmt .Sprintf ("compute parameters: %s" , err ))
263
- }
264
- // Convert parameters to the protobuf type.
265
- protoParameters := make ([]* sdkproto.ParameterValue , 0 , len (parameters ))
266
- for _ , parameter := range parameters {
267
- converted , err := convertComputedParameterValue (parameter )
268
- if err != nil {
269
- return nil , failJob (fmt .Sprintf ("convert parameter: %s" , err ))
270
- }
271
- protoParameters = append (protoParameters , converted )
272
- }
273
-
274
239
protoJob .Type = & proto.AcquiredJob_ProjectImport_ {
275
- ProjectImport : & proto.AcquiredJob_ProjectImport {
276
- ParameterValues : protoParameters ,
277
- },
240
+ ProjectImport : & proto.AcquiredJob_ProjectImport {},
278
241
}
279
242
}
280
243
switch job .StorageMethod {
@@ -291,119 +254,137 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
291
254
return protoJob , err
292
255
}
293
256
294
- func (server * provisionerdServer ) UpdateJob (stream proto.DRPCProvisionerDaemon_UpdateJobStream ) error {
295
- for {
296
- update , err := stream .Recv ()
297
- if err != nil {
298
- return err
257
+ func (server * provisionerdServer ) UpdateJob (ctx context.Context , request * proto.UpdateJobRequest ) (* proto.UpdateJobResponse , error ) {
258
+ parsedID , err := uuid .Parse (request .JobId )
259
+ if err != nil {
260
+ return nil , xerrors .Errorf ("parse job id: %w" , err )
261
+ }
262
+ job , err := server .Database .GetProvisionerJobByID (ctx , parsedID )
263
+ if err != nil {
264
+ return nil , xerrors .Errorf ("get job: %w" , err )
265
+ }
266
+ if ! job .WorkerID .Valid {
267
+ return nil , xerrors .New ("job isn't running yet" )
268
+ }
269
+ if job .WorkerID .UUID .String () != server .ID .String () {
270
+ return nil , xerrors .New ("you don't own this job" )
271
+ }
272
+ err = server .Database .UpdateProvisionerJobByID (ctx , database.UpdateProvisionerJobByIDParams {
273
+ ID : parsedID ,
274
+ UpdatedAt : database .Now (),
275
+ })
276
+ if err != nil {
277
+ return nil , xerrors .Errorf ("update job: %w" , err )
278
+ }
279
+
280
+ if len (request .Logs ) > 0 {
281
+ insertParams := database.InsertProvisionerJobLogsParams {
282
+ JobID : parsedID ,
299
283
}
300
- parsedID , err := uuid .Parse (update .JobId )
301
- if err != nil {
302
- return xerrors .Errorf ("parse job id: %w" , err )
284
+ for _ , log := range request .Logs {
285
+ logLevel , err := convertLogLevel (log .Level )
286
+ if err != nil {
287
+ return nil , xerrors .Errorf ("convert log level: %w" , err )
288
+ }
289
+ logSource , err := convertLogSource (log .Source )
290
+ if err != nil {
291
+ return nil , xerrors .Errorf ("convert log source: %w" , err )
292
+ }
293
+ insertParams .ID = append (insertParams .ID , uuid .New ())
294
+ insertParams .CreatedAt = append (insertParams .CreatedAt , time .UnixMilli (log .CreatedAt ))
295
+ insertParams .Level = append (insertParams .Level , logLevel )
296
+ insertParams .Source = append (insertParams .Source , logSource )
297
+ insertParams .Output = append (insertParams .Output , log .Output )
303
298
}
304
- job , err := server .Database .GetProvisionerJobByID ( stream . Context (), parsedID )
299
+ logs , err := server .Database .InsertProvisionerJobLogs ( context . Background (), insertParams )
305
300
if err != nil {
306
- return xerrors .Errorf ("get job: %w" , err )
301
+ return nil , xerrors .Errorf ("insert job logs : %w" , err )
307
302
}
308
- if ! job .WorkerID .Valid {
309
- return xerrors .New ("job isn't running yet" )
310
- }
311
- if job .WorkerID .UUID .String () != server .ID .String () {
312
- return xerrors .New ("you don't own this job" )
303
+ data , err := json .Marshal (logs )
304
+ if err != nil {
305
+ return nil , xerrors .Errorf ("marshal job log: %w" , err )
313
306
}
314
-
315
- err = server .Database .UpdateProvisionerJobByID (stream .Context (), database.UpdateProvisionerJobByIDParams {
316
- ID : parsedID ,
317
- UpdatedAt : database .Now (),
318
- })
307
+ err = server .Pubsub .Publish (provisionerJobLogsChannel (parsedID ), data )
319
308
if err != nil {
320
- return xerrors .Errorf ("update job: %w" , err )
309
+ return nil , xerrors .Errorf ("publish job log : %w" , err )
321
310
}
322
- if len (update .Logs ) > 0 {
323
- insertParams := database.InsertProvisionerJobLogsParams {
324
- JobID : parsedID ,
325
- }
326
- for _ , log := range update .Logs {
327
- logLevel , err := convertLogLevel (log .Level )
328
- if err != nil {
329
- return xerrors .Errorf ("convert log level: %w" , err )
330
- }
331
- logSource , err := convertLogSource (log .Source )
332
- if err != nil {
333
- return xerrors .Errorf ("convert log source: %w" , err )
334
- }
335
- insertParams .ID = append (insertParams .ID , uuid .New ())
336
- insertParams .CreatedAt = append (insertParams .CreatedAt , time .UnixMilli (log .CreatedAt ))
337
- insertParams .Level = append (insertParams .Level , logLevel )
338
- insertParams .Source = append (insertParams .Source , logSource )
339
- insertParams .Output = append (insertParams .Output , log .Output )
340
- }
341
- logs , err := server .Database .InsertProvisionerJobLogs (context .Background (), insertParams )
342
- if err != nil {
343
- return xerrors .Errorf ("insert job logs: %w" , err )
344
- }
345
- data , err := json .Marshal (logs )
311
+ }
312
+
313
+ if len (request .ParameterSchemas ) > 0 {
314
+ for _ , protoParameter := range request .ParameterSchemas {
315
+ validationTypeSystem , err := convertValidationTypeSystem (protoParameter .ValidationTypeSystem )
346
316
if err != nil {
347
- return xerrors .Errorf ("marshal job log : %w" , err )
317
+ return nil , xerrors .Errorf ("convert validation type system for %q : %w" , protoParameter . Name , err )
348
318
}
349
- err = server .Pubsub .Publish (provisionerJobLogsChannel (parsedID ), data )
350
- if err != nil {
351
- return xerrors .Errorf ("publish job log: %w" , err )
319
+
320
+ parameterSchema := database.InsertParameterSchemaParams {
321
+ ID : uuid .New (),
322
+ CreatedAt : database .Now (),
323
+ JobID : job .ID ,
324
+ Name : protoParameter .Name ,
325
+ Description : protoParameter .Description ,
326
+ RedisplayValue : protoParameter .RedisplayValue ,
327
+ ValidationError : protoParameter .ValidationError ,
328
+ ValidationCondition : protoParameter .ValidationCondition ,
329
+ ValidationValueType : protoParameter .ValidationValueType ,
330
+ ValidationTypeSystem : validationTypeSystem ,
331
+
332
+ DefaultSourceScheme : database .ParameterSourceSchemeNone ,
333
+ DefaultDestinationScheme : database .ParameterDestinationSchemeNone ,
334
+
335
+ AllowOverrideDestination : protoParameter .AllowOverrideDestination ,
336
+ AllowOverrideSource : protoParameter .AllowOverrideSource ,
352
337
}
353
- }
354
338
355
- if update .GetProjectImport () != nil {
356
- // Validate that all parameters send from the provisioner daemon
357
- // follow the protocol.
358
- parameterSchemas := make ([]database.InsertParameterSchemaParams , 0 , len (update .GetProjectImport ().ParameterSchemas ))
359
- for _ , protoParameter := range update .GetProjectImport ().ParameterSchemas {
360
- validationTypeSystem , err := convertValidationTypeSystem (protoParameter .ValidationTypeSystem )
339
+ // It's possible a parameter doesn't define a default source!
340
+ if protoParameter .DefaultSource != nil {
341
+ parameterSourceScheme , err := convertParameterSourceScheme (protoParameter .DefaultSource .Scheme )
361
342
if err != nil {
362
- return xerrors .Errorf ("convert validation type system for %q: %w" , protoParameter .Name , err )
363
- }
364
-
365
- parameterSchema := database.InsertParameterSchemaParams {
366
- ID : uuid .New (),
367
- CreatedAt : database .Now (),
368
- JobID : job .ID ,
369
- Name : protoParameter .Name ,
370
- Description : protoParameter .Description ,
371
- RedisplayValue : protoParameter .RedisplayValue ,
372
- ValidationError : protoParameter .ValidationError ,
373
- ValidationCondition : protoParameter .ValidationCondition ,
374
- ValidationValueType : protoParameter .ValidationValueType ,
375
- ValidationTypeSystem : validationTypeSystem ,
376
-
377
- DefaultSourceScheme : database .ParameterSourceSchemeNone ,
378
- DefaultDestinationScheme : database .ParameterDestinationSchemeNone ,
379
-
380
- AllowOverrideDestination : protoParameter .AllowOverrideDestination ,
381
- AllowOverrideSource : protoParameter .AllowOverrideSource ,
343
+ return nil , xerrors .Errorf ("convert parameter source scheme: %w" , err )
382
344
}
345
+ parameterSchema .DefaultSourceScheme = parameterSourceScheme
346
+ parameterSchema .DefaultSourceValue = protoParameter .DefaultSource .Value
347
+ }
383
348
384
- // It's possible a parameter doesn't define a default source!
385
- if protoParameter .DefaultSource != nil {
386
- parameterSourceScheme , err := convertParameterSourceScheme (protoParameter .DefaultSource .Scheme )
387
- if err != nil {
388
- return xerrors .Errorf ("convert parameter source scheme: %w" , err )
389
- }
390
- parameterSchema .DefaultSourceScheme = parameterSourceScheme
391
- parameterSchema .DefaultSourceValue = protoParameter .DefaultSource .Value
349
+ // It's possible a parameter doesn't define a default destination!
350
+ if protoParameter .DefaultDestination != nil {
351
+ parameterDestinationScheme , err := convertParameterDestinationScheme (protoParameter .DefaultDestination .Scheme )
352
+ if err != nil {
353
+ return nil , xerrors .Errorf ("convert parameter destination scheme: %w" , err )
392
354
}
355
+ parameterSchema .DefaultDestinationScheme = parameterDestinationScheme
356
+ }
393
357
394
- // It's possible a parameter doesn't define a default destination!
395
- if protoParameter .DefaultDestination != nil {
396
- parameterDestinationScheme , err := convertParameterDestinationScheme (protoParameter .DefaultDestination .Scheme )
397
- if err != nil {
398
- return xerrors .Errorf ("convert parameter destination scheme: %w" , err )
399
- }
400
- parameterSchema .DefaultDestinationScheme = parameterDestinationScheme
401
- }
358
+ _ , err = server .Database .InsertParameterSchema (ctx , parameterSchema )
359
+ if err != nil {
360
+ return nil , xerrors .Errorf ("insert parameter schema: %w" , err )
361
+ }
362
+ }
402
363
403
- parameterSchemas = append (parameterSchemas , parameterSchema )
364
+ parameters , err := parameter .Compute (ctx , server .Database , parameter.ComputeScope {
365
+ ProjectImportJobID : job .ID ,
366
+ OrganizationID : job .OrganizationID ,
367
+ UserID : job .InitiatorID ,
368
+ }, nil )
369
+ if err != nil {
370
+ return nil , xerrors .Errorf ("compute parameters: %w" , err )
371
+ }
372
+ // Convert parameters to the protobuf type.
373
+ protoParameters := make ([]* sdkproto.ParameterValue , 0 , len (parameters ))
374
+ for _ , parameter := range parameters {
375
+ converted , err := convertComputedParameterValue (parameter )
376
+ if err != nil {
377
+ return nil , xerrors .Errorf ("convert parameter: %s" , err )
404
378
}
379
+ protoParameters = append (protoParameters , converted )
405
380
}
381
+
382
+ return & proto.UpdateJobResponse {
383
+ ParameterValues : protoParameters ,
384
+ }, nil
406
385
}
386
+
387
+ return & proto.UpdateJobResponse {}, nil
407
388
}
408
389
409
390
func (server * provisionerdServer ) CancelJob (ctx context.Context , cancelJob * proto.CancelledJob ) (* proto.Empty , error ) {
@@ -450,17 +431,12 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
450
431
if err != nil {
451
432
return nil , xerrors .Errorf ("get job by id: %w" , err )
452
433
}
453
- // TODO: Check if the worker ID matches!
454
- // If it doesn't, a provisioner daemon could be impersonating another job!
434
+ if job .WorkerID .UUID .String () != server .ID .String () {
435
+ return nil , xerrors .Errorf ("you don't have permission to update this job" )
436
+ }
455
437
456
438
switch jobType := completed .Type .(type ) {
457
439
case * proto.CompletedJob_ProjectImport_ :
458
- var input projectVersionImportJob
459
- err = json .Unmarshal (job .Input , & input )
460
- if err != nil {
461
- return nil , xerrors .Errorf ("unmarshal job data: %w" , err )
462
- }
463
-
464
440
err = server .Database .UpdateProvisionerJobWithCompleteByID (ctx , database.UpdateProvisionerJobWithCompleteByIDParams {
465
441
ID : jobID ,
466
442
UpdatedAt : database .Now (),
0 commit comments