Skip to content

Commit e2830c2

Browse files
committed
Fix log bufferring
1 parent d7d4a33 commit e2830c2

23 files changed

+88
-147
lines changed

cli/cliui/provisionerjob.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ import (
1616
"github.com/coder/coder/codersdk"
1717
)
1818

19-
func WorkspaceBuild(ctx context.Context, writer io.Writer, client *codersdk.Client, build uuid.UUID, before time.Time) error {
19+
func WorkspaceBuild(ctx context.Context, writer io.Writer, client *codersdk.Client, build uuid.UUID) error {
2020
return ProvisionerJob(ctx, writer, ProvisionerJobOptions{
2121
Fetch: func() (codersdk.ProvisionerJob, error) {
2222
build, err := client.WorkspaceBuild(ctx, build)
2323
return build.Job, err
2424
},
2525
Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) {
26-
return client.WorkspaceBuildLogsAfter(ctx, build, before)
26+
return client.WorkspaceBuildLogsAfter(ctx, build, 0)
2727
},
2828
})
2929
}

cli/create.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ func create() *cobra.Command {
139139
return err
140140
}
141141

142-
after := time.Now()
143142
workspace, err := client.CreateWorkspace(cmd.Context(), organization.ID, codersdk.Me, codersdk.CreateWorkspaceRequest{
144143
TemplateID: template.ID,
145144
Name: workspaceName,
@@ -151,7 +150,7 @@ func create() *cobra.Command {
151150
return err
152151
}
153152

154-
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, workspace.LatestBuild.ID, after)
153+
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, workspace.LatestBuild.ID)
155154
if err != nil {
156155
return err
157156
}
@@ -238,7 +237,6 @@ PromptParamLoop:
238237
_, _ = fmt.Fprintln(cmd.OutOrStdout())
239238

240239
// Run a dry-run with the given parameters to check correctness
241-
after := time.Now()
242240
dryRun, err := client.CreateTemplateVersionDryRun(cmd.Context(), templateVersion.ID, codersdk.CreateTemplateVersionDryRunRequest{
243241
WorkspaceName: args.NewWorkspaceName,
244242
ParameterValues: parameters,
@@ -255,7 +253,7 @@ PromptParamLoop:
255253
return client.CancelTemplateVersionDryRun(cmd.Context(), templateVersion.ID, dryRun.ID)
256254
},
257255
Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) {
258-
return client.TemplateVersionDryRunLogsAfter(cmd.Context(), templateVersion.ID, dryRun.ID, after)
256+
return client.TemplateVersionDryRunLogsAfter(cmd.Context(), templateVersion.ID, dryRun.ID, 0)
259257
},
260258
// Don't show log output for the dry-run unless there's an error.
261259
Silent: true,

cli/delete.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ func deleteWorkspace() *cobra.Command {
4747
)
4848
}
4949

50-
before := time.Now()
5150
build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{
5251
Transition: codersdk.WorkspaceTransitionDelete,
5352
ProvisionerState: state,
@@ -57,7 +56,7 @@ func deleteWorkspace() *cobra.Command {
5756
return err
5857
}
5958

60-
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before)
59+
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID)
6160
if err != nil {
6261
return err
6362
}

cli/portforward.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func portForward() *cobra.Command {
7979
return xerrors.New("workspace must be in start transition to port-forward")
8080
}
8181
if workspace.LatestBuild.Job.CompletedAt == nil {
82-
err = cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID, workspace.CreatedAt)
82+
err = cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID)
8383
if err != nil {
8484
return err
8585
}

cli/ssh.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ func getWorkspaceAndAgent(ctx context.Context, cmd *cobra.Command, client *coder
250250
return codersdk.Workspace{}, codersdk.WorkspaceAgent{}, xerrors.New("workspace must be in start transition to ssh")
251251
}
252252
if workspace.LatestBuild.Job.CompletedAt == nil {
253-
err := cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID, workspace.CreatedAt)
253+
err := cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID)
254254
if err != nil {
255255
return codersdk.Workspace{}, codersdk.WorkspaceAgent{}, err
256256
}

cli/start.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,14 @@ func start() *cobra.Command {
2525
if err != nil {
2626
return err
2727
}
28-
before := time.Now()
2928
build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{
3029
Transition: codersdk.WorkspaceTransitionStart,
3130
})
3231
if err != nil {
3332
return err
3433
}
3534

36-
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before)
35+
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID)
3736
if err != nil {
3837
return err
3938
}

cli/state.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"io"
66
"os"
77
"strconv"
8-
"time"
98

109
"github.com/spf13/cobra"
1110

@@ -100,7 +99,6 @@ func statePush() *cobra.Command {
10099
return err
101100
}
102101

