Skip to content

Commit d878c13

Browse files
committed
feat: Add provisionerd service
Creates the provisionerd service that interfaces with coderd to process provision jobs!
1 parent 2bd0c42 commit d878c13

File tree

8 files changed

+724
-3
lines changed

8 files changed

+724
-3
lines changed

coderd/coderd.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ func New(options *Options) http.Handler {
2323
projects := &projects{
2424
Database: options.Database,
2525
}
26+
provisionerd := &provisionerd{
27+
Database: options.Database,
28+
}
2629
users := &users{
2730
Database: options.Database,
2831
}
@@ -39,6 +42,7 @@ func New(options *Options) http.Handler {
3942
})
4043
r.Post("/login", users.loginWithPassword)
4144
r.Post("/logout", users.logout)
45+
r.Get("/provisionerd", provisionerd.listen)
4246
// Used for setup.
4347
r.Post("/user", users.createInitialUser)
4448
r.Route("/users", func(r chi.Router) {

coderd/provisionerd.go

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
package coderd
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"encoding/json"
7+
"errors"
8+
"fmt"
9+
"net/http"
10+
11+
"golang.org/x/xerrors"
12+
"storj.io/drpc/drpcmux"
13+
"storj.io/drpc/drpcserver"
14+
15+
"github.com/google/uuid"
16+
"github.com/hashicorp/yamux"
17+
"github.com/moby/moby/pkg/namesgenerator"
18+
19+
"github.com/coder/coder/coderd/projectparameter"
20+
"github.com/coder/coder/database"
21+
"github.com/coder/coder/httpapi"
22+
"github.com/coder/coder/provisionerd/proto"
23+
sdkproto "github.com/coder/coder/provisionersdk/proto"
24+
25+
"nhooyr.io/websocket"
26+
)
27+
28+
type provisionerd struct {
29+
Database database.Store
30+
}
31+
32+
func (p *provisionerd) listen(rw http.ResponseWriter, r *http.Request) {
33+
conn, err := websocket.Accept(rw, r, nil)
34+
if err != nil {
35+
httpapi.Write(rw, http.StatusBadRequest, httpapi.Response{
36+
Message: fmt.Sprintf("accept websocket: %s", err),
37+
})
38+
return
39+
}
40+
41+
daemon, err := p.Database.InsertProvisionerDaemon(r.Context(), database.InsertProvisionerDaemonParams{
42+
ID: uuid.New(),
43+
CreatedAt: database.Now(),
44+
Name: namesgenerator.GetRandomName(1),
45+
Provisioners: []database.ProvisionerType{database.ProvisionerTypeCdrBasic, database.ProvisionerTypeTerraform},
46+
})
47+
if err != nil {
48+
_ = conn.Close(websocket.StatusInternalError, fmt.Sprintf("insert provisioner daemon:% s", err))
49+
return
50+
}
51+
52+
session, err := yamux.Server(websocket.NetConn(r.Context(), conn, websocket.MessageBinary), nil)
53+
if err != nil {
54+
_ = conn.Close(websocket.StatusInternalError, fmt.Sprintf("multiplex server: %s", err))
55+
return
56+
}
57+
mux := drpcmux.New()
58+
err = proto.DRPCRegisterProvisionerDaemon(mux, &provisionerdServer{
59+
ID: daemon.ID,
60+
Database: p.Database,
61+
})
62+
if err != nil {
63+
_ = conn.Close(websocket.StatusInternalError, fmt.Sprintf("drpc register provisioner daemon: %s", err))
64+
return
65+
}
66+
server := drpcserver.New(mux)
67+
err = server.Serve(r.Context(), session)
68+
if err != nil {
69+
_ = conn.Close(websocket.StatusInternalError, fmt.Sprintf("serve: %s", err))
70+
}
71+
}
72+
73+
// The input for a "workspace_provision" job.
74+
type workspaceProvisionJob struct {
75+
WorkspaceHistoryID uuid.UUID `json:"workspace_id"`
76+
}
77+
78+
// The input for a "project_import" job.
79+
type projectImportJob struct {
80+
ProjectHistoryID uuid.UUID `json:"project_history_id"`
81+
}
82+
83+
// An implementation of the provisionerd protobuf server definition.
84+
type provisionerdServer struct {
85+
ID uuid.UUID
86+
Database database.Store
87+
}
88+
89+
func (s *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.AcquiredJob, error) {
90+
// This locks the job. No other provisioners can acquire this job.
91+
job, err := s.Database.AcquireProvisionerJob(ctx, database.AcquireProvisionerJobParams{
92+
StartedAt: sql.NullTime{
93+
Time: database.Now(),
94+
Valid: true,
95+
},
96+
WorkerID: uuid.NullUUID{
97+
UUID: s.ID,
98+
Valid: true,
99+
},
100+
Types: []database.ProvisionerType{database.ProvisionerTypeTerraform},
101+
})
102+
if errors.Is(err, sql.ErrNoRows) {
103+
// If no jobs are available, an empty struct is sent back.
104+
return &proto.AcquiredJob{}, nil
105+
}
106+
if err != nil {
107+
return nil, xerrors.Errorf("acquire job: %w", err)
108+
}
109+
failJob := func(errorMessage string) error {
110+
err = s.Database.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
111+
ID: job.ID,
112+
CompletedAt: sql.NullTime{
113+
Time: database.Now(),
114+
Valid: true,
115+
},
116+
Error: sql.NullString{
117+
String: errorMessage,
118+
Valid: true,
119+
},
120+
})
121+
if err != nil {
122+
return xerrors.Errorf("update provisioner job: %w", err)
123+
}
124+
return xerrors.Errorf("request job was invalidated: %s", errorMessage)
125+
}
126+
127+
project, err := s.Database.GetProjectByID(ctx, job.ProjectID)
128+
if err != nil {
129+
return nil, failJob(fmt.Sprintf("get project: %s", err))
130+
}
131+
132+
organization, err := s.Database.GetOrganizationByID(ctx, project.OrganizationID)
133+
if err != nil {
134+
return nil, failJob(fmt.Sprintf("get organization: %s", err))
135+
}
136+
137+
user, err := s.Database.GetUserByID(ctx, job.InitiatorID)
138+
if err != nil {
139+
return nil, failJob(fmt.Sprintf("get user: %s", err))
140+
}
141+
142+
acquiredJob := &proto.AcquiredJob{
143+
JobId: job.ID.String(),
144+
CreatedAt: job.CreatedAt.UnixMilli(),
145+
Provisioner: string(job.Provisioner),
146+
OrganizationName: organization.Name,
147+
ProjectName: project.Name,
148+
UserName: user.Username,
149+
}
150+
var projectHistory database.ProjectHistory
151+
switch job.Type {
152+
case database.ProvisionerJobTypeWorkspaceProvision:
153+
var input workspaceProvisionJob
154+
err = json.Unmarshal(job.Input, &input)
155+
if err != nil {
156+
return nil, failJob(fmt.Sprintf("unmarshal job input %q: %s", job.Input, err))
157+
}
158+
workspaceHistory, err := s.Database.GetWorkspaceHistoryByID(ctx, input.WorkspaceHistoryID)
159+
if err != nil {
160+
return nil, failJob(fmt.Sprintf("get workspace history: %s", err))
161+
}
162+
163+
workspace, err := s.Database.GetWorkspaceByID(ctx, workspaceHistory.WorkspaceID)
164+
if err != nil {
165+
return nil, failJob(fmt.Sprintf("get workspace: %s", err))
166+
}
167+
168+
projectHistory, err = s.Database.GetProjectHistoryByID(ctx, workspaceHistory.ProjectHistoryID)
169+
if err != nil {
170+
return nil, failJob(fmt.Sprintf("get project history: %s", err))
171+
}
172+
173+
parameters, err := projectparameter.Compute(ctx, s.Database, projectparameter.Scope{
174+
OrganizationID: organization.ID,
175+
ProjectID: project.ID,
176+
ProjectHistoryID: projectHistory.ID,
177+
UserID: user.ID,
178+
WorkspaceID: workspace.ID,
179+
WorkspaceHistoryID: workspaceHistory.ID,
180+
})
181+
if err != nil {
182+
return nil, failJob(fmt.Sprintf("compute parameters: %s", err))
183+
}
184+
protoParameters := make([]*sdkproto.ParameterValue, 0, len(parameters))
185+
for _, parameter := range parameters {
186+
protoParameters = append(protoParameters, parameter.Proto)
187+
}
188+
189+
provisionerState := []byte{}
190+
if workspaceHistory.BeforeID.Valid {
191+
beforeHistory, err := s.Database.GetWorkspaceHistoryByID(ctx, workspaceHistory.BeforeID.UUID)
192+
if err != nil {
193+
return nil, failJob(fmt.Sprintf("get workspace history: %s", err))
194+
}
195+
provisionerState = beforeHistory.ProvisionerState
196+
}
197+
198+
acquiredJob.Type = &proto.AcquiredJob_WorkspaceProvision_{
199+
WorkspaceProvision: &proto.AcquiredJob_WorkspaceProvision{
200+
WorkspaceHistoryId: workspaceHistory.ID.String(),
201+
WorkspaceName: workspace.Name,
202+
State: provisionerState,
203+
ParameterValues: protoParameters,
204+
},
205+
}
206+
case database.ProvisionerJobTypeProjectImport:
207+
var input projectImportJob
208+
err = json.Unmarshal(job.Input, &input)
209+
if err != nil {
210+
return nil, failJob(fmt.Sprintf("unmarshal job input %q: %s", job.Input, err))
211+
}
212+
projectHistory, err = s.Database.GetProjectHistoryByID(ctx, input.ProjectHistoryID)
213+
if err != nil {
214+
return nil, failJob(fmt.Sprintf("get project history: %s", err))
215+
}
216+
}
217+
switch projectHistory.StorageMethod {
218+
case database.ProjectStorageMethodInlineArchive:
219+
acquiredJob.ProjectSourceArchive = projectHistory.StorageSource
220+
default:
221+
return nil, failJob(fmt.Sprintf("unsupported storage source: %q", projectHistory.StorageMethod))
222+
}
223+
224+
return acquiredJob, err
225+
}
226+
227+
func (s *provisionerdServer) UpdateJob(stream proto.DRPCProvisionerDaemon_UpdateJobStream) error {
228+
for {
229+
update, err := stream.Recv()
230+
if err != nil {
231+
return err
232+
}
233+
parsedID, err := uuid.Parse(update.JobId)
234+
if err != nil {
235+
return xerrors.Errorf("parse job id: %w", err)
236+
}
237+
err = s.Database.UpdateProvisionerJobByID(context.Background(), database.UpdateProvisionerJobByIDParams{
238+
ID: parsedID,
239+
UpdatedAt: database.Now(),
240+
})
241+
if err != nil {
242+
return xerrors.Errorf("update job: %w", err)
243+
}
244+
}
245+
}
246+
247+
func (s *provisionerdServer) CancelJob(ctx context.Context, cancelJob *proto.CancelledJob) (*proto.Empty, error) {
248+
jobID, err := uuid.Parse(cancelJob.JobId)
249+
if err != nil {
250+
return nil, xerrors.Errorf("parse job id: %w", err)
251+
}
252+
err = s.Database.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
253+
ID: jobID,
254+
CancelledAt: sql.NullTime{
255+
Time: database.Now(),
256+
Valid: true,
257+
},
258+
UpdatedAt: database.Now(),
259+
Error: sql.NullString{
260+
String: cancelJob.Error,
261+
Valid: cancelJob.Error != "",
262+
},
263+
})
264+
if err != nil {
265+
return nil, xerrors.Errorf("update provisioner job: %w", err)
266+
}
267+
return &proto.Empty{}, nil
268+
}
269+
270+
func (s *provisionerdServer) CompleteJob(ctx context.Context, completed *proto.CompletedJob) (*proto.Empty, error) {
271+
return nil, nil
272+
}

