Skip to content

Commit 1bd6b69

Browse files
committed
upload files independently
1 parent bcbf6ca commit 1bd6b69

File tree

5 files changed

+107
-48
lines changed

5 files changed

+107
-48
lines changed

coderd/provisionerdserver/provisionerdserver.go

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,21 +1321,36 @@ func (s *server) prepareForNotifyWorkspaceManualBuildFailed(ctx context.Context,
13211321
return templateAdmins, template, templateVersion, workspaceOwner, nil
13221322
}
13231323

1324-
func (s *server) CompleteJobWithFiles(stream proto.DRPCProvisionerDaemon_CompleteJobWithFilesStream) error {
1324+
func (s *server) UploadFile(stream proto.DRPCProvisionerDaemon_UploadFileStream) error {
13251325
var file *sdkproto.DataBuilder
1326+
defer stream.Close()
13261327

1327-
// stream expects files first
1328+
UploadFileStream:
13281329
for {
13291330
msg, err := stream.Recv()
13301331
if err != nil {
13311332
return xerrors.Errorf("receive complete job with files: %w", err)
13321333
}
13331334

13341335
switch typed := msg.Type.(type) {
1335-
case *proto.CompleteWithFilesRequest_Complete:
1336-
case *proto.CompleteWithFilesRequest_ChunkPiece:
1336+
case *proto.UploadFileRequest_ChunkPiece:
1337+
if file == nil {
1338+
return xerrors.New("unexpected chunk piece while waiting for file upload")
1339+
}
1340+
1341+
done, err := file.Add(&sdkproto.ChunkPiece{
1342+
Data: typed.ChunkPiece.Data,
1343+
FullDataHash: typed.ChunkPiece.FullDataHash,
1344+
PieceIndex: typed.ChunkPiece.PieceIndex,
1345+
})
1346+
if err != nil {
1347+
return xerrors.Errorf("unable to add chunk piece: %w", err)
1348+
}
13371349

1338-
case *proto.CompleteWithFilesRequest_DataUpload:
1350+
if done {
1351+
break UploadFileStream
1352+
}
1353+
case *proto.UploadFileRequest_DataUpload:
13391354
if file != nil {
13401355
return xerrors.New("unexpected file upload while waiting for file completion")
13411356
}
@@ -1352,6 +1367,39 @@ func (s *server) CompleteJobWithFiles(stream proto.DRPCProvisionerDaemon_Complet
13521367
}
13531368
}
13541369

1370+
fileData, err := file.Complete()
1371+
if err != nil {
1372+
return xerrors.Errorf("complete file upload: %w", err)
1373+
}
1374+
1375+
// Just rehash the data to be sure it is correct.
1376+
hashBytes := sha256.Sum256(fileData)
1377+
hash := hex.EncodeToString(hashBytes[:])
1378+
1379+
var insert database.InsertFileParams
1380+
1381+
switch file.Type {
1382+
case sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES:
1383+
insert = database.InsertFileParams{
1384+
ID: uuid.New(),
1385+
Hash: hash,
1386+
CreatedAt: dbtime.Now(),
1387+
CreatedBy: uuid.Nil,
1388+
Mimetype: tarMimeType,
1389+
Data: fileData,
1390+
}
1391+
default:
1392+
return xerrors.Errorf("unsupported file upload type: %s", file.Type)
1393+
}
1394+
1395+
_, err = s.Database.InsertFile(s.lifecycleCtx, insert)
1396+
if err != nil {
1397+
// Duplicated files already exist in the database, so we can ignore this error.
1398+
if !database.IsUniqueViolation(err, database.UniqueFilesHashCreatedByKey) {
1399+
return xerrors.Errorf("insert file: %w", err)
1400+
}
1401+
}
1402+
return nil
13551403
}
13561404

13571405
// CompleteJob is triggered by a provision daemon to mark a provisioner job as completed.

provisionerd/proto/provisionerd.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ message CompletedJob {
9292
repeated provisioner.Preset presets = 8;
9393
bytes plan = 9;
9494
bytes module_files = 10;
95+
bytes module_files_hash = 11;
9596
}
9697
message TemplateDryRun {
9798
repeated provisioner.Resource resources = 1;

provisionerd/provisionerd.go

Lines changed: 50 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -517,56 +517,66 @@ func (p *Server) FailJob(ctx context.Context, in *proto.FailedJob) error {
517517
return err
518518
}
519519

520-
func (p *Server) CompleteJobWithModuleFiles(ctx context.Context, in *proto.CompletedJob, moduleFiles []byte) error {
520+
func (p *Server) UploadModuleFiles(ctx context.Context, moduleFiles []byte) error {
521+
//Send the files separately if the message size is too large.
522+
_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
523+
stream, err := client.UploadFile(ctx)
524+
if err != nil {
525+
return nil, xerrors.Errorf("failed to start CompleteJobWithFiles stream: %w", err)
526+
}
527+
defer stream.Close()
528+
529+
dataUp, chunks := sdkproto.BytesToDataUpload(sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES, moduleFiles)
530+
531+
err = stream.Send(&proto.UploadFileRequest{Type: &proto.UploadFileRequest_DataUpload{DataUpload: &proto.DataUpload{
532+
UploadType: proto.DataUploadType(dataUp.UploadType),
533+
DataHash: dataUp.DataHash,
534+
FileSize: dataUp.FileSize,
535+
Chunks: dataUp.Chunks,
536+
}}})
537+
if err != nil {
538+
if retryable(err) { // Do not retry
539+
return nil, xerrors.Errorf("send data upload: %s", err.Error())
540+
}
541+
return nil, xerrors.Errorf("send data upload: %w", err)
542+
}
543+
544+
for i, chunk := range chunks {
545+
err = stream.Send(&proto.UploadFileRequest{Type: &proto.UploadFileRequest_ChunkPiece{ChunkPiece: &proto.ChunkPiece{
546+
Data: chunk.Data,
547+
FullDataHash: chunk.FullDataHash,
548+
PieceIndex: chunk.PieceIndex,
549+
}}})
550+
if err != nil {
551+
if retryable(err) { // Do not retry
552+
return nil, xerrors.Errorf("send chunk piece: %s", err.Error())
553+
}
554+
return nil, xerrors.Errorf("send chunk piece %d: %w", i, err)
555+
}
556+
}
557+
558+
return &proto.Empty{}, nil
559+
})
560+
if err != nil {
561+
return xerrors.Errorf("upload module files: %w", err)
562+
}
563+
521564
return nil
522-
// Send the files separately if the message size is too large.
523-
//_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
524-
// stream, err := client.CompleteJobWithFiles(ctx)
525-
// if err != nil {
526-
// return nil, xerrors.Errorf("failed to start CompleteJobWithFiles stream: %w", err)
527-
// }
528-
//
529-
// dataUp, chunks := sdkproto.BytesToDataUpload(moduleFiles)
530-
//
531-
// err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_DataUpload{DataUpload: &proto.DataUpload{
532-
// UploadType: proto.DataUploadType(dataUp.UploadType),
533-
// DataHash: dataUp.DataHash,
534-
// FileSize: dataUp.FileSize,
535-
// Chunks: dataUp.Chunks,
536-
// }}})
537-
// if err != nil {
538-
// return nil, xerrors.Errorf("send data upload: %w", err)
539-
// }
540-
//
541-
// for i, chunk := range chunks {
542-
// err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_ChunkPiece{ChunkPiece: &proto.ChunkPiece{
543-
// Data: chunk.Data,
544-
// FullDataHash: chunk.FullDataHash,
545-
// PieceIndex: chunk.PieceIndex,
546-
// }}})
547-
// if err != nil {
548-
// return nil, xerrors.Errorf("send chunk piece %d: %w", i, err)
549-
// }
550-
// }
551-
//
552-
// err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_Complete{Complete: in}})
553-
// if err != nil {
554-
// return nil, xerrors.Errorf("send complete job: %w", err)
555-
// }
556-
//
557-
// return &proto.Empty{}, nil
558-
//})
559-
//return err
560565
}
561566

562567
func (p *Server) CompleteJob(ctx context.Context, in *proto.CompletedJob) error {
563568
if ti, ok := in.Type.(*proto.CompletedJob_TemplateImport_); ok {
564569
messageSize := protobuf.Size(in)
565570
if messageSize > drpcsdk.MaxMessageSize &&
566571
messageSize-len(ti.TemplateImport.ModuleFiles) < drpcsdk.MaxMessageSize {
572+
573+
// Split the module files from the message if it exceeds the max size.
567574
moduleFiles := ti.TemplateImport.ModuleFiles
568575
ti.TemplateImport.ModuleFiles = nil // Clear the files in the final message
569-
return p.CompleteJobWithModuleFiles(ctx, in, moduleFiles)
576+
err := p.UploadModuleFiles(ctx, moduleFiles)
577+
if err != nil {
578+
return err
579+
}
570580
}
571581
}
572582

provisionersdk/proto/dataupload.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

provisionersdk/session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (s *Session) handleRequests() error {
168168
if protobuf.Size(resp) > drpcsdk.MaxMessageSize {
169169
// Send the modules over as a stream
170170
s.Logger.Info(s.Context(), "plan response too large, sending modules as stream")
171-
dataUp, chunks := proto.BytesToDataUpload(complete.ModuleFiles)
171+
dataUp, chunks := proto.BytesToDataUpload(proto.DataUploadType_UPLOAD_TYPE_MODULE_FILES, complete.ModuleFiles)
172172

173173
complete.ModuleFiles = nil // sent over the stream
174174
resp.Type = &proto.Response_Plan{Plan: complete}

0 commit comments

Comments
 (0)