Skip to content

Commit ed705f6

Browse files
authored
refactor: Generalize log ownership to allow for scratch jobs (#182)
* refactor: Generalize log ownership to allow for scratch jobs Importing may fail when creating a project. We don't want to lose this output, but we don't want to allow users to create a failing project. This generalizes logs to soon enable one-off situations where a user can upload their archive, create a project, and watch the output parse to completion. * Improve file table schema by using hash * Fix racey test by allowing logs before * Add debug logging for PostgreSQL insert
1 parent bde732f commit ed705f6

26 files changed

+892
-935
lines changed

coderd/coderd.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,16 +96,21 @@ func New(options *Options) http.Handler {
9696
r.Route("/{workspacehistory}", func(r chi.Router) {
9797
r.Use(httpmw.ExtractWorkspaceHistoryParam(options.Database))
9898
r.Get("/", api.workspaceHistoryByName)
99-
r.Get("/logs", api.workspaceHistoryLogsByName)
10099
})
101100
})
102101
})
103102
})
104103
})
105104

106-
r.Route("/provisioners/daemons", func(r chi.Router) {
107-
r.Get("/", api.provisionerDaemons)
108-
r.Get("/serve", api.provisionerDaemonsServe)
105+
r.Route("/provisioners", func(r chi.Router) {
106+
r.Route("/daemons", func(r chi.Router) {
107+
r.Get("/", api.provisionerDaemons)
108+
r.Get("/serve", api.provisionerDaemonsServe)
109+
})
110+
r.Route("/jobs/{provisionerjob}", func(r chi.Router) {
111+
r.Use(httpmw.ExtractProvisionerJobParam(options.Database))
112+
r.Get("/logs", api.provisionerJobLogsByID)
113+
})
109114
})
110115
})
111116
r.NotFound(site.Handler().ServeHTTP)

coderd/projectversion.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ func (api *api) postProjectVersionByOrganization(rw http.ResponseWriter, r *http
153153
InitiatorID: apiKey.UserID,
154154
Provisioner: project.Provisioner,
155155
Type: database.ProvisionerJobTypeProjectImport,
156-
ProjectID: project.ID,
157156
Input: input,
158157
})
159158
if err != nil {
@@ -249,7 +248,3 @@ func convertProjectParameter(parameter database.ProjectParameter) ProjectParamet
249248
ValidationValueType: parameter.ValidationValueType,
250249
}
251250
}
252-
253-
func projectVersionLogsChannel(projectVersionID uuid.UUID) string {
254-
return fmt.Sprintf("project-version-logs:%s", projectVersionID)
255-
}

coderd/provisionerdaemons.go

Lines changed: 40 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -165,26 +165,16 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
165165
return xerrors.Errorf("request job was invalidated: %s", errorMessage)
166166
}
167167

168-
project, err := server.Database.GetProjectByID(ctx, job.ProjectID)
169-
if err != nil {
170-
return nil, failJob(fmt.Sprintf("get project: %s", err))
171-
}
172-
organization, err := server.Database.GetOrganizationByID(ctx, project.OrganizationID)
173-
if err != nil {
174-
return nil, failJob(fmt.Sprintf("get organization: %s", err))
175-
}
176168
user, err := server.Database.GetUserByID(ctx, job.InitiatorID)
177169
if err != nil {
178170
return nil, failJob(fmt.Sprintf("get user: %s", err))
179171
}
180172

