Skip to content

Commit f547103

Browse files
committed
change upload behavior
1 parent b61faaf commit f547103

File tree

3 files changed

+94
-5
lines changed

3 files changed

+94
-5
lines changed

coderd/provisionerdserver/provisionerdserver.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,6 +1321,39 @@ func (s *server) prepareForNotifyWorkspaceManualBuildFailed(ctx context.Context,
13211321
return templateAdmins, template, templateVersion, workspaceOwner, nil
13221322
}
13231323

1324+
func (s *server) CompleteJobWithFiles(stream proto.DRPCProvisionerDaemon_CompleteJobWithFilesStream) error {
1325+
var file *sdkproto.DataBuilder
1326+
1327+
// stream expects files first
1328+
for {
1329+
msg, err := stream.Recv()
1330+
if err != nil {
1331+
return xerrors.Errorf("receive complete job with files: %w", err)
1332+
}
1333+
1334+
switch typed := msg.Type.(type) {
1335+
case *proto.CompleteWithFilesRequest_Complete:
1336+
case *proto.CompleteWithFilesRequest_ChunkPiece:
1337+
1338+
case *proto.CompleteWithFilesRequest_DataUpload:
1339+
if file != nil {
1340+
return xerrors.New("unexpected file upload while waiting for file completion")
1341+
}
1342+
1343+
file, err = sdkproto.NewDataBuilder(&sdkproto.DataUpload{
1344+
UploadType: sdkproto.DataUploadType(typed.DataUpload.UploadType),
1345+
DataHash: typed.DataUpload.DataHash,
1346+
FileSize: typed.DataUpload.FileSize,
1347+
Chunks: typed.DataUpload.Chunks,
1348+
})
1349+
if err != nil {
1350+
return xerrors.Errorf("unable to create file upload: %w", err)
1351+
}
1352+
}
1353+
}
1354+
1355+
}
1356+
13241357
// CompleteJob is triggered by a provision daemon to mark a provisioner job as completed.
13251358
func (s *server) CompleteJob(ctx context.Context, completed *proto.CompletedJob) (*proto.Empty, error) {
13261359
ctx, span := s.startTrace(ctx, tracing.FuncName())

provisionerd/proto/provisionerd.proto

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,10 @@ message CommitQuotaResponse {
181181

182182
message CancelAcquire {}
183183

184-
message CompleteWithFilesRequest {
184+
message UploadFileRequest {
185185
oneof type {
186-
CompletedJob complete = 1;
187-
DataUpload data_upload = 2;
188-
ChunkPiece chunk_piece = 3;
186+
DataUpload data_upload = 1;
187+
ChunkPiece chunk_piece = 2;
189188
}
190189
}
191190

@@ -216,5 +215,8 @@ service ProvisionerDaemon {
216215

217216
// CompleteJob indicates a job has been completed.
218217
rpc CompleteJob(CompletedJob) returns (Empty);
219-
rpc CompleteJobWithFiles(stream CompleteWithFilesRequest) returns (Empty);
218+
219+
// UploadFile streams files to be inserted into the database.
220+
// The file upload_type should be used to determine how to handle the file.
221+
rpc UploadFile(stream UploadFileRequest) returns (Empty);
220222
}

provisionerd/provisionerd.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ import (
1818
semconv "go.opentelemetry.io/otel/semconv/v1.14.0"
1919
"go.opentelemetry.io/otel/trace"
2020
"golang.org/x/xerrors"
21+
protobuf "google.golang.org/protobuf/proto"
2122

2223
"cdr.dev/slog"
24+
"github.com/coder/coder/v2/codersdk/drpcsdk"
2325
"github.com/coder/retry"
2426

2527
"github.com/coder/coder/v2/coderd/tracing"
@@ -515,7 +517,59 @@ func (p *Server) FailJob(ctx context.Context, in *proto.FailedJob) error {
515517
return err
516518
}
517519

520+
func (p *Server) CompleteJobWithModuleFiles(ctx context.Context, in *proto.CompletedJob, moduleFiles []byte) error {
521+
return nil
522+
// Send the files separately if the message size is too large.
523+
//_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
524+
// stream, err := client.CompleteJobWithFiles(ctx)
525+
// if err != nil {
526+
// return nil, xerrors.Errorf("failed to start CompleteJobWithFiles stream: %w", err)
527+
// }
528+
//
529+
// dataUp, chunks := sdkproto.BytesToDataUpload(moduleFiles)
530+
//
531+
// err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_DataUpload{DataUpload: &proto.DataUpload{
532+
// UploadType: proto.DataUploadType(dataUp.UploadType),
533+
// DataHash: dataUp.DataHash,
534+
// FileSize: dataUp.FileSize,
535+
// Chunks: dataUp.Chunks,
536+
// }}})
537+
// if err != nil {
538+
// return nil, xerrors.Errorf("send data upload: %w", err)
539+
// }
540+
//
541+
// for i, chunk := range chunks {
542+
// err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_ChunkPiece{ChunkPiece: &proto.ChunkPiece{
543+
// Data: chunk.Data,
544+
// FullDataHash: chunk.FullDataHash,
545+
// PieceIndex: chunk.PieceIndex,
546+
// }}})
547+
// if err != nil {
548+
// return nil, xerrors.Errorf("send chunk piece %d: %w", i, err)
549+
// }
550+
// }
551+
//
552+
// err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_Complete{Complete: in}})
553+
// if err != nil {
554+
// return nil, xerrors.Errorf("send complete job: %w", err)
555+
// }
556+
//
557+
// return &proto.Empty{}, nil
558+
//})
559+
//return err
560+
}
561+
518562
func (p *Server) CompleteJob(ctx context.Context, in *proto.CompletedJob) error {
563+
if ti, ok := in.Type.(*proto.CompletedJob_TemplateImport_); ok {
564+
messageSize := protobuf.Size(in)
565+
if messageSize > drpcsdk.MaxMessageSize &&
566+
messageSize-len(ti.TemplateImport.ModuleFiles) < drpcsdk.MaxMessageSize {
567+
moduleFiles := ti.TemplateImport.ModuleFiles
568+
ti.TemplateImport.ModuleFiles = nil // Clear the files in the final message
569+
return p.CompleteJobWithModuleFiles(ctx, in, moduleFiles)
570+
}
571+
}
572+
519573
_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
520574
return client.CompleteJob(ctx, in)
521575
})

0 commit comments

Comments
 (0)