Skip to content

Commit 1e8c421

Browse files
committed
Improve provisioner testing
1 parent d878c13 commit 1e8c421

File tree

2 files changed

+118
-2
lines changed

2 files changed

+118
-2
lines changed

provisionerd/provisionerd.go

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
package provisionerd
22

33
import (
4+
"archive/tar"
5+
"bytes"
46
"context"
7+
"errors"
58
"fmt"
9+
"io"
10+
"os"
11+
"path/filepath"
12+
"reflect"
13+
"strings"
614
"sync"
715
"time"
816

@@ -23,6 +31,7 @@ type Provisioners map[string]provisionersdkproto.DRPCProvisionerClient
2331
type Options struct {
2432
AcquireInterval time.Duration
2533
Logger slog.Logger
34+
WorkDirectory string
2635
}
2736

2837
func New(apiDialer Dialer, provisioners Provisioners, opts *Options) *API {
@@ -152,6 +161,90 @@ func (a *API) acquireJob() {
152161
a.cancelActiveJob(fmt.Sprintf("provisioner %q not registered", a.activeJob.Provisioner))
153162
return
154163
}
164+
defer func() {
165+
// Cleanup the work directory after execution.
166+
err = os.RemoveAll(a.opts.WorkDirectory)
167+
if err != nil {
168+
a.cancelActiveJob(fmt.Sprintf("remove all from %q directory: %s", a.opts.WorkDirectory, err))
169+
return
170+
}
171+
}()
172+
173+
err = os.MkdirAll(a.opts.WorkDirectory, 0600)
174+
if err != nil {
175+
a.cancelActiveJob(fmt.Sprintf("create work directory %q: %s", a.opts.WorkDirectory, err))
176+
return
177+
}
178+
179+
a.opts.Logger.Debug(context.Background(), "unpacking project source archive", slog.F("size_bytes", len(a.activeJob.ProjectSourceArchive)))
180+
reader := tar.NewReader(bytes.NewBuffer(a.activeJob.ProjectSourceArchive))
181+
for {
182+
header, err := reader.Next()
183+
if errors.Is(err, io.EOF) {
184+
break
185+
}
186+
if err != nil {
187+
a.cancelActiveJob(fmt.Sprintf("read project source archive: %s", err))
188+
return
189+
}
190+
// #nosec
191+
path := filepath.Join(a.opts.WorkDirectory, header.Name)
192+
if !strings.HasPrefix(path, filepath.Clean(a.opts.WorkDirectory)) {
193+
a.cancelActiveJob("tar attempts to target relative upper directory")
194+
return
195+
}
196+
switch header.Typeflag {
197+
case tar.TypeDir:
198+
err = os.MkdirAll(path, header.FileInfo().Mode())
199+
if err != nil {
200+
a.cancelActiveJob(fmt.Sprintf("mkdir %q: %s", path, err))
201+
return
202+
}
203+
a.opts.Logger.Debug(context.Background(), "extracted directory", slog.F("path", path))
204+
case tar.TypeReg:
205+
file, err := os.Create(path)
206+
if err != nil {
207+
a.cancelActiveJob(fmt.Sprintf("create file %q: %s", path, err))
208+
return
209+
}
210+
// Max file size of 10MB.
211+
size, err := io.CopyN(file, reader, (1<<20)*10)
212+
if errors.Is(err, io.EOF) {
213+
err = nil
214+
}
215+
if err != nil {
216+
a.cancelActiveJob(fmt.Sprintf("copy file %q: %s", path, err))
217+
return
218+
}
219+
err = file.Close()
220+
if err != nil {
221+
a.cancelActiveJob(fmt.Sprintf("close file %q: %s", path, err))
222+
return
223+
}
224+
a.opts.Logger.Debug(context.Background(), "extracted file",
225+
slog.F("size_bytes", size),
226+
slog.F("path", path),
227+
)
228+
}
229+
}
230+
231+
switch jobType := a.activeJob.Type.(type) {
232+
case *proto.AcquiredJob_ProjectImport_:
233+
a.opts.Logger.Debug(context.Background(), "acquired job is project import",
234+
slog.F("project_history_name", jobType.ProjectImport.ProjectHistoryName),
235+
)
236+
case *proto.AcquiredJob_WorkspaceProvision_:
237+
a.opts.Logger.Debug(context.Background(), "acquired job is workspace provision",
238+
slog.F("workspace_name", jobType.WorkspaceProvision.WorkspaceName),
239+
slog.F("state_length", len(jobType.WorkspaceProvision.State)),
240+
slog.F("parameters", jobType.WorkspaceProvision.ParameterValues),
241+
)
242+
243+
default:
244+
a.cancelActiveJob(fmt.Sprintf("unknown job type %q; ensure your provisioner daemon is up-to-date", reflect.TypeOf(a.activeJob.Type).String()))
245+
return
246+
}
247+
155248
fmt.Printf("Provisioner: %s\n", provisioner)
156249
// Work!
157250
}
@@ -204,7 +297,7 @@ func (a *API) closeWithError(err error) error {
204297
}
205298

206299
if a.activeJob != nil {
207-
errMsg := ""
300+
errMsg := "provisioner daemon was shutdown gracefully"
208301
if err != nil {
209302
errMsg = err.Error()
210303
}

provisionerd/provisionerd_test.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@ import (
88
"time"
99

1010
"github.com/stretchr/testify/require"
11+
"storj.io/drpc/drpcconn"
1112

1213
"cdr.dev/slog"
1314
"cdr.dev/slog/sloggers/slogtest"
1415
"github.com/coder/coder/coderd"
1516
"github.com/coder/coder/coderd/coderdtest"
1617
"github.com/coder/coder/codersdk"
1718
"github.com/coder/coder/database"
19+
"github.com/coder/coder/provisioner/terraform"
1820
"github.com/coder/coder/provisionerd"
21+
"github.com/coder/coder/provisionersdk"
22+
"github.com/coder/coder/provisionersdk/proto"
1923
)
2024

2125
func TestProvisionerd(t *testing.T) {
@@ -74,9 +78,28 @@ func TestProvisionerd(t *testing.T) {
7478
})
7579
require.NoError(t, err)
7680

77-
api := provisionerd.New(server.Client.ProvisionerDaemonClient, provisionerd.Provisioners{}, &provisionerd.Options{
81+
clientPipe, serverPipe := provisionersdk.TransportPipe()
82+
ctx, cancelFunc := context.WithCancel(context.Background())
83+
t.Cleanup(func() {
84+
_ = clientPipe.Close()
85+
_ = serverPipe.Close()
86+
cancelFunc()
87+
})
88+
go func() {
89+
err := terraform.Serve(ctx, &terraform.ServeOptions{
90+
ServeOptions: &provisionersdk.ServeOptions{
91+
Transport: serverPipe,
92+
},
93+
})
94+
require.NoError(t, err)
95+
}()
96+
97+
api := provisionerd.New(server.Client.ProvisionerDaemonClient, provisionerd.Provisioners{
98+
string(database.ProvisionerTypeTerraform): proto.NewDRPCProvisionerClient(drpcconn.New(clientPipe)),
99+
}, &provisionerd.Options{
78100
Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
79101
AcquireInterval: 50 * time.Millisecond,
102+
WorkDirectory: t.TempDir(),
80103
})
81104
defer api.Close()
82105
time.Sleep(time.Millisecond * 500)

0 commit comments

Comments
 (0)