Skip to content

Commit c1341cc

Browse files
authored
feat: use proto streams to increase maximum module files payload (#18268)
This PR implements protobuf streaming to handle large module files by:

1. **Streaming large payloads**: When module files exceed the 4MB limit, they're streamed in chunks using a new `UploadFile` RPC method.
2. **Database storage**: Streamed files are stored in the database and referenced by hash for deduplication.
3. **Backward compatibility**: Small module files continue using the existing direct payload method.
1 parent 8e29ee5 commit c1341cc

22 files changed

+1885
-493
lines changed

cli/testdata/coder_provisioner_list_--output_json.golden

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"last_seen_at": "====[timestamp]=====",
88
"name": "test-daemon",
99
"version": "v0.0.0-devel",
10-
"api_version": "1.6",
10+
"api_version": "1.7",
1111
"provisioners": [
1212
"echo"
1313
],

coderd/database/dbauthz/dbauthz.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ var (
171171
DisplayName: "Provisioner Daemon",
172172
Site: rbac.Permissions(map[string][]policy.Action{
173173
rbac.ResourceProvisionerJobs.Type: {policy.ActionRead, policy.ActionUpdate, policy.ActionCreate},
174-
rbac.ResourceFile.Type: {policy.ActionRead},
174+
rbac.ResourceFile.Type: {policy.ActionCreate, policy.ActionRead},
175175
rbac.ResourceSystem.Type: {policy.WildcardSymbol},
176176
rbac.ResourceTemplate.Type: {policy.ActionRead, policy.ActionUpdate},
177177
// Unsure why provisionerd needs update and read personal

coderd/database/dbmem/dbmem.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8743,6 +8743,12 @@ func (q *FakeQuerier) InsertFile(_ context.Context, arg database.InsertFileParam
87438743
q.mutex.Lock()
87448744
defer q.mutex.Unlock()
87458745

8746+
if slices.ContainsFunc(q.files, func(file database.File) bool {
8747+
return file.CreatedBy == arg.CreatedBy && file.Hash == arg.Hash
8748+
}) {
8749+
return database.File{}, newUniqueConstraintError(database.UniqueFilesHashCreatedByKey)
8750+
}
8751+
87468752
//nolint:gosimple
87478753
file := database.File{
87488754
ID: arg.ID,

coderd/provisionerdserver/provisionerdserver.go

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ func (s *server) acquireProtoJob(ctx context.Context, job database.ProvisionerJo
773773
case database.ProvisionerStorageMethodFile:
774774
file, err := s.Database.GetFileByID(ctx, job.FileID)
775775
if err != nil {
776-
return nil, failJob(fmt.Sprintf("get file by hash: %s", err))
776+
return nil, failJob(fmt.Sprintf("get file by id: %s", err))
777777
}
778778
protoJob.TemplateSourceArchive = file.Data
779779
default:
@@ -1321,6 +1321,104 @@ func (s *server) prepareForNotifyWorkspaceManualBuildFailed(ctx context.Context,
13211321
return templateAdmins, template, templateVersion, workspaceOwner, nil
13221322
}
13231323

1324+
func (s *server) UploadFile(stream proto.DRPCProvisionerDaemon_UploadFileStream) error {
1325+
var file *sdkproto.DataBuilder
1326+
// Always terminate the stream with an empty response.
1327+
defer stream.SendAndClose(&proto.Empty{})
1328+
1329+
UploadFileStream:
1330+
for {
1331+
msg, err := stream.Recv()
1332+
if err != nil {
1333+
return xerrors.Errorf("receive complete job with files: %w", err)
1334+
}
1335+
1336+
switch typed := msg.Type.(type) {
1337+
case *proto.UploadFileRequest_DataUpload:
1338+
if file != nil {
1339+
return xerrors.New("unexpected file upload while waiting for file completion")
1340+
}
1341+
1342+
file, err = sdkproto.NewDataBuilder(&sdkproto.DataUpload{
1343+
UploadType: typed.DataUpload.UploadType,
1344+
DataHash: typed.DataUpload.DataHash,
1345+
FileSize: typed.DataUpload.FileSize,
1346+
Chunks: typed.DataUpload.Chunks,
1347+
})
1348+
if err != nil {
1349+
return xerrors.Errorf("unable to create file upload: %w", err)
1350+
}
1351+
1352+
if file.IsDone() {
1353+
// If a file is 0 bytes, we can consider it done immediately.
1354+
// This should never really happen in practice, but we handle it gracefully.
1355+
break UploadFileStream
1356+
}
1357+
case *proto.UploadFileRequest_ChunkPiece:
1358+
if file == nil {
1359+
return xerrors.New("unexpected chunk piece while waiting for file upload")
1360+
}
1361+
1362+
done, err := file.Add(&sdkproto.ChunkPiece{
1363+
Data: typed.ChunkPiece.Data,
1364+
FullDataHash: typed.ChunkPiece.FullDataHash,
1365+
PieceIndex: typed.ChunkPiece.PieceIndex,
1366+
})
1367+
if err != nil {
1368+
return xerrors.Errorf("unable to add chunk piece: %w", err)
1369+
}
1370+
1371+
if done {
1372+
break UploadFileStream
1373+
}
1374+
}
1375+
}
1376+
1377+
fileData, err := file.Complete()
1378+
if err != nil {
1379+
return xerrors.Errorf("complete file upload: %w", err)
1380+
}
1381+
1382+
// Just rehash the data to be sure it is correct.
1383+
hashBytes := sha256.Sum256(fileData)
1384+
hash := hex.EncodeToString(hashBytes[:])
1385+
1386+
var insert database.InsertFileParams
1387+
1388+
switch file.Type {
1389+
case sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES:
1390+
insert = database.InsertFileParams{
1391+
ID: uuid.New(),
1392+
Hash: hash,
1393+
CreatedAt: dbtime.Now(),
1394+
CreatedBy: uuid.Nil,
1395+
Mimetype: tarMimeType,
1396+
Data: fileData,
1397+
}
1398+
default:
1399+
return xerrors.Errorf("unsupported file upload type: %s", file.Type)
1400+
}
1401+
1402+
//nolint:gocritic // Provisionerd actor
1403+
_, err = s.Database.InsertFile(dbauthz.AsProvisionerd(s.lifecycleCtx), insert)
1404+
if err != nil {
1405+
// Duplicated files already exist in the database, so we can ignore this error.
1406+
if !database.IsUniqueViolation(err, database.UniqueFilesHashCreatedByKey) {
1407+
return xerrors.Errorf("insert file: %w", err)
1408+
}
1409+
}
1410+
1411+
s.Logger.Info(s.lifecycleCtx, "file uploaded to database",
1412+
slog.F("type", file.Type.String()),
1413+
slog.F("hash", hash),
1414+
slog.F("size", len(fileData)),
1415+
// new_insert indicates whether the file was newly inserted or already existed.
1416+
slog.F("new_insert", err == nil),
1417+
)
1418+
1419+
return nil
1420+
}
1421+
13241422
// CompleteJob is triggered by a provision daemon to mark a provisioner job as completed.
13251423
func (s *server) CompleteJob(ctx context.Context, completed *proto.CompletedJob) (*proto.Empty, error) {
13261424
ctx, span := s.startTrace(ctx, tracing.FuncName())
@@ -1606,6 +1704,20 @@ func (s *server) completeTemplateImportJob(ctx context.Context, job database.Pro
16061704
}
16071705
}
16081706

1707+
if len(jobType.TemplateImport.ModuleFilesHash) > 0 {
1708+
hashString := hex.EncodeToString(jobType.TemplateImport.ModuleFilesHash)
1709+
//nolint:gocritic // Acting as provisioner
1710+
file, err := db.GetFileByHashAndCreator(dbauthz.AsProvisionerd(ctx), database.GetFileByHashAndCreatorParams{Hash: hashString, CreatedBy: uuid.Nil})
1711+
if err != nil {
1712+
return xerrors.Errorf("get file by hash, it should have been uploaded: %w", err)
1713+
}
1714+
1715+
fileID = uuid.NullUUID{
1716+
Valid: true,
1717+
UUID: file.ID,
1718+
}
1719+
}
1720+
16091721
err = db.InsertTemplateVersionTerraformValuesByJobID(ctx, database.InsertTemplateVersionTerraformValuesByJobIDParams{
16101722
JobID: jobID,
16111723
UpdatedAt: now,
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package provisionerdserver_test
2+
3+
import (
4+
"context"
5+
crand "crypto/rand"
6+
"fmt"
7+
"testing"
8+
9+
"github.com/google/uuid"
10+
"github.com/stretchr/testify/require"
11+
"golang.org/x/xerrors"
12+
"storj.io/drpc"
13+
14+
"github.com/coder/coder/v2/coderd/database"
15+
"github.com/coder/coder/v2/coderd/externalauth"
16+
"github.com/coder/coder/v2/codersdk/drpcsdk"
17+
proto "github.com/coder/coder/v2/provisionerd/proto"
18+
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
19+
"github.com/coder/coder/v2/testutil"
20+
)
21+
22+
// TestUploadFileLargeModuleFiles tests the UploadFile RPC with large module files
23+
func TestUploadFileLargeModuleFiles(t *testing.T) {
24+
t.Parallel()
25+
26+
ctx := testutil.Context(t, testutil.WaitMedium)
27+
28+
// Create server
29+
server, db, _, _ := setup(t, false, &overrides{
30+
externalAuthConfigs: []*externalauth.Config{{}},
31+
})
32+
33+
testSizes := []int{
34+
0, // Empty file
35+
512, // A small file
36+
drpcsdk.MaxMessageSize + 1024, // Just over the limit
37+
drpcsdk.MaxMessageSize * 2, // 2x the limit
38+
sdkproto.ChunkSize*3 + 512, // Multiple chunks with partial last
39+
}
40+
41+
for _, size := range testSizes {
42+
t.Run(fmt.Sprintf("size_%d_bytes", size), func(t *testing.T) {
43+
t.Parallel()
44+
45+
// Generate test module files data
46+
moduleData := make([]byte, size)
47+
_, err := crand.Read(moduleData)
48+
require.NoError(t, err)
49+
50+
// Convert to upload format
51+
upload, chunks := sdkproto.BytesToDataUpload(sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES, moduleData)
52+
53+
stream := newMockUploadStream(upload, chunks...)
54+
55+
// Execute upload
56+
err = server.UploadFile(stream)
57+
require.NoError(t, err)
58+
59+
// Upload should be done
60+
require.True(t, stream.isDone(), "stream should be done after upload")
61+
62+
// Verify file was stored in database
63+
hashString := fmt.Sprintf("%x", upload.DataHash)
64+
file, err := db.GetFileByHashAndCreator(ctx, database.GetFileByHashAndCreatorParams{
65+
Hash: hashString,
66+
CreatedBy: uuid.Nil, // Provisionerd creates with Nil UUID
67+
})
68+
require.NoError(t, err)
69+
require.Equal(t, hashString, file.Hash)
70+
require.Equal(t, moduleData, file.Data)
71+
require.Equal(t, "application/x-tar", file.Mimetype)
72+
73+
// Try to upload it again, and it should still be successful
74+
stream = newMockUploadStream(upload, chunks...)
75+
err = server.UploadFile(stream)
76+
require.NoError(t, err, "re-upload should succeed without error")
77+
require.True(t, stream.isDone(), "stream should be done after re-upload")
78+
})
79+
}
80+
}
81+
82+
// TestUploadFileErrorScenarios tests various error conditions in file upload
83+
func TestUploadFileErrorScenarios(t *testing.T) {
84+
t.Parallel()
85+
86+
//nolint:dogsled
87+
server, _, _, _ := setup(t, false, &overrides{
88+
externalAuthConfigs: []*externalauth.Config{{}},
89+
})
90+
91+
// Generate test data
92+
moduleData := make([]byte, sdkproto.ChunkSize*2)
93+
_, err := crand.Read(moduleData)
94+
require.NoError(t, err)
95+
96+
upload, chunks := sdkproto.BytesToDataUpload(sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES, moduleData)
97+
98+
t.Run("chunk_before_upload", func(t *testing.T) {
99+
t.Parallel()
100+
101+
stream := newMockUploadStream(nil, chunks[0])
102+
103+
err := server.UploadFile(stream)
104+
require.ErrorContains(t, err, "unexpected chunk piece while waiting for file upload")
105+
require.True(t, stream.isDone(), "stream should be done after error")
106+
})
107+
108+
t.Run("duplicate_upload", func(t *testing.T) {
109+
t.Parallel()
110+
111+
stream := &mockUploadStream{
112+
done: make(chan struct{}),
113+
messages: make(chan *proto.UploadFileRequest, 2),
114+
}
115+
116+
up := &proto.UploadFileRequest{Type: &proto.UploadFileRequest_DataUpload{DataUpload: upload}}
117+
118+
// Send it twice
119+
stream.messages <- up
120+
stream.messages <- up
121+
122+
err := server.UploadFile(stream)
123+
require.ErrorContains(t, err, "unexpected file upload while waiting for file completion")
124+
require.True(t, stream.isDone(), "stream should be done after error")
125+
})
126+
127+
t.Run("unsupported_upload_type", func(t *testing.T) {
128+
t.Parallel()
129+
130+
//nolint:govet // Ignore lock copy
131+
cpy := *upload
132+
cpy.UploadType = sdkproto.DataUploadType_UPLOAD_TYPE_UNKNOWN // Set to an unsupported type
133+
stream := newMockUploadStream(&cpy, chunks...)
134+
135+
err := server.UploadFile(stream)
136+
require.ErrorContains(t, err, "unsupported file upload type")
137+
require.True(t, stream.isDone(), "stream should be done after error")
138+
})
139+
}
140+
141+
type mockUploadStream struct {
142+
done chan struct{}
143+
messages chan *proto.UploadFileRequest
144+
}
145+
146+
func (m mockUploadStream) SendAndClose(empty *proto.Empty) error {
147+
close(m.done)
148+
return nil
149+
}
150+
151+
func (m mockUploadStream) Recv() (*proto.UploadFileRequest, error) {
152+
msg, ok := <-m.messages
153+
if !ok {
154+
return nil, xerrors.New("no more messages to receive")
155+
}
156+
return msg, nil
157+
}
158+
func (*mockUploadStream) Context() context.Context { panic(errUnimplemented) }
159+
func (*mockUploadStream) MsgSend(msg drpc.Message, enc drpc.Encoding) error {
160+
panic(errUnimplemented)
161+
}
162+
163+
func (*mockUploadStream) MsgRecv(msg drpc.Message, enc drpc.Encoding) error {
164+
panic(errUnimplemented)
165+
}
166+
func (*mockUploadStream) CloseSend() error { panic(errUnimplemented) }
167+
func (*mockUploadStream) Close() error { panic(errUnimplemented) }
168+
func (m *mockUploadStream) isDone() bool {
169+
select {
170+
case <-m.done:
171+
return true
172+
default:
173+
return false
174+
}
175+
}
176+
177+
func newMockUploadStream(up *sdkproto.DataUpload, chunks ...*sdkproto.ChunkPiece) *mockUploadStream {
178+
stream := &mockUploadStream{
179+
done: make(chan struct{}),
180+
messages: make(chan *proto.UploadFileRequest, 1+len(chunks)),
181+
}
182+
if up != nil {
183+
stream.messages <- &proto.UploadFileRequest{Type: &proto.UploadFileRequest_DataUpload{DataUpload: up}}
184+
}
185+
186+
for _, chunk := range chunks {
187+
stream.messages <- &proto.UploadFileRequest{Type: &proto.UploadFileRequest_ChunkPiece{ChunkPiece: chunk}}
188+
}
189+
close(stream.messages)
190+
return stream
191+
}

0 commit comments

Comments
 (0)