103-
before := time.Now()
104102
build, err = client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{
105103
TemplateVersionID: build.TemplateVersionID,
106104
Transition: build.Transition,
@@ -109,7 +107,7 @@ func statePush() *cobra.Command {
109107
if err != nil {
110108
return err
111109
}
112-
return cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStderr(), client, build.ID, before)
110+
return cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStderr(), client, build.ID)
113111
},
114112
}
115113
cmd.Flags().IntVarP(&buildNumber, "build", "b", 0, "Specify a workspace build to target by name.")

cli/stop.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,14 @@ func stop() *cobra.Command {
3333
if err != nil {
3434
return err
3535
}
36-
before := time.Now()
3736
build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{
3837
Transition: codersdk.WorkspaceTransitionStop,
3938
})
4039
if err != nil {
4140
return err
4241
}
4342

44-
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before)
43+
err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID)
4544
if err != nil {
4645
return err
4746
}

cli/templatecreate.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ type createValidTemplateVersionArgs struct {
160160
}
161161

162162
func createValidTemplateVersion(cmd *cobra.Command, args createValidTemplateVersionArgs, parameters ...codersdk.CreateParameterRequest) (*codersdk.TemplateVersion, []codersdk.CreateParameterRequest, error) {
163-
before := time.Now()
164163
client := args.Client
165164

166165
req := codersdk.CreateTemplateVersionRequest{
@@ -187,7 +186,7 @@ func createValidTemplateVersion(cmd *cobra.Command, args createValidTemplateVers
187186
return client.CancelTemplateVersion(cmd.Context(), version.ID)
188187
},
189188
Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) {
190-
return client.TemplateVersionLogsAfter(cmd.Context(), version.ID, before)
189+
return client.TemplateVersionLogsAfter(cmd.Context(), version.ID, 0)
191190
},
192191
})
193192
if err != nil {

cli/update.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package cli
22

33
import (
44
"fmt"
5-
"time"
65

76
"github.com/spf13/cobra"
87

@@ -57,7 +56,6 @@ func update() *cobra.Command {
5756
return nil
5857
}
5958

60-
before := time.Now()
6159
build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{
6260
TemplateVersionID: template.ActiveVersionID,
6361
Transition: workspace.LatestBuild.Transition,
@@ -66,7 +64,7 @@ func update() *cobra.Command {
6664
if err != nil {
6765
return err
6866
}
69-
logs, closer, err := client.WorkspaceBuildLogsAfter(cmd.Context(), build.ID, before)
67+
logs, closer, err := client.WorkspaceBuildLogsAfter(cmd.Context(), build.ID, 0)
7068
if err != nil {
7169
return err
7270
}

coderd/database/databasefake/databasefake.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2052,17 +2052,14 @@ func (q *fakeQuerier) GetProvisionerLogsByIDBetween(_ context.Context, arg datab
20522052
if jobLog.JobID != arg.JobID {
20532053
continue
20542054
}
2055-
if !arg.CreatedBefore.IsZero() && jobLog.CreatedAt.After(arg.CreatedBefore) {
2055+
if arg.CreatedBefore != 0 && jobLog.ID > arg.CreatedBefore {
20562056
continue
20572057
}
2058-
if !arg.CreatedAfter.IsZero() && jobLog.CreatedAt.Before(arg.CreatedAfter) {
2058+
if arg.CreatedAfter != 0 && jobLog.ID < arg.CreatedAfter {
20592059
continue
20602060
}
20612061
logs = append(logs, jobLog)
20622062
}
2063-
if len(logs) == 0 {
2064-
return nil, sql.ErrNoRows
2065-
}
20662063
return logs, nil
20672064
}
20682065

coderd/database/queries.sql.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/provisionerjoblogs.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ FROM
66
WHERE
77
job_id = @job_id
88
AND (
9-
created_at >= @created_after
10-
OR created_at <= @created_before
9+
id >= @created_after
10+
OR id <= @created_before
1111
) ORDER BY id;
1212

1313
-- name: InsertProvisionerJobLogs :many

coderd/provisionerdaemons.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,13 +378,22 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.
378378
slog.F("stage", log.Stage),
379379
slog.F("output", log.Output))
380380
}
381-
_, err := server.Database.InsertProvisionerJobLogs(context.Background(), insertParams)
381+
logs, err := server.Database.InsertProvisionerJobLogs(context.Background(), insertParams)
382382
if err != nil {
383383
server.Logger.Error(ctx, "failed to insert job logs", slog.F("job_id", parsedID), slog.Error(err))
384384
return nil, xerrors.Errorf("insert job logs: %w", err)
385385
}
386+
// Publish by the lowest log ID inserted so the
387+
// log stream will fetch everything from that point.
388+
lowestID := logs[0].ID
386389
server.Logger.Debug(ctx, "inserted job logs", slog.F("job_id", parsedID))
387-
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), []byte("{}"))
390+
data, err := json.Marshal(provisionerJobLogsMessage{
391+
CreatedAfter: lowestID,
392+
})
393+
if err != nil {
394+
return nil, xerrors.Errorf("marshal: %w", err)
395+
}
396+
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), data)
388397
if err != nil {
389398
server.Logger.Error(ctx, "failed to publish job logs", slog.F("job_id", parsedID), slog.Error(err))
390399
return nil, xerrors.Errorf("publish job log: %w", err)

coderd/provisionerjobs.go

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import (
2424
// Returns provisioner logs based on query parameters.
2525
// The intended usage for a client to stream all logs (with JS API):
2626
// const timestamp = new Date().getTime();
27-
// 1. GET /logs?before=<timestamp>
28-
// 2. GET /logs?after=<timestamp>&follow
27+
// 1. GET /logs?before=<id>
28+
// 2. GET /logs?after=<id>&follow
2929
// The combination of these responses should provide all current logs
3030
// to the consumer, and future logs are streamed in the follow request.
3131
func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob) {
@@ -74,10 +74,11 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
7474
}
7575
}
7676