181173
protoJob := &proto.AcquiredJob{
182-
JobId: job.ID.String(),
183-
CreatedAt: job.CreatedAt.UnixMilli(),
184-
Provisioner: string(job.Provisioner),
185-
OrganizationName: organization.Name,
186-
ProjectName: project.Name,
187-
UserName: user.Username,
174+
JobId: job.ID.String(),
175+
CreatedAt: job.CreatedAt.UnixMilli(),
176+
Provisioner: string(job.Provisioner),
177+
UserName: user.Username,
188178
}
189179
var projectVersion database.ProjectVersion
190180
switch job.Type {
@@ -206,6 +196,14 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
206196
if err != nil {
207197
return nil, failJob(fmt.Sprintf("get project version: %s", err))
208198
}
199+
project, err := server.Database.GetProjectByID(ctx, projectVersion.ProjectID)
200+
if err != nil {
201+
return nil, failJob(fmt.Sprintf("get project: %s", err))
202+
}
203+
organization, err := server.Database.GetOrganizationByID(ctx, project.OrganizationID)
204+
if err != nil {
205+
return nil, failJob(fmt.Sprintf("get organization: %s", err))
206+
}
209207

210208
// Compute parameters for the workspace to consume.
211209
parameters, err := projectparameter.Compute(ctx, server.Database, projectparameter.Scope{
@@ -246,8 +244,8 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
246244

247245
protoJob.Type = &proto.AcquiredJob_ProjectImport_{
248246
ProjectImport: &proto.AcquiredJob_ProjectImport{
249-
ProjectVersionId: projectVersion.ID.String(),
250-
ProjectVersionName: projectVersion.Name,
247+
// This will be replaced once the project import has been refactored.
248+
ProjectName: "placeholder",
251249
},
252250
}
253251
}
@@ -289,85 +287,36 @@ func (server *provisionerdServer) UpdateJob(stream proto.DRPCProvisionerDaemon_U
289287
if err != nil {
290288
return xerrors.Errorf("update job: %w", err)
291289
}
292-
switch job.Type {
293-
case database.ProvisionerJobTypeProjectImport:
294-
if len(update.ProjectImportLogs) == 0 {
295-
continue
296-
}
297-
var input projectImportJob
298-
err = json.Unmarshal(job.Input, &input)
299-
if err != nil {
300-
return xerrors.Errorf("unmarshal job input %q: %s", job.Input, err)
301-
}
302-
insertParams := database.InsertProjectVersionLogsParams{
303-
ProjectVersionID: input.ProjectVersionID,
304-
}
305-
for _, log := range update.ProjectImportLogs {
306-
logLevel, err := convertLogLevel(log.Level)
307-
if err != nil {
308-
return xerrors.Errorf("convert log level: %w", err)
309-
}
310-
logSource, err := convertLogSource(log.Source)
311-
if err != nil {
312-
return xerrors.Errorf("convert log source: %w", err)
313-
}
314-
insertParams.ID = append(insertParams.ID, uuid.New())
315-
insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt))
316-
insertParams.Level = append(insertParams.Level, logLevel)
317-
insertParams.Source = append(insertParams.Source, logSource)
318-
insertParams.Output = append(insertParams.Output, log.Output)
319-
}
320-
logs, err := server.Database.InsertProjectVersionLogs(stream.Context(), insertParams)
321-
if err != nil {
322-
return xerrors.Errorf("insert project logs: %w", err)
323-
}
324-
data, err := json.Marshal(logs)
325-
if err != nil {
326-
return xerrors.Errorf("marshal project log: %w", err)
327-
}
328-
err = server.Pubsub.Publish(projectVersionLogsChannel(input.ProjectVersionID), data)
329-
if err != nil {
330-
return xerrors.Errorf("publish history log: %w", err)
331-
}
332-
case database.ProvisionerJobTypeWorkspaceProvision:
333-
if len(update.WorkspaceProvisionLogs) == 0 {
334-
continue
335-
}
336-
var input workspaceProvisionJob
337-
err = json.Unmarshal(job.Input, &input)
338-
if err != nil {
339-
return xerrors.Errorf("unmarshal job input %q: %s", job.Input, err)
340-
}
341-
insertParams := database.InsertWorkspaceHistoryLogsParams{
342-
WorkspaceHistoryID: input.WorkspaceHistoryID,
343-
}
344-
for _, log := range update.WorkspaceProvisionLogs {
345-
logLevel, err := convertLogLevel(log.Level)
346-
if err != nil {
347-
return xerrors.Errorf("convert log level: %w", err)
348-
}
349-
logSource, err := convertLogSource(log.Source)
350-
if err != nil {
351-
return xerrors.Errorf("convert log source: %w", err)
352-
}
353-
insertParams.ID = append(insertParams.ID, uuid.New())
354-
insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt))
355-
insertParams.Level = append(insertParams.Level, logLevel)
356-
insertParams.Source = append(insertParams.Source, logSource)
357-
insertParams.Output = append(insertParams.Output, log.Output)
358-
}
359-
logs, err := server.Database.InsertWorkspaceHistoryLogs(stream.Context(), insertParams)
360-
if err != nil {
361-
return xerrors.Errorf("insert workspace logs: %w", err)
362-
}
363-
data, err := json.Marshal(logs)
290+
insertParams := database.InsertProvisionerJobLogsParams{
291+
JobID: parsedID,
292+
}
293+
for _, log := range update.Logs {
294+
logLevel, err := convertLogLevel(log.Level)
364295
if err != nil {
365-
return xerrors.Errorf("marshal project log: %w", err)
296+
return xerrors.Errorf("convert log level: %w", err)
366297
}
367-
err = server.Pubsub.Publish(workspaceHistoryLogsChannel(input.WorkspaceHistoryID), data)
298+
logSource, err := convertLogSource(log.Source)
368299
if err != nil {
369-
return xerrors.Errorf("publish history log: %w", err)
300+
return xerrors.Errorf("convert log source: %w", err)
370301
}
302+
insertParams.ID = append(insertParams.ID, uuid.New())
303+
insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt))
304+
insertParams.Level = append(insertParams.Level, logLevel)
305+
insertParams.Source = append(insertParams.Source, logSource)
306+
insertParams.Output = append(insertParams.Output, log.Output)
307+
}
308+
logs, err := server.Database.InsertProvisionerJobLogs(stream.Context(), insertParams)
309+
if err != nil {
310+
server.Logger.Error(stream.Context(), "insert provisioner job logs", slog.Error(err))
311+
return xerrors.Errorf("insert job logs: %w", err)
312+
}
313+
data, err := json.Marshal(logs)
314+
if err != nil {
315+
return xerrors.Errorf("marshal job log: %w", err)
316+
}
317+
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), data)
318+
if err != nil {
319+
return xerrors.Errorf("publish job log: %w", err)
371320
}
372321
}
373322
}

