Skip to content

feat: use proto streams to increase maximum module files payload #18268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 52 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
e6b4635
feat: provisioners to stream over modules >4mb limit
Emyrk Jun 3, 2025
d115c5a
make gen
Emyrk Jun 3, 2025
dea4895
add chunk piece in response
Emyrk Jun 3, 2025
b5fceda
rename hash field
Emyrk Jun 3, 2025
99d4d54
remove upload type from the chunk
Emyrk Jun 3, 2025
e8d8b98
feat: handle uploading data files in runner
Emyrk Jun 3, 2025
fbfa08b
chore: implement first part of file streaming
Emyrk Jun 4, 2025
73151d1
test adding a stream on the server
Emyrk Jun 4, 2025
b61faaf
add completejob stream
Emyrk Jun 4, 2025
f547103
change upload behavior
Emyrk Jun 4, 2025
bcbf6ca
make gen
Emyrk Jun 5, 2025
1bd6b69
upload files independently
Emyrk Jun 5, 2025
931ac95
make gen
Emyrk Jun 5, 2025
fb8c284
select file by hash
Emyrk Jun 5, 2025
6b645d6
fix permissions
Emyrk Jun 6, 2025
9dc81db
add log for uploaded files
Emyrk Jun 6, 2025
124d366
remove dead file
Emyrk Jun 6, 2025
857e88c
ability to omit module file downloads
Emyrk Jun 6, 2025
5cf9beb
linting
Emyrk Jun 6, 2025
58b2371
linting
Emyrk Jun 6, 2025
e880894
make gen
Emyrk Jun 6, 2025
31d8e62
fixups
Emyrk Jun 6, 2025
bb2000a
linting
Emyrk Jun 9, 2025
6633548
linting
Emyrk Jun 9, 2025
b070e5a
linting
Emyrk Jun 9, 2025
1a70429
add unit tests and fuzz test
Emyrk Jun 9, 2025
0f056e8
unit testing
Emyrk Jun 9, 2025
1a8c658
comments
Emyrk Jun 9, 2025
4ec22d8
fixup test
Emyrk Jun 9, 2025
1c9df09
fixup dbmem to return unique files error
Emyrk Jun 9, 2025
c30607e
linting
Emyrk Jun 9, 2025
0484777
fmt
Emyrk Jun 9, 2025
a3cf121
linting
Emyrk Jun 9, 2025
3f7707a
linting
Emyrk Jun 9, 2025
0ec5d1b
Merge remote-tracking branch 'origin/main' into stevenmasley/4mb
Emyrk Jun 10, 2025
e50c8c3
update proto to reuse
Emyrk Jun 10, 2025
8cf7f5c
make gen
Emyrk Jun 10, 2025
28f83a2
fixup
Emyrk Jun 10, 2025
bb3d4ef
test fixup types
Emyrk Jun 10, 2025
74073cf
Merge remote-tracking branch 'origin/main' into stevenmasley/4mb
Emyrk Jun 11, 2025
c9575c5
add full_hash to complete job
Emyrk Jun 11, 2025
6b7e56e
Revert "add full_hash to complete job"
Emyrk Jun 11, 2025
24d6227
Reapply "add full_hash to complete job"
Emyrk Jun 11, 2025
4f343e6
chore: add hash check of uploaded file after planComplete
Emyrk Jun 11, 2025
539755b
bump provider version
Emyrk Jun 11, 2025
e4f49ab
linting
Emyrk Jun 11, 2025
363498a
nil bytes fail on the typescript
Emyrk Jun 11, 2025
02d8563
handle empty slices
Emyrk Jun 11, 2025
f1ce0ac
move module max to a constant
Emyrk Jun 12, 2025
f6bdf96
add 0 byte file test
Emyrk Jun 12, 2025
4f387ec
Merge branch 'main' into stevenmasley/4mb
Emyrk Jun 12, 2025
f3525c3
Merge branch 'main' into stevenmasley/4mb
Emyrk Jun 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/testdata/coder_provisioner_list_--output_json.golden
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"last_seen_at": "====[timestamp]=====",
"name": "test-daemon",
"version": "v0.0.0-devel",
"api_version": "1.6",
"api_version": "1.7",
"provisioners": [
"echo"
],
Expand Down
2 changes: 1 addition & 1 deletion coderd/database/dbauthz/dbauthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ var (
DisplayName: "Provisioner Daemon",
Site: rbac.Permissions(map[string][]policy.Action{
rbac.ResourceProvisionerJobs.Type: {policy.ActionRead, policy.ActionUpdate, policy.ActionCreate},
rbac.ResourceFile.Type: {policy.ActionRead},
rbac.ResourceFile.Type: {policy.ActionCreate, policy.ActionRead},
rbac.ResourceSystem.Type: {policy.WildcardSymbol},
rbac.ResourceTemplate.Type: {policy.ActionRead, policy.ActionUpdate},
// Unsure why provisionerd needs update and read personal
Expand Down
6 changes: 6 additions & 0 deletions coderd/database/dbmem/dbmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -8743,6 +8743,12 @@ func (q *FakeQuerier) InsertFile(_ context.Context, arg database.InsertFileParam
q.mutex.Lock()
defer q.mutex.Unlock()

if slices.ContainsFunc(q.files, func(file database.File) bool {
return file.CreatedBy == arg.CreatedBy && file.Hash == arg.Hash
}) {
return database.File{}, newUniqueConstraintError(database.UniqueFilesHashCreatedByKey)
}

//nolint:gosimple
file := database.File{
ID: arg.ID,
Expand Down
114 changes: 113 additions & 1 deletion coderd/provisionerdserver/provisionerdserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ func (s *server) acquireProtoJob(ctx context.Context, job database.ProvisionerJo
case database.ProvisionerStorageMethodFile:
file, err := s.Database.GetFileByID(ctx, job.FileID)
if err != nil {
return nil, failJob(fmt.Sprintf("get file by hash: %s", err))
return nil, failJob(fmt.Sprintf("get file by id: %s", err))
}
protoJob.TemplateSourceArchive = file.Data
default:
Expand Down Expand Up @@ -1321,6 +1321,104 @@ func (s *server) prepareForNotifyWorkspaceManualBuildFailed(ctx context.Context,
return templateAdmins, template, templateVersion, workspaceOwner, nil
}

// UploadFile handles a client-side stream of file data destined for the
// database. The client first sends a DataUpload describing the payload
// (type, hash, size, chunk count), followed by the ChunkPieces that make up
// the data. Once all chunks are received, the reassembled bytes are rehashed
// and inserted as a file row. A duplicate upload (same hash + creator) is
// treated as success.
func (s *server) UploadFile(stream proto.DRPCProvisionerDaemon_UploadFileStream) error {
	var file *sdkproto.DataBuilder
	// Always terminate the stream with an empty response. This is
	// best-effort; the peer may already be gone, so the error is discarded.
	defer func() {
		_ = stream.SendAndClose(&proto.Empty{})
	}()

UploadFileStream:
	for {
		msg, err := stream.Recv()
		if err != nil {
			return xerrors.Errorf("receive upload file request: %w", err)
		}

		switch typed := msg.Type.(type) {
		case *proto.UploadFileRequest_DataUpload:
			// Exactly one DataUpload is expected per stream, and it must
			// precede any chunk pieces.
			if file != nil {
				return xerrors.New("unexpected file upload while waiting for file completion")
			}

			file, err = sdkproto.NewDataBuilder(&sdkproto.DataUpload{
				UploadType: typed.DataUpload.UploadType,
				DataHash:   typed.DataUpload.DataHash,
				FileSize:   typed.DataUpload.FileSize,
				Chunks:     typed.DataUpload.Chunks,
			})
			if err != nil {
				return xerrors.Errorf("unable to create file upload: %w", err)
			}

			if file.IsDone() {
				// If a file is 0 bytes, we can consider it done immediately.
				// This should never really happen in practice, but we handle it gracefully.
				break UploadFileStream
			}
		case *proto.UploadFileRequest_ChunkPiece:
			if file == nil {
				return xerrors.New("unexpected chunk piece while waiting for file upload")
			}

			done, err := file.Add(&sdkproto.ChunkPiece{
				Data:         typed.ChunkPiece.Data,
				FullDataHash: typed.ChunkPiece.FullDataHash,
				PieceIndex:   typed.ChunkPiece.PieceIndex,
			})
			if err != nil {
				return xerrors.Errorf("unable to add chunk piece: %w", err)
			}

			if done {
				break UploadFileStream
			}
		default:
			// Guard against spinning forever if a new request variant is
			// added to the proto without this handler being updated.
			return xerrors.Errorf("unexpected message type %T in upload file stream", msg.Type)
		}
	}

	fileData, err := file.Complete()
	if err != nil {
		return xerrors.Errorf("complete file upload: %w", err)
	}

	// Just rehash the data to be sure it is correct.
	hashBytes := sha256.Sum256(fileData)
	hash := hex.EncodeToString(hashBytes[:])

	var insert database.InsertFileParams

	switch file.Type {
	case sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES:
		insert = database.InsertFileParams{
			ID:        uuid.New(),
			Hash:      hash,
			CreatedAt: dbtime.Now(),
			// Module files are keyed purely by hash with a nil creator.
			CreatedBy: uuid.Nil,
			Mimetype:  tarMimeType,
			Data:      fileData,
		}
	default:
		return xerrors.Errorf("unsupported file upload type: %s", file.Type)
	}

	//nolint:gocritic // Provisionerd actor
	_, err = s.Database.InsertFile(dbauthz.AsProvisionerd(s.lifecycleCtx), insert)
	if err != nil {
		// Duplicated files already exist in the database, so we can ignore this error.
		if !database.IsUniqueViolation(err, database.UniqueFilesHashCreatedByKey) {
			return xerrors.Errorf("insert file: %w", err)
		}
	}

	s.Logger.Info(s.lifecycleCtx, "file uploaded to database",
		slog.F("type", file.Type.String()),
		slog.F("hash", hash),
		slog.F("size", len(fileData)),
		// new_insert indicates whether the file was newly inserted or already existed.
		slog.F("new_insert", err == nil),
	)

	return nil
}

// CompleteJob is triggered by a provision daemon to mark a provisioner job as completed.
func (s *server) CompleteJob(ctx context.Context, completed *proto.CompletedJob) (*proto.Empty, error) {
ctx, span := s.startTrace(ctx, tracing.FuncName())
Expand Down Expand Up @@ -1606,6 +1704,20 @@ func (s *server) completeTemplateImportJob(ctx context.Context, job database.Pro
}
}

if len(jobType.TemplateImport.ModuleFilesHash) > 0 {
hashString := hex.EncodeToString(jobType.TemplateImport.ModuleFilesHash)
//nolint:gocritic // Acting as provisioner
file, err := db.GetFileByHashAndCreator(dbauthz.AsProvisionerd(ctx), database.GetFileByHashAndCreatorParams{Hash: hashString, CreatedBy: uuid.Nil})
if err != nil {
return xerrors.Errorf("get file by hash, it should have been uploaded: %w", err)
}

fileID = uuid.NullUUID{
Valid: true,
UUID: file.ID,
}
}

err = db.InsertTemplateVersionTerraformValuesByJobID(ctx, database.InsertTemplateVersionTerraformValuesByJobIDParams{
JobID: jobID,
UpdatedAt: now,
Expand Down
191 changes: 191 additions & 0 deletions coderd/provisionerdserver/upload_file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package provisionerdserver_test

import (
"context"
crand "crypto/rand"
"fmt"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"storj.io/drpc"

"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/externalauth"
"github.com/coder/coder/v2/codersdk/drpcsdk"
proto "github.com/coder/coder/v2/provisionerd/proto"
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
"github.com/coder/coder/v2/testutil"
)

// TestUploadFileLargeModuleFiles tests the UploadFile RPC with large module files
func TestUploadFileLargeModuleFiles(t *testing.T) {
t.Parallel()

ctx := testutil.Context(t, testutil.WaitMedium)

// Create server
server, db, _, _ := setup(t, false, &overrides{
externalAuthConfigs: []*externalauth.Config{{}},
})

testSizes := []int{
0, // Empty file
512, // A small file
drpcsdk.MaxMessageSize + 1024, // Just over the limit
drpcsdk.MaxMessageSize * 2, // 2x the limit
sdkproto.ChunkSize*3 + 512, // Multiple chunks with partial last
}

for _, size := range testSizes {
t.Run(fmt.Sprintf("size_%d_bytes", size), func(t *testing.T) {
t.Parallel()

// Generate test module files data
moduleData := make([]byte, size)
_, err := crand.Read(moduleData)
require.NoError(t, err)

// Convert to upload format
upload, chunks := sdkproto.BytesToDataUpload(sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES, moduleData)

stream := newMockUploadStream(upload, chunks...)

// Execute upload
err = server.UploadFile(stream)
require.NoError(t, err)

// Upload should be done
require.True(t, stream.isDone(), "stream should be done after upload")

// Verify file was stored in database
hashString := fmt.Sprintf("%x", upload.DataHash)
file, err := db.GetFileByHashAndCreator(ctx, database.GetFileByHashAndCreatorParams{
Hash: hashString,
CreatedBy: uuid.Nil, // Provisionerd creates with Nil UUID
})
require.NoError(t, err)
require.Equal(t, hashString, file.Hash)
require.Equal(t, moduleData, file.Data)
require.Equal(t, "application/x-tar", file.Mimetype)

// Try to upload it again, and it should still be successful
stream = newMockUploadStream(upload, chunks...)
err = server.UploadFile(stream)
require.NoError(t, err, "re-upload should succeed without error")
require.True(t, stream.isDone(), "stream should be done after re-upload")
})
}
}

// TestUploadFileErrorScenarios tests various error conditions in file upload
func TestUploadFileErrorScenarios(t *testing.T) {
	t.Parallel()

	//nolint:dogsled
	server, _, _, _ := setup(t, false, &overrides{
		externalAuthConfigs: []*externalauth.Config{{}},
	})

	// Build a two-chunk payload that the subtests below share read-only.
	payload := make([]byte, sdkproto.ChunkSize*2)
	_, err := crand.Read(payload)
	require.NoError(t, err)

	upload, chunks := sdkproto.BytesToDataUpload(sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES, payload)

	t.Run("chunk_before_upload", func(t *testing.T) {
		t.Parallel()

		// A chunk piece arriving before any DataUpload must be rejected.
		stream := newMockUploadStream(nil, chunks[0])

		require.ErrorContains(t, server.UploadFile(stream), "unexpected chunk piece while waiting for file upload")
		require.True(t, stream.isDone(), "stream should be done after error")
	})

	t.Run("duplicate_upload", func(t *testing.T) {
		t.Parallel()

		stream := &mockUploadStream{
			done:     make(chan struct{}),
			messages: make(chan *proto.UploadFileRequest, 2),
		}

		// Queue the same DataUpload twice; the second one must be rejected.
		dup := &proto.UploadFileRequest{Type: &proto.UploadFileRequest_DataUpload{DataUpload: upload}}
		stream.messages <- dup
		stream.messages <- dup

		require.ErrorContains(t, server.UploadFile(stream), "unexpected file upload while waiting for file completion")
		require.True(t, stream.isDone(), "stream should be done after error")
	})

	t.Run("unsupported_upload_type", func(t *testing.T) {
		t.Parallel()

		//nolint:govet // Ignore lock copy
		bad := *upload
		bad.UploadType = sdkproto.DataUploadType_UPLOAD_TYPE_UNKNOWN // Set to an unsupported type
		stream := newMockUploadStream(&bad, chunks...)

		require.ErrorContains(t, server.UploadFile(stream), "unsupported file upload type")
		require.True(t, stream.isDone(), "stream should be done after error")
	})
}

// mockUploadStream is an in-memory stand-in for the provisionerd UploadFile
// stream. Requests are pre-loaded onto the messages channel, and the done
// channel is closed when the server terminates the stream via SendAndClose.
// All methods use pointer receivers for consistency; the mock is always
// constructed as a pointer.
type mockUploadStream struct {
	done     chan struct{}
	messages chan *proto.UploadFileRequest
}

// SendAndClose records that the server finished the stream. The response
// payload is discarded; tests only assert that the stream terminated.
func (m *mockUploadStream) SendAndClose(_ *proto.Empty) error {
	close(m.done)
	return nil
}

// Recv pops the next queued request, or errors once the queue is exhausted
// and closed, simulating the client hanging up.
func (m *mockUploadStream) Recv() (*proto.UploadFileRequest, error) {
	msg, ok := <-m.messages
	if !ok {
		return nil, xerrors.New("no more messages to receive")
	}
	return msg, nil
}

// The remaining drpc stream methods are not exercised by these tests.
func (*mockUploadStream) Context() context.Context { panic(errUnimplemented) }

func (*mockUploadStream) MsgSend(msg drpc.Message, enc drpc.Encoding) error {
	panic(errUnimplemented)
}

func (*mockUploadStream) MsgRecv(msg drpc.Message, enc drpc.Encoding) error {
	panic(errUnimplemented)
}

func (*mockUploadStream) CloseSend() error { panic(errUnimplemented) }
func (*mockUploadStream) Close() error     { panic(errUnimplemented) }

// isDone reports whether SendAndClose has been called, without blocking.
func (m *mockUploadStream) isDone() bool {
	select {
	case <-m.done:
		return true
	default:
		return false
	}
}

// newMockUploadStream builds a stream whose queue holds the optional
// DataUpload followed by every chunk piece, then closes the queue so that
// Recv errors once all messages have been consumed.
func newMockUploadStream(up *sdkproto.DataUpload, chunks ...*sdkproto.ChunkPiece) *mockUploadStream {
	queue := make(chan *proto.UploadFileRequest, 1+len(chunks))
	if up != nil {
		queue <- &proto.UploadFileRequest{Type: &proto.UploadFileRequest_DataUpload{DataUpload: up}}
	}
	for _, piece := range chunks {
		queue <- &proto.UploadFileRequest{Type: &proto.UploadFileRequest_ChunkPiece{ChunkPiece: piece}}
	}
	close(queue)

	return &mockUploadStream{
		done:     make(chan struct{}),
		messages: queue,
	}
}
Loading
Loading