@@ -517,56 +517,66 @@ func (p *Server) FailJob(ctx context.Context, in *proto.FailedJob) error {
517
517
return err
518
518
}
519
519
520
- func (p * Server ) CompleteJobWithModuleFiles (ctx context.Context , in * proto.CompletedJob , moduleFiles []byte ) error {
520
+ func (p * Server ) UploadModuleFiles (ctx context.Context , moduleFiles []byte ) error {
521
+ //Send the files separately if the message size is too large.
522
+ _ , err := clientDoWithRetries (ctx , p .client , func (ctx context.Context , client proto.DRPCProvisionerDaemonClient ) (* proto.Empty , error ) {
523
+ stream , err := client .UploadFile (ctx )
524
+ if err != nil {
525
+ return nil , xerrors .Errorf ("failed to start CompleteJobWithFiles stream: %w" , err )
526
+ }
527
+ defer stream .Close ()
528
+
529
+ dataUp , chunks := sdkproto .BytesToDataUpload (sdkproto .DataUploadType_UPLOAD_TYPE_MODULE_FILES , moduleFiles )
530
+
531
+ err = stream .Send (& proto.UploadFileRequest {Type : & proto.UploadFileRequest_DataUpload {DataUpload : & proto.DataUpload {
532
+ UploadType : proto .DataUploadType (dataUp .UploadType ),
533
+ DataHash : dataUp .DataHash ,
534
+ FileSize : dataUp .FileSize ,
535
+ Chunks : dataUp .Chunks ,
536
+ }}})
537
+ if err != nil {
538
+ if retryable (err ) { // Do not retry
539
+ return nil , xerrors .Errorf ("send data upload: %s" , err .Error ())
540
+ }
541
+ return nil , xerrors .Errorf ("send data upload: %w" , err )
542
+ }
543
+
544
+ for i , chunk := range chunks {
545
+ err = stream .Send (& proto.UploadFileRequest {Type : & proto.UploadFileRequest_ChunkPiece {ChunkPiece : & proto.ChunkPiece {
546
+ Data : chunk .Data ,
547
+ FullDataHash : chunk .FullDataHash ,
548
+ PieceIndex : chunk .PieceIndex ,
549
+ }}})
550
+ if err != nil {
551
+ if retryable (err ) { // Do not retry
552
+ return nil , xerrors .Errorf ("send chunk piece: %s" , err .Error ())
553
+ }
554
+ return nil , xerrors .Errorf ("send chunk piece %d: %w" , i , err )
555
+ }
556
+ }
557
+
558
+ return & proto.Empty {}, nil
559
+ })
560
+ if err != nil {
561
+ return xerrors .Errorf ("upload module files: %w" , err )
562
+ }
563
+
521
564
return nil
522
- // Send the files separately if the message size is too large.
523
- //_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
524
- // stream, err := client.CompleteJobWithFiles(ctx)
525
- // if err != nil {
526
- // return nil, xerrors.Errorf("failed to start CompleteJobWithFiles stream: %w", err)
527
- // }
528
- //
529
- // dataUp, chunks := sdkproto.BytesToDataUpload(moduleFiles)
530
- //
531
- // err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_DataUpload{DataUpload: &proto.DataUpload{
532
- // UploadType: proto.DataUploadType(dataUp.UploadType),
533
- // DataHash: dataUp.DataHash,
534
- // FileSize: dataUp.FileSize,
535
- // Chunks: dataUp.Chunks,
536
- // }}})
537
- // if err != nil {
538
- // return nil, xerrors.Errorf("send data upload: %w", err)
539
- // }
540
- //
541
- // for i, chunk := range chunks {
542
- // err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_ChunkPiece{ChunkPiece: &proto.ChunkPiece{
543
- // Data: chunk.Data,
544
- // FullDataHash: chunk.FullDataHash,
545
- // PieceIndex: chunk.PieceIndex,
546
- // }}})
547
- // if err != nil {
548
- // return nil, xerrors.Errorf("send chunk piece %d: %w", i, err)
549
- // }
550
- // }
551
- //
552
- // err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_Complete{Complete: in}})
553
- // if err != nil {
554
- // return nil, xerrors.Errorf("send complete job: %w", err)
555
- // }
556
- //
557
- // return &proto.Empty{}, nil
558
- //})
559
- //return err
560
565
}
561
566
562
567
func (p * Server ) CompleteJob (ctx context.Context , in * proto.CompletedJob ) error {
563
568
if ti , ok := in .Type .(* proto.CompletedJob_TemplateImport_ ); ok {
564
569
messageSize := protobuf .Size (in )
565
570
if messageSize > drpcsdk .MaxMessageSize &&
566
571
messageSize - len (ti .TemplateImport .ModuleFiles ) < drpcsdk .MaxMessageSize {
572
+
573
+ // Split the module files from the message if it exceeds the max size.
567
574
moduleFiles := ti .TemplateImport .ModuleFiles
568
575
ti .TemplateImport .ModuleFiles = nil // Clear the files in the final message
569
- return p .CompleteJobWithModuleFiles (ctx , in , moduleFiles )
576
+ err := p .UploadModuleFiles (ctx , moduleFiles )
577
+ if err != nil {
578
+ return err
579
+ }
570
580
}
571
581
}
572
582
0 commit comments