77-
var after time.Time
77+
var after int64
7878
// Only fetch logs created after the time provided.
7979
if afterRaw != "" {
80-
afterMS, err := strconv.ParseInt(afterRaw, 10, 64)
80+
var err error
81+
after, err = strconv.ParseInt(afterRaw, 10, 64)
8182
if err != nil {
8283
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
8384
Message: "Query param \"after\" must be an integer.",
@@ -87,16 +88,12 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
8788
})
8889
return
8990
}
90-
after = time.UnixMilli(afterMS)
91-
} else {
92-
if follow {
93-
after = database.Now()
94-
}
9591
}
96-
var before time.Time
92+
var before int64
9793
// Only fetch logs created before the time provided.
9894
if beforeRaw != "" {
99-
beforeMS, err := strconv.ParseInt(beforeRaw, 10, 64)
95+
var err error
96+
before, err = strconv.ParseInt(beforeRaw, 10, 64)
10097
if err != nil {
10198
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
10299
Message: "Query param \"before\" must be an integer.",
@@ -106,12 +103,6 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
106103
})
107104
return
108105
}
109-
before = time.UnixMilli(beforeMS)
110-
} else {
111-
// If we're following, we don't want logs before a timestamp!
112-
if !follow {
113-
before = database.Now()
114-
}
115106
}
116107

117108
logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
@@ -370,7 +361,8 @@ func provisionerJobLogsChannel(jobID uuid.UUID) string {
370361

371362
// provisionerJobLogsMessage is the message type published on the provisionerJobLogsChannel() channel
372363
type provisionerJobLogsMessage struct {
373-
EndOfLogs bool `json:"end_of_logs,omitempty"`
364+
CreatedAfter int64 `json:"created_after"`
365+
EndOfLogs bool `json:"end_of_logs,omitempty"`
374366
}
375367

376368
func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog, func(), error) {
@@ -379,7 +371,6 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
379371
var (
380372
closed = make(chan struct{})
381373
bufferedLogs = make(chan database.ProvisionerJobLog, 128)
382-
since = database.Now()
383374
)
384375
closeSubscribe, err := api.Pubsub.Subscribe(
385376
provisionerJobLogsChannel(jobID),
@@ -395,25 +386,26 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
395386
logger.Warn(ctx, "invalid provisioner job log on channel", slog.Error(err))
396387
return
397388
}
398-
logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
399-
JobID: jobID,
400-
CreatedAfter: since,
401-
})
402-
if err != nil {
403-
logger.Warn(ctx, "get provisioner logs", slog.Error(err))
404-
return
405-
}
406-
since = database.Now()
389+
if jlMsg.CreatedAfter != 0 {
390+
logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
391+
JobID: jobID,
392+
CreatedAfter: jlMsg.CreatedAfter,
393+
})
394+
if err != nil {
395+
logger.Warn(ctx, "get provisioner logs", slog.Error(err))
396+
return
397+
}
407398

408-
for _, log := range logs {
409-
select {
410-
case bufferedLogs <- log:
411-
logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
412-
default:
413-
// If this overflows users could miss logs streaming. This can happen
414-
// we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
415-
// so just drop them.
416-
logger.Warn(ctx, "provisioner job log overflowing channel")
399+
for _, log := range logs {
400+
select {
401+
case bufferedLogs <- log:
402+
logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
403+
default:
404+
// If this overflows users could miss logs streaming. This can happen
405+
// we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub,
406+
// so just drop them.
407+
logger.Warn(ctx, "provisioner job log overflowing channel")
408+
}
417409
}
418410
}
419411
if jlMsg.EndOfLogs {

0 commit comments

Comments
 (0)