coderd/workspacehistorylogs.go renamed to coderd/provisionerjoblogs.go

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,28 @@ import (
1414
"github.com/google/uuid"
1515

1616
"cdr.dev/slog"
17-
1817
"github.com/coder/coder/database"
1918
"github.com/coder/coder/httpapi"
2019
"github.com/coder/coder/httpmw"
2120
)
2221

23-
// WorkspaceHistoryLog represents a single log from workspace history.
24-
type WorkspaceHistoryLog struct {
22+
// ProvisionerJobLog represents a single log from a provisioner job.
23+
type ProvisionerJobLog struct {
2524
ID uuid.UUID
2625
CreatedAt time.Time `json:"created_at"`
2726
Source database.LogSource `json:"log_source"`
2827
Level database.LogLevel `json:"log_level"`
2928
Output string `json:"output"`
3029
}
3130

32-
// Returns workspace history logs based on query parameters.
31+
// Returns provisioner logs based on query parameters.
3332
// The intended usage for a client to stream all logs (with JS API):
3433
// const timestamp = new Date().getTime();
3534
// 1. GET /logs?before=<timestamp>
3635
// 2. GET /logs?after=<timestamp>&follow
3736
// The combination of these responses should provide all current logs
3837
// to the consumer, and future logs are streamed in the follow request.
39-
func (api *api) workspaceHistoryLogsByName(rw http.ResponseWriter, r *http.Request) {
38+
func (api *api) provisionerJobLogsByID(rw http.ResponseWriter, r *http.Request) {
4039
follow := r.URL.Query().Has("follow")
4140
afterRaw := r.URL.Query().Get("after")
4241
beforeRaw := r.URL.Query().Get("before")
@@ -78,36 +77,36 @@ func (api *api) workspaceHistoryLogsByName(rw http.ResponseWriter, r *http.Reque
7877
before = database.Now()
7978
}
8079

81-
history := httpmw.WorkspaceHistoryParam(r)
80+
job := httpmw.ProvisionerJobParam(r)
8281
if !follow {
83-
logs, err := api.Database.GetWorkspaceHistoryLogsByIDBetween(r.Context(), database.GetWorkspaceHistoryLogsByIDBetweenParams{
84-
WorkspaceHistoryID: history.ID,
85-
CreatedAfter: after,
86-
CreatedBefore: before,
82+
logs, err := api.Database.GetProvisionerLogsByIDBetween(r.Context(), database.GetProvisionerLogsByIDBetweenParams{
83+
JobID: job.ID,
84+
CreatedAfter: after,
85+
CreatedBefore: before,
8786
})
8887
if errors.Is(err, sql.ErrNoRows) {
8988
err = nil
9089
}
9190
if err != nil {
9291
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{
93-
Message: fmt.Sprintf("get workspace history: %s", err),
92+
Message: fmt.Sprintf("get provisioner logs: %s", err),
9493
})
9594
return
9695
}
9796
if logs == nil {
98-
logs = []database.WorkspaceHistoryLog{}
97+
logs = []database.ProvisionerJobLog{}
9998
}
10099
render.Status(r, http.StatusOK)
101100
render.JSON(rw, r, logs)
102101
return
103102
}
104103

105-
bufferedLogs := make(chan database.WorkspaceHistoryLog, 128)
106-
closeSubscribe, err := api.Pubsub.Subscribe(workspaceHistoryLogsChannel(history.ID), func(ctx context.Context, message []byte) {
107-
var logs []database.WorkspaceHistoryLog
104+
bufferedLogs := make(chan database.ProvisionerJobLog, 128)
105+
closeSubscribe, err := api.Pubsub.Subscribe(provisionerJobLogsChannel(job.ID), func(ctx context.Context, message []byte) {
106+
var logs []database.ProvisionerJobLog
108107
err := json.Unmarshal(message, &logs)
109108
if err != nil {
110-
api.Logger.Warn(r.Context(), fmt.Sprintf("invalid workspace log on channel %q: %s", workspaceHistoryLogsChannel(history.ID), err.Error()))
109+
api.Logger.Warn(r.Context(), fmt.Sprintf("invalid provisioner job log on channel %q: %s", provisionerJobLogsChannel(job.ID), err.Error()))
111110
return
112111
}
113112

@@ -117,30 +116,30 @@ func (api *api) workspaceHistoryLogsByName(rw http.ResponseWriter, r *http.Reque
117116
default:
118117
// If this overflows users could miss logs streaming. This can happen
119118
// if a database request takes a long amount of time, and we get a lot of logs.
120-
api.Logger.Warn(r.Context(), "workspace history log overflowing channel")
119+
api.Logger.Warn(r.Context(), "provisioner job log overflowing channel")
121120
}
122121
}
123122
})
124123
if err != nil {
125124
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{
126-
Message: fmt.Sprintf("subscribe to workspace history logs: %s", err),
125+
Message: fmt.Sprintf("subscribe to provisioner job logs: %s", err),
127126
})
128127
return
129128
}
130129
defer closeSubscribe()
131130

132-
workspaceHistoryLogs, err := api.Database.GetWorkspaceHistoryLogsByIDBetween(r.Context(), database.GetWorkspaceHistoryLogsByIDBetweenParams{
133-
WorkspaceHistoryID: history.ID,
134-
CreatedAfter: after,
135-
CreatedBefore: before,
131+
provisionerJobLogs, err := api.Database.GetProvisionerLogsByIDBetween(r.Context(), database.GetProvisionerLogsByIDBetweenParams{
132+
JobID: job.ID,
133+
CreatedAfter: after,
134+
CreatedBefore: before,
136135
})
137136
if errors.Is(err, sql.ErrNoRows) {
138137
err = nil
139-
workspaceHistoryLogs = []database.WorkspaceHistoryLog{}
138+
provisionerJobLogs = []database.ProvisionerJobLog{}
140139
}
141140
if err != nil {
142141
httpapi.Write(rw, http.StatusInternalServerError, httpapi.Response{
143-
Message: fmt.Sprint("get workspace history logs: %w", err),
142+
Message: fmt.Sprint("get provisioner job logs: %w", err),
144143
})
145144
return
146145
}
@@ -154,8 +153,8 @@ func (api *api) workspaceHistoryLogsByName(rw http.ResponseWriter, r *http.Reque
154153
// The Go stdlib JSON encoder appends a newline character after message write.
155154
encoder := json.NewEncoder(rw)
156155

157-
for _, workspaceHistoryLog := range workspaceHistoryLogs {
158-
err = encoder.Encode(convertWorkspaceHistoryLog(workspaceHistoryLog))
156+
for _, provisionerJobLog := range provisionerJobLogs {
157+
err = encoder.Encode(convertProvisionerJobLog(provisionerJobLog))
159158
if err != nil {
160159
return
161160
}
@@ -168,15 +167,15 @@ func (api *api) workspaceHistoryLogsByName(rw http.ResponseWriter, r *http.Reque
168167
case <-r.Context().Done():
169168
return
170169
case log := <-bufferedLogs:
171-
err = encoder.Encode(convertWorkspaceHistoryLog(log))
170+
err = encoder.Encode(convertProvisionerJobLog(log))
172171
if err != nil {
173172
return
174173
}
175174
rw.(http.Flusher).Flush()
176175
case <-ticker.C:
177-
job, err := api.Database.GetProvisionerJobByID(r.Context(), history.ProvisionJobID)
176+
job, err := api.Database.GetProvisionerJobByID(r.Context(), job.ID)
178177
if err != nil {
179-
api.Logger.Warn(r.Context(), "streaming workspace logs; checking if job completed", slog.Error(err), slog.F("job_id", history.ProvisionJobID))
178+
api.Logger.Warn(r.Context(), "streaming job logs; checking if completed", slog.Error(err), slog.F("job_id", job.ID.String()))
180179
continue
181180
}
182181
if convertProvisionerJob(job).Status.Completed() {
@@ -186,16 +185,12 @@ func (api *api) workspaceHistoryLogsByName(rw http.ResponseWriter, r *http.Reque
186185
}
187186
}
188187

189-
func convertWorkspaceHistoryLog(workspaceHistoryLog database.WorkspaceHistoryLog) WorkspaceHistoryLog {
190-
return WorkspaceHistoryLog{
191-
ID: workspaceHistoryLog.ID,
192-
CreatedAt: workspaceHistoryLog.CreatedAt,
193-
Source: workspaceHistoryLog.Source,
194-
Level: workspaceHistoryLog.Level,
195-
Output: workspaceHistoryLog.Output,
188+
func convertProvisionerJobLog(provisionerJobLog database.ProvisionerJobLog) ProvisionerJobLog {
189+
return ProvisionerJobLog{
190+
ID: provisionerJobLog.ID,
191+
CreatedAt: provisionerJobLog.CreatedAt,
192+
Source: provisionerJobLog.Source,
193+
Level: provisionerJobLog.Level,
194+
Output: provisionerJobLog.Output,
196195
}
197196
}
198-
199-
func workspaceHistoryLogsChannel(workspaceHistoryID uuid.UUID) string {
200-
return fmt.Sprintf("workspace-history-logs:%s", workspaceHistoryID)
201-
}

0 commit comments

Comments
 (0)