coderd/workspaces.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package coderd
22

33
import (
4+
"context"
45
"database/sql"
6+
"encoding/json"
57
"errors"
68
"fmt"
79
"net/http"
@@ -270,6 +272,13 @@ func (w *workspaces) createWorkspaceHistory(rw http.ResponseWriter, r *http.Requ
270272
})
271273
return
272274
}
275+
project, err := w.Database.GetProjectByID(r.Context(), projectHistory.ProjectID)
276+
if err != nil {
277+
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{
278+
Message: fmt.Sprintf("get project: %s", err),
279+
})
280+
return
281+
}
273282

274283
// Store prior history ID if it exists to update it after we create new!
275284
priorHistoryID := uuid.NullUUID{}
@@ -298,17 +307,39 @@ func (w *workspaces) createWorkspaceHistory(rw http.ResponseWriter, r *http.Requ
298307
// This must happen in a transaction to ensure history can be inserted, and
299308
// the prior history can update it's "after" column to point at the new.
300309
err = w.Database.InTx(func(db database.Store) error {
310+
// Generate the ID before-hand so the provisioner job is aware of it!
311+
workspaceHistoryID := uuid.New()
312+
input, err := json.Marshal(workspaceProvisionJob{
313+
WorkspaceHistoryID: workspaceHistoryID,
314+
})
315+
if err != nil {
316+
return xerrors.Errorf("marshal provision job: %w", err)
317+
}
318+
319+
provisionerJob, err := db.InsertProvisionerJob(context.Background(), database.InsertProvisionerJobParams{
320+
ID: uuid.New(),
321+
CreatedAt: database.Now(),
322+
UpdatedAt: database.Now(),
323+
InitiatorID: user.ID,
324+
Provisioner: project.Provisioner,
325+
Type: database.ProvisionerJobTypeWorkspaceProvision,
326+
ProjectID: project.ID,
327+
Input: input,
328+
})
329+
if err != nil {
330+
return xerrors.Errorf("insert provisioner job: %w", err)
331+
}
332+
301333
workspaceHistory, err = db.InsertWorkspaceHistory(r.Context(), database.InsertWorkspaceHistoryParams{
302-
ID: uuid.New(),
334+
ID: workspaceHistoryID,
303335
CreatedAt: database.Now(),
304336
UpdatedAt: database.Now(),
305337
WorkspaceID: workspace.ID,
306338
ProjectHistoryID: projectHistory.ID,
307339
BeforeID: priorHistoryID,
308340
Initiator: user.ID,
309341
Transition: createBuild.Transition,
310-
// This should create a provision job once that gets implemented!
311-
ProvisionJobID: uuid.New(),
342+
ProvisionJobID: provisionerJob.ID,
312343
})
313344
if err != nil {
314345
return xerrors.Errorf("insert workspace history: %w", err)

0 commit comments

Comments
 (0)