diff --git a/cli/cliui/provisionerjob.go b/cli/cliui/provisionerjob.go index a7c2dcc86bad2..36f7d9c78c470 100644 --- a/cli/cliui/provisionerjob.go +++ b/cli/cliui/provisionerjob.go @@ -16,14 +16,14 @@ import ( "github.com/coder/coder/codersdk" ) -func WorkspaceBuild(ctx context.Context, writer io.Writer, client *codersdk.Client, build uuid.UUID, before time.Time) error { +func WorkspaceBuild(ctx context.Context, writer io.Writer, client *codersdk.Client, build uuid.UUID) error { return ProvisionerJob(ctx, writer, ProvisionerJobOptions{ Fetch: func() (codersdk.ProvisionerJob, error) { build, err := client.WorkspaceBuild(ctx, build) return build.Job, err }, Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) { - return client.WorkspaceBuildLogsAfter(ctx, build, before) + return client.WorkspaceBuildLogsAfter(ctx, build, 0) }, }) } diff --git a/cli/create.go b/cli/create.go index 4b268f7161d8f..854ba78565c51 100644 --- a/cli/create.go +++ b/cli/create.go @@ -139,7 +139,6 @@ func create() *cobra.Command { return err } - after := time.Now() workspace, err := client.CreateWorkspace(cmd.Context(), organization.ID, codersdk.Me, codersdk.CreateWorkspaceRequest{ TemplateID: template.ID, Name: workspaceName, @@ -151,7 +150,7 @@ func create() *cobra.Command { return err } - err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, workspace.LatestBuild.ID, after) + err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, workspace.LatestBuild.ID) if err != nil { return err } @@ -238,7 +237,6 @@ PromptParamLoop: _, _ = fmt.Fprintln(cmd.OutOrStdout()) // Run a dry-run with the given parameters to check correctness - after := time.Now() dryRun, err := client.CreateTemplateVersionDryRun(cmd.Context(), templateVersion.ID, codersdk.CreateTemplateVersionDryRunRequest{ WorkspaceName: args.NewWorkspaceName, ParameterValues: parameters, @@ -255,7 +253,7 @@ PromptParamLoop: return client.CancelTemplateVersionDryRun(cmd.Context(), templateVersion.ID, dryRun.ID) }, Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) { - return client.TemplateVersionDryRunLogsAfter(cmd.Context(), templateVersion.ID, dryRun.ID, after) + return client.TemplateVersionDryRunLogsAfter(cmd.Context(), templateVersion.ID, dryRun.ID, 0) }, // Don't show log output for the dry-run unless there's an error. Silent: true, diff --git a/cli/delete.go b/cli/delete.go index fd30fe95a1c1e..4c655339d8a8f 100644 --- a/cli/delete.go +++ b/cli/delete.go @@ -47,7 +47,6 @@ func deleteWorkspace() *cobra.Command { ) } - before := time.Now() build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{ Transition: codersdk.WorkspaceTransitionDelete, ProvisionerState: state, @@ -57,7 +56,7 @@ func deleteWorkspace() *cobra.Command { return err } - err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before) + err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID) if err != nil { return err } diff --git a/cli/portforward.go b/cli/portforward.go index 911e8fb5208d7..ca7cb51f14719 100644 --- a/cli/portforward.go +++ b/cli/portforward.go @@ -79,7 +79,7 @@ func portForward() *cobra.Command { return xerrors.New("workspace must be in start transition to port-forward") } if workspace.LatestBuild.Job.CompletedAt == nil { - err = cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID, workspace.CreatedAt) + err = cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID) if err != nil { return err } diff --git a/cli/ssh.go b/cli/ssh.go index b4d4f6420da78..7d5b25337b3fb 100644 --- a/cli/ssh.go +++ b/cli/ssh.go @@ -250,7 +250,7 @@ func getWorkspaceAndAgent(ctx context.Context, cmd *cobra.Command, client *coder return codersdk.Workspace{}, codersdk.WorkspaceAgent{}, xerrors.New("workspace must be in start transition to ssh") } if workspace.LatestBuild.Job.CompletedAt == nil { - err := cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID, workspace.CreatedAt) + err := cliui.WorkspaceBuild(ctx, cmd.ErrOrStderr(), client, workspace.LatestBuild.ID) if err != nil { return codersdk.Workspace{}, codersdk.WorkspaceAgent{}, err } diff --git a/cli/start.go b/cli/start.go index 7abcd036a4456..7bf4782e14bad 100644 --- a/cli/start.go +++ b/cli/start.go @@ -25,7 +25,6 @@ func start() *cobra.Command { if err != nil { return err } - before := time.Now() build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{ Transition: codersdk.WorkspaceTransitionStart, }) @@ -33,7 +32,7 @@ func start() *cobra.Command { return err } - err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before) + err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID) if err != nil { return err } diff --git a/cli/state.go b/cli/state.go index 6a5858db704d7..bd1c0ccad4a6f 100644 --- a/cli/state.go +++ b/cli/state.go @@ -5,7 +5,6 @@ import ( "io" "os" "strconv" - "time" "github.com/spf13/cobra" @@ -100,7 +99,6 @@ func statePush() *cobra.Command { return err } - before := time.Now() build, err = client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{ TemplateVersionID: build.TemplateVersionID, Transition: build.Transition, @@ -109,7 +107,7 @@ func statePush() *cobra.Command { if err != nil { return err } - return cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStderr(), client, build.ID, before) + return cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStderr(), client, build.ID) }, } cmd.Flags().IntVarP(&buildNumber, "build", "b", 0, "Specify a workspace build to target by name.") diff --git a/cli/stop.go b/cli/stop.go index 1b99c0251ccee..9bb355ef0bd5a 100644 --- a/cli/stop.go +++ b/cli/stop.go @@ -33,7 +33,6 @@ func stop() *cobra.Command { if err != nil { return err } - before := time.Now() build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{ Transition: codersdk.WorkspaceTransitionStop, }) @@ -41,7 +40,7 @@ func stop() *cobra.Command { return err } - err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID, before) + err = cliui.WorkspaceBuild(cmd.Context(), cmd.OutOrStdout(), client, build.ID) if err != nil { return err } diff --git a/cli/templatecreate.go b/cli/templatecreate.go index 7c0d25f0f7e2f..458cb1c2015b0 100644 --- a/cli/templatecreate.go +++ b/cli/templatecreate.go @@ -160,7 +160,6 @@ type createValidTemplateVersionArgs struct { } func createValidTemplateVersion(cmd *cobra.Command, args createValidTemplateVersionArgs, parameters ...codersdk.CreateParameterRequest) (*codersdk.TemplateVersion, []codersdk.CreateParameterRequest, error) { - before := time.Now() client := args.Client req := codersdk.CreateTemplateVersionRequest{ @@ -187,7 +186,7 @@ func createValidTemplateVersion(cmd *cobra.Command, args createValidTemplateVers return client.CancelTemplateVersion(cmd.Context(), version.ID) }, Logs: func() (<-chan codersdk.ProvisionerJobLog, io.Closer, error) { - return client.TemplateVersionLogsAfter(cmd.Context(), version.ID, before) + return client.TemplateVersionLogsAfter(cmd.Context(), version.ID, 0) }, }) if err != nil { diff --git a/cli/update.go b/cli/update.go index 1ccfaad33a0b3..d419cac9389c4 100644 --- a/cli/update.go +++ b/cli/update.go @@ -2,7 +2,6 @@ package cli import ( "fmt" - "time" "github.com/spf13/cobra" @@ -57,7 +56,6 @@ func update() *cobra.Command { return nil } - before := time.Now() build, err := client.CreateWorkspaceBuild(cmd.Context(), workspace.ID, codersdk.CreateWorkspaceBuildRequest{ TemplateVersionID: template.ActiveVersionID, Transition: workspace.LatestBuild.Transition, @@ -66,7 +64,7 @@ func update() *cobra.Command { if err != nil { return err } - logs, closer, err := client.WorkspaceBuildLogsAfter(cmd.Context(), build.ID, before) + logs, closer, err := client.WorkspaceBuildLogsAfter(cmd.Context(), build.ID, 0) if err != nil { return err } diff --git a/coderd/database/databasefake/databasefake.go b/coderd/database/databasefake/databasefake.go index a9117f2b2efca..19cab8054db5d 100644 --- a/coderd/database/databasefake/databasefake.go +++ b/coderd/database/databasefake/databasefake.go @@ -2052,17 +2052,14 @@ func (q *fakeQuerier) GetProvisionerLogsByIDBetween(_ context.Context, arg datab if jobLog.JobID != arg.JobID { continue } - if !arg.CreatedBefore.IsZero() && jobLog.CreatedAt.After(arg.CreatedBefore) { + if arg.CreatedBefore != 0 && jobLog.ID > arg.CreatedBefore { continue } - if !arg.CreatedAfter.IsZero() && jobLog.CreatedAt.Before(arg.CreatedAfter) { + if arg.CreatedAfter != 0 && jobLog.ID < arg.CreatedAfter { continue } logs = append(logs, jobLog) } - if len(logs) == 0 { - return nil, sql.ErrNoRows - } return logs, nil } @@ -2212,10 +2209,15 @@ func (q *fakeQuerier) InsertProvisionerJobLogs(_ context.Context, arg database.I defer q.mutex.Unlock() logs := make([]database.ProvisionerJobLog, 0) + id := int64(1) + if len(q.provisionerJobLogs) > 0 { + id = q.provisionerJobLogs[len(q.provisionerJobLogs)-1].ID + } for index, output := range arg.Output { + id++ logs = append(logs, database.ProvisionerJobLog{ + ID: id, JobID: arg.JobID, - ID: arg.ID[index], CreatedAt: arg.CreatedAt[index], Source: arg.Source[index], Level: arg.Level[index], diff --git a/coderd/database/dump.sql b/coderd/database/dump.sql index 8eed05162e176..2267fdc108232 100644 --- a/coderd/database/dump.sql +++ b/coderd/database/dump.sql @@ -272,15 +272,24 @@ CREATE TABLE provisioner_daemons ( ); CREATE TABLE provisioner_job_logs ( - id uuid NOT NULL, job_id uuid NOT NULL, created_at timestamp with time zone NOT NULL, source log_source NOT NULL, level log_level NOT NULL, stage character varying(128) NOT NULL, - output character varying(1024) NOT NULL + output character varying(1024) NOT NULL, + id bigint NOT NULL ); +CREATE SEQUENCE provisioner_job_logs_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + +ALTER SEQUENCE provisioner_job_logs_id_seq OWNED BY provisioner_job_logs.id; + CREATE TABLE provisioner_jobs ( id uuid NOT NULL, created_at timestamp with time zone NOT NULL, @@ -463,6 +472,8 @@ CREATE TABLE workspaces ( ALTER TABLE ONLY licenses ALTER COLUMN id SET DEFAULT nextval('licenses_id_seq'::regclass); +ALTER TABLE ONLY provisioner_job_logs ALTER COLUMN id SET DEFAULT nextval('provisioner_job_logs_id_seq'::regclass); + ALTER TABLE ONLY agent_stats ADD CONSTRAINT agent_stats_pkey PRIMARY KEY (id); diff --git a/coderd/database/migrations/000071_provisioner_log_lines.down.sql b/coderd/database/migrations/000071_provisioner_log_lines.down.sql new file mode 100644 index 0000000000000..d851ab01360da --- /dev/null +++ b/coderd/database/migrations/000071_provisioner_log_lines.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE provisioner_job_logs DROP COLUMN id; + +ALTER TABLE provisioner_job_logs ADD COLUMN id uuid NOT NULL DEFAULT gen_random_uuid(); diff --git a/coderd/database/migrations/000071_provisioner_log_lines.up.sql b/coderd/database/migrations/000071_provisioner_log_lines.up.sql new file mode 100644 index 0000000000000..cb6678890d7e9 --- /dev/null +++ b/coderd/database/migrations/000071_provisioner_log_lines.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE provisioner_job_logs DROP COLUMN id; + +ALTER TABLE provisioner_job_logs ADD COLUMN id BIGSERIAL PRIMARY KEY; diff --git a/coderd/database/models.go b/coderd/database/models.go index 4f637ce5b1cec..bdebbf1a849cb 100644 --- a/coderd/database/models.go +++ b/coderd/database/models.go @@ -545,13 +545,13 @@ type ProvisionerJob struct { } type ProvisionerJobLog struct { - ID uuid.UUID `db:"id" json:"id"` JobID uuid.UUID `db:"job_id" json:"job_id"` CreatedAt time.Time `db:"created_at" json:"created_at"` Source LogSource `db:"source" json:"source"` Level LogLevel `db:"level" json:"level"` Stage string `db:"stage" json:"stage"` Output string `db:"output" json:"output"` + ID int64 `db:"id" json:"id"` } type Replica struct { diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 016110e045b05..5034c4d89caf6 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -2345,23 +2345,21 @@ func (q *sqlQuerier) UpdateProvisionerDaemonByID(ctx context.Context, arg Update const getProvisionerLogsByIDBetween = `-- name: GetProvisionerLogsByIDBetween :many SELECT - id, job_id, created_at, source, level, stage, output + job_id, created_at, source, level, stage, output, id FROM provisioner_job_logs WHERE job_id = $1 AND ( - created_at >= $2 - OR created_at <= $3 - ) -ORDER BY - created_at DESC + id > $2 + OR id < $3 + ) ORDER BY id ` type GetProvisionerLogsByIDBetweenParams struct { JobID uuid.UUID `db:"job_id" json:"job_id"` - CreatedAfter time.Time `db:"created_after" json:"created_after"` - CreatedBefore time.Time `db:"created_before" json:"created_before"` + CreatedAfter int64 `db:"created_after" json:"created_after"` + CreatedBefore int64 `db:"created_before" json:"created_before"` } func (q *sqlQuerier) GetProvisionerLogsByIDBetween(ctx context.Context, arg GetProvisionerLogsByIDBetweenParams) ([]ProvisionerJobLog, error) { @@ -2374,13 +2372,13 @@ func (q *sqlQuerier) GetProvisionerLogsByIDBetween(ctx context.Context, arg GetP for rows.Next() { var i ProvisionerJobLog if err := rows.Scan( - &i.ID, &i.JobID, &i.CreatedAt, &i.Source, &i.Level, &i.Stage, &i.Output, + &i.ID, ); err != nil { return nil, err } @@ -2399,17 +2397,15 @@ const insertProvisionerJobLogs = `-- name: InsertProvisionerJobLogs :many INSERT INTO provisioner_job_logs SELECT - unnest($1 :: uuid [ ]) AS id, - $2 :: uuid AS job_id, - unnest($3 :: timestamptz [ ]) AS created_at, - unnest($4 :: log_source [ ]) AS source, - unnest($5 :: log_level [ ]) AS LEVEL, - unnest($6 :: VARCHAR(128) [ ]) AS stage, - unnest($7 :: VARCHAR(1024) [ ]) AS output RETURNING id, job_id, created_at, source, level, stage, output + $1 :: uuid AS job_id, + unnest($2 :: timestamptz [ ]) AS created_at, + unnest($3 :: log_source [ ]) AS source, + unnest($4 :: log_level [ ]) AS LEVEL, + unnest($5 :: VARCHAR(128) [ ]) AS stage, + unnest($6 :: VARCHAR(1024) [ ]) AS output RETURNING job_id, created_at, source, level, stage, output, id ` type InsertProvisionerJobLogsParams struct { - ID []uuid.UUID `db:"id" json:"id"` JobID uuid.UUID `db:"job_id" json:"job_id"` CreatedAt []time.Time `db:"created_at" json:"created_at"` Source []LogSource `db:"source" json:"source"` @@ -2420,7 +2416,6 @@ type InsertProvisionerJobLogsParams struct { func (q *sqlQuerier) InsertProvisionerJobLogs(ctx context.Context, arg InsertProvisionerJobLogsParams) ([]ProvisionerJobLog, error) { rows, err := q.db.QueryContext(ctx, insertProvisionerJobLogs, - pq.Array(arg.ID), arg.JobID, pq.Array(arg.CreatedAt), pq.Array(arg.Source), @@ -2436,13 +2431,13 @@ func (q *sqlQuerier) InsertProvisionerJobLogs(ctx context.Context, arg InsertPro for rows.Next() { var i ProvisionerJobLog if err := rows.Scan( - &i.ID, &i.JobID, &i.CreatedAt, &i.Source, &i.Level, &i.Stage, &i.Output, + &i.ID, ); err != nil { return nil, err } diff --git a/coderd/database/queries/provisionerjoblogs.sql b/coderd/database/queries/provisionerjoblogs.sql index d9fb35af477c8..0034b8652dc4a 100644 --- a/coderd/database/queries/provisionerjoblogs.sql +++ b/coderd/database/queries/provisionerjoblogs.sql @@ -6,17 +6,14 @@ FROM WHERE job_id = @job_id AND ( - created_at >= @created_after - OR created_at <= @created_before - ) -ORDER BY - created_at DESC; + id > @created_after + OR id < @created_before + ) ORDER BY id; -- name: InsertProvisionerJobLogs :many INSERT INTO provisioner_job_logs SELECT - unnest(@id :: uuid [ ]) AS id, @job_id :: uuid AS job_id, unnest(@created_at :: timestamptz [ ]) AS created_at, unnest(@source :: log_source [ ]) AS source, diff --git a/coderd/provisionerdaemons.go b/coderd/provisionerdaemons.go index 87a9b9a98320a..8fd27c9a00b38 100644 --- a/coderd/provisionerdaemons.go +++ b/coderd/provisionerdaemons.go @@ -368,7 +368,6 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto. if err != nil { return nil, xerrors.Errorf("convert log source: %w", err) } - insertParams.ID = append(insertParams.ID, uuid.New()) insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt)) insertParams.Level = append(insertParams.Level, logLevel) insertParams.Stage = append(insertParams.Stage, log.Stage) @@ -384,10 +383,15 @@ func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto. server.Logger.Error(ctx, "failed to insert job logs", slog.F("job_id", parsedID), slog.Error(err)) return nil, xerrors.Errorf("insert job logs: %w", err) } + // Publish by the lowest log ID inserted so the + // log stream will fetch everything from that point. + lowestID := logs[0].ID server.Logger.Debug(ctx, "inserted job logs", slog.F("job_id", parsedID)) - data, err := json.Marshal(provisionerJobLogsMessage{Logs: logs}) + data, err := json.Marshal(provisionerJobLogsMessage{ + CreatedAfter: lowestID, + }) if err != nil { - return nil, xerrors.Errorf("marshal job log: %w", err) + return nil, xerrors.Errorf("marshal: %w", err) } err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), data) if err != nil { diff --git a/coderd/provisionerjobs.go b/coderd/provisionerjobs.go index 77a6589c71ac4..dda3c2607fffa 100644 --- a/coderd/provisionerjobs.go +++ b/coderd/provisionerjobs.go @@ -24,8 +24,8 @@ import ( // Returns provisioner logs based on query parameters. // The intended usage for a client to stream all logs (with JS API): // const timestamp = new Date().getTime(); -// 1. GET /logs?before= -// 2. GET /logs?after=&follow +// 1. GET /logs?before= +// 2. GET /logs?after=&follow // The combination of these responses should provide all current logs // to the consumer, and future logs are streamed in the follow request. func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob) { @@ -74,10 +74,11 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job } } - var after time.Time + var after int64 // Only fetch logs created after the time provided. if afterRaw != "" { - afterMS, err := strconv.ParseInt(afterRaw, 10, 64) + var err error + after, err = strconv.ParseInt(afterRaw, 10, 64) if err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Query param \"after\" must be an integer.", @@ -87,16 +88,12 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job }) return } - after = time.UnixMilli(afterMS) - } else { - if follow { - after = database.Now() - } } - var before time.Time + var before int64 // Only fetch logs created before the time provided. if beforeRaw != "" { - beforeMS, err := strconv.ParseInt(beforeRaw, 10, 64) + var err error + before, err = strconv.ParseInt(beforeRaw, 10, 64) if err != nil { httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ Message: "Query param \"before\" must be an integer.", @@ -106,12 +103,6 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job }) return } - before = time.UnixMilli(beforeMS) - } else { - // If we're following, we don't want logs before a timestamp! - if !follow { - before = database.Now() - } } logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{ @@ -156,7 +147,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageText) defer wsNetConn.Close() // Also closes conn. - logIdsDone := make(map[uuid.UUID]bool) + logIdsDone := make(map[int64]bool) // The Go stdlib JSON encoder appends a newline character after message write. encoder := json.NewEncoder(wsNetConn) @@ -370,8 +361,8 @@ func provisionerJobLogsChannel(jobID uuid.UUID) string { // provisionerJobLogsMessage is the message type published on the provisionerJobLogsChannel() channel type provisionerJobLogsMessage struct { - EndOfLogs bool `json:"end_of_logs,omitempty"` - Logs []database.ProvisionerJobLog `json:"logs,omitempty"` + CreatedAfter int64 `json:"created_after"` + EndOfLogs bool `json:"end_of_logs,omitempty"` } func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog, func(), error) { @@ -389,23 +380,32 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog, return default: } - jlMsg := provisionerJobLogsMessage{} err := json.Unmarshal(message, &jlMsg) if err != nil { logger.Warn(ctx, "invalid provisioner job log on channel", slog.Error(err)) return } + if jlMsg.CreatedAfter != 0 { + logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{ + JobID: jobID, + CreatedAfter: jlMsg.CreatedAfter, + }) + if err != nil { + logger.Warn(ctx, "get provisioner logs", slog.Error(err)) + return + } - for _, log := range jlMsg.Logs { - select { - case bufferedLogs <- log: - logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage)) - default: - // If this overflows users could miss logs streaming. This can happen - // we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub, - // so just drop them. - logger.Warn(ctx, "provisioner job log overflowing channel") + for _, log := range logs { + select { + case bufferedLogs <- log: + logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage)) + default: + // If this overflows users could miss logs streaming. This can happen + // we get a lot of logs and consumer isn't keeping up. We don't want to block the pubsub, + // so just drop them. + logger.Warn(ctx, "provisioner job log overflowing channel") + } } } if jlMsg.EndOfLogs { diff --git a/coderd/provisionerjobs_internal_test.go b/coderd/provisionerjobs_internal_test.go index 6ac466a57644a..54512e5da5cdc 100644 --- a/coderd/provisionerjobs_internal_test.go +++ b/coderd/provisionerjobs_internal_test.go @@ -1,160 +1,15 @@ package coderd import ( - "context" - "crypto/sha256" "database/sql" - "encoding/json" - "net/http/httptest" - "net/url" - "sync" "testing" - "time" - "github.com/google/uuid" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "cdr.dev/slog" - "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/coderd/database" - "github.com/coder/coder/coderd/database/databasefake" - "github.com/coder/coder/coderd/rbac" "github.com/coder/coder/codersdk" - "github.com/coder/coder/testutil" ) -func TestProvisionerJobLogs_Unit(t *testing.T) { - t.Parallel() - - t.Run("QueryPubSubDupes", func(t *testing.T) { - t.Parallel() - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - // mDB := mocks.NewStore(t) - fDB := databasefake.New() - fPubsub := &fakePubSub{t: t, cond: sync.NewCond(&sync.Mutex{})} - opts := Options{ - Logger: logger, - Database: fDB, - Pubsub: fPubsub, - } - api := New(&opts) - defer api.Close() - - server := httptest.NewServer(api.RootHandler) - defer server.Close() - userID := uuid.New() - keyID, keySecret, err := generateAPIKeyIDSecret() - require.NoError(t, err) - hashed := sha256.Sum256([]byte(keySecret)) - - u, err := url.Parse(server.URL) - require.NoError(t, err) - client := codersdk.Client{ - HTTPClient: server.Client(), - SessionToken: keyID + "-" + keySecret, - URL: u, - } - - buildID := uuid.New() - workspaceID := uuid.New() - jobID := uuid.New() - - expectedLogs := []database.ProvisionerJobLog{ - {ID: uuid.New(), JobID: jobID, Stage: "Stage0"}, - {ID: uuid.New(), JobID: jobID, Stage: "Stage1"}, - {ID: uuid.New(), JobID: jobID, Stage: "Stage2"}, - {ID: uuid.New(), JobID: jobID, Stage: "Stage3"}, - } - - ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) - defer cancel() - - // wow there are a lot of DB rows we touch... - _, err = fDB.InsertAPIKey(ctx, database.InsertAPIKeyParams{ - ID: keyID, - HashedSecret: hashed[:], - UserID: userID, - ExpiresAt: time.Now().Add(5 * time.Hour), - LoginType: database.LoginTypePassword, - Scope: database.APIKeyScopeAll, - }) - require.NoError(t, err) - _, err = fDB.InsertUser(ctx, database.InsertUserParams{ - ID: userID, - RBACRoles: []string{rbac.RoleOwner()}, - }) - require.NoError(t, err) - _, err = fDB.InsertWorkspaceBuild(ctx, database.InsertWorkspaceBuildParams{ - ID: buildID, - WorkspaceID: workspaceID, - JobID: jobID, - }) - require.NoError(t, err) - _, err = fDB.InsertWorkspace(ctx, database.InsertWorkspaceParams{ - ID: workspaceID, - }) - require.NoError(t, err) - _, err = fDB.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{ - ID: jobID, - }) - require.NoError(t, err) - for _, l := range expectedLogs[:2] { - _, err := fDB.InsertProvisionerJobLogs(ctx, database.InsertProvisionerJobLogsParams{ - ID: []uuid.UUID{l.ID}, - JobID: jobID, - Stage: []string{l.Stage}, - }) - require.NoError(t, err) - } - - logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, buildID, time.Now()) - require.NoError(t, err) - defer closer.Close() - - // when the endpoint calls subscribe, we get the listener here. - fPubsub.cond.L.Lock() - for fPubsub.listener == nil { - fPubsub.cond.Wait() - } - - // endpoint should now be listening - assert.False(t, fPubsub.canceled) - assert.False(t, fPubsub.closed) - - // send all the logs in two batches, duplicating what we already returned on the DB query. - msg := provisionerJobLogsMessage{} - msg.Logs = expectedLogs[:2] - data, err := json.Marshal(msg) - require.NoError(t, err) - fPubsub.listener(ctx, data) - msg.Logs = expectedLogs[2:] - data, err = json.Marshal(msg) - require.NoError(t, err) - fPubsub.listener(ctx, data) - - // send end of logs - msg.Logs = nil - msg.EndOfLogs = true - data, err = json.Marshal(msg) - require.NoError(t, err) - fPubsub.listener(ctx, data) - - var stages []string - for l := range logs { - logger.Info(ctx, "got log", - slog.F("id", l.ID), - slog.F("stage", l.Stage)) - stages = append(stages, l.Stage) - } - assert.Equal(t, []string{"Stage0", "Stage1", "Stage2", "Stage3"}, stages) - for !fPubsub.canceled { - fPubsub.cond.Wait() - } - assert.False(t, fPubsub.closed) - }) -} - func TestConvertProvisionerJob_Unit(t *testing.T) { t.Parallel() validNullTimeMock := sql.NullTime{ @@ -260,39 +115,3 @@ func TestConvertProvisionerJob_Unit(t *testing.T) { }) } } - -type fakePubSub struct { - t *testing.T - cond *sync.Cond - listener database.Listener - canceled bool - closed bool -} - -func (f *fakePubSub) Subscribe(_ string, listener database.Listener) (cancel func(), err error) { - f.cond.L.Lock() - defer f.cond.L.Unlock() - f.listener = listener - f.cond.Signal() - return f.cancel, nil -} - -func (f *fakePubSub) Publish(_ string, _ []byte) error { - f.t.Fail() - return nil -} - -func (f *fakePubSub) Close() error { - f.cond.L.Lock() - defer f.cond.L.Unlock() - f.closed = true - f.cond.Signal() - return nil -} - -func (f *fakePubSub) cancel() { - f.cond.L.Lock() - defer f.cond.L.Unlock() - f.canceled = true - f.cond.Signal() -} diff --git a/coderd/provisionerjobs_test.go b/coderd/provisionerjobs_test.go index f23c3672e3a13..d55374fef4c46 100644 --- a/coderd/provisionerjobs_test.go +++ b/coderd/provisionerjobs_test.go @@ -3,12 +3,10 @@ package coderd_test import ( "context" "testing" - "time" "github.com/stretchr/testify/require" "github.com/coder/coder/coderd/coderdtest" - "github.com/coder/coder/coderd/database" "github.com/coder/coder/provisioner/echo" "github.com/coder/coder/provisionersdk/proto" "github.com/coder/coder/testutil" @@ -38,13 +36,12 @@ func TestProvisionerJobLogs(t *testing.T) { template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) coderdtest.AwaitTemplateVersionJob(t, client, version.ID) workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) - before := time.Now().UTC() coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID) ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before) + logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, 0) require.NoError(t, err) defer closer.Close() for { @@ -78,12 +75,11 @@ func TestProvisionerJobLogs(t *testing.T) { template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) coderdtest.AwaitTemplateVersionJob(t, client, version.ID) workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) - before := database.Now() ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before) + logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, 0) require.NoError(t, err) defer closer.Close() for { @@ -121,7 +117,7 @@ func TestProvisionerJobLogs(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - logs, err := client.WorkspaceBuildLogsBefore(ctx, workspace.LatestBuild.ID, time.Now()) + logs, err := client.WorkspaceBuildLogsBefore(ctx, workspace.LatestBuild.ID, 0) require.NoError(t, err) require.Greater(t, len(logs), 1) }) diff --git a/coderd/templateversions_test.go b/coderd/templateversions_test.go index 1bcb5ba8a0aaa..02ed6f1b9f30e 100644 --- a/coderd/templateversions_test.go +++ b/coderd/templateversions_test.go @@ -4,7 +4,6 @@ import ( "context" "net/http" "testing" - "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -430,7 +429,6 @@ func TestTemplateVersionLogs(t *testing.T) { t.Parallel() client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) user := coderdtest.CreateFirstUser(t, client) - before := time.Now() version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ Parse: echo.ParseComplete, ProvisionDryRun: echo.ProvisionComplete, @@ -465,7 +463,7 @@ func TestTemplateVersionLogs(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, before) + logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, 0) require.NoError(t, err) defer closer.Close() for { @@ -625,7 +623,6 @@ func TestTemplateVersionDryRun(t *testing.T) { defer cancel() // Create template version dry-run - after := time.Now() job, err := client.CreateTemplateVersionDryRun(ctx, version.ID, codersdk.CreateTemplateVersionDryRunRequest{ ParameterValues: []codersdk.CreateParameterRequest{}, }) @@ -637,7 +634,7 @@ func TestTemplateVersionDryRun(t *testing.T) { require.Equal(t, job.ID, newJob.ID) // Stream logs - logs, closer, err := client.TemplateVersionDryRunLogsAfter(ctx, version.ID, job.ID, after) + logs, closer, err := client.TemplateVersionDryRunLogsAfter(ctx, version.ID, job.ID, 0) require.NoError(t, err) defer closer.Close() diff --git a/coderd/workspacebuilds_test.go b/coderd/workspacebuilds_test.go index 3d152932bf866..10245ce490bfc 100644 --- a/coderd/workspacebuilds_test.go +++ b/coderd/workspacebuilds_test.go @@ -452,7 +452,6 @@ func TestWorkspaceBuildLogs(t *testing.T) { t.Parallel() client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) user := coderdtest.CreateFirstUser(t, client) - before := time.Now() version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ Parse: echo.ParseComplete, Provision: []*proto.Provision_Response{{ @@ -487,7 +486,7 @@ func TestWorkspaceBuildLogs(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, before.Add(-time.Hour)) + logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, workspace.LatestBuild.ID, 0) require.NoError(t, err) defer closer.Close() for { diff --git a/codersdk/provisionerdaemons.go b/codersdk/provisionerdaemons.go index adce0321be691..fb88e42aecb8d 100644 --- a/codersdk/provisionerdaemons.go +++ b/codersdk/provisionerdaemons.go @@ -76,7 +76,7 @@ type ProvisionerJob struct { } type ProvisionerJobLog struct { - ID uuid.UUID `json:"id"` + ID int64 `json:"id"` CreatedAt time.Time `json:"created_at"` Source LogSource `json:"log_source"` Level LogLevel `json:"log_level"` @@ -87,10 +87,10 @@ type ProvisionerJobLog struct { // provisionerJobLogsBefore provides log output that occurred before a time. // This is abstracted from a specific job type to provide consistency between // APIs. Logs is the only shared route between jobs. -func (c *Client) provisionerJobLogsBefore(ctx context.Context, path string, before time.Time) ([]ProvisionerJobLog, error) { +func (c *Client) provisionerJobLogsBefore(ctx context.Context, path string, before int64) ([]ProvisionerJobLog, error) { values := url.Values{} - if !before.IsZero() { - values["before"] = []string{strconv.FormatInt(before.UTC().UnixMilli(), 10)} + if before != 0 { + values["before"] = []string{strconv.FormatInt(before, 10)} } res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("%s?%s", path, values.Encode()), nil) if err != nil { @@ -106,10 +106,10 @@ func (c *Client) provisionerJobLogsBefore(ctx context.Context, path string, befo } // provisionerJobLogsAfter streams logs that occurred after a specific time. -func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) { +func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after int64) (<-chan ProvisionerJobLog, io.Closer, error) { afterQuery := "" - if !after.IsZero() { - afterQuery = fmt.Sprintf("&after=%d", after.UTC().UnixMilli()) + if after != 0 { + afterQuery = fmt.Sprintf("&after=%d", after) } followURL, err := c.URL.Parse(fmt.Sprintf("%s?follow%s", path, afterQuery)) if err != nil { diff --git a/codersdk/templateversions.go b/codersdk/templateversions.go index 3aee18ec80a18..3273b31b28f86 100644 --- a/codersdk/templateversions.go +++ b/codersdk/templateversions.go @@ -93,13 +93,13 @@ func (c *Client) TemplateVersionResources(ctx context.Context, version uuid.UUID return resources, json.NewDecoder(res.Body).Decode(&resources) } -// TemplateVersionLogsBefore returns logs that occurred before a specific time. -func (c *Client) TemplateVersionLogsBefore(ctx context.Context, version uuid.UUID, before time.Time) ([]ProvisionerJobLog, error) { +// TemplateVersionLogsBefore returns logs that occurred before a specific log ID. +func (c *Client) TemplateVersionLogsBefore(ctx context.Context, version uuid.UUID, before int64) ([]ProvisionerJobLog, error) { return c.provisionerJobLogsBefore(ctx, fmt.Sprintf("/api/v2/templateversions/%s/logs", version), before) } -// TemplateVersionLogsAfter streams logs for a template version that occurred after a specific time. -func (c *Client) TemplateVersionLogsAfter(ctx context.Context, version uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) { +// TemplateVersionLogsAfter streams logs for a template version that occurred after a specific log ID. +func (c *Client) TemplateVersionLogsAfter(ctx context.Context, version uuid.UUID, after int64) (<-chan ProvisionerJobLog, io.Closer, error) { return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/templateversions/%s/logs", version), after) } @@ -159,14 +159,14 @@ func (c *Client) TemplateVersionDryRunResources(ctx context.Context, version, jo } // TemplateVersionDryRunLogsBefore returns logs for a template version dry-run -// that occurred before a specific time. -func (c *Client) TemplateVersionDryRunLogsBefore(ctx context.Context, version, job uuid.UUID, before time.Time) ([]ProvisionerJobLog, error) { +// that occurred before a specific log ID. +func (c *Client) TemplateVersionDryRunLogsBefore(ctx context.Context, version, job uuid.UUID, before int64) ([]ProvisionerJobLog, error) { return c.provisionerJobLogsBefore(ctx, fmt.Sprintf("/api/v2/templateversions/%s/dry-run/%s/logs", version, job), before) } // TemplateVersionDryRunLogsAfter streams logs for a template version dry-run -// that occurred after a specific time. -func (c *Client) TemplateVersionDryRunLogsAfter(ctx context.Context, version, job uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) { +// that occurred after a specific log ID. +func (c *Client) TemplateVersionDryRunLogsAfter(ctx context.Context, version, job uuid.UUID, after int64) (<-chan ProvisionerJobLog, io.Closer, error) { return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/templateversions/%s/dry-run/%s/logs", version, job), after) } diff --git a/codersdk/workspacebuilds.go b/codersdk/workspacebuilds.go index b9241d09ecafc..19ad6acecf6f5 100644 --- a/codersdk/workspacebuilds.go +++ b/codersdk/workspacebuilds.go @@ -117,13 +117,13 @@ func (c *Client) CancelWorkspaceBuild(ctx context.Context, id uuid.UUID) error { return nil } -// WorkspaceBuildLogsBefore returns logs that occurred before a specific time. -func (c *Client) WorkspaceBuildLogsBefore(ctx context.Context, build uuid.UUID, before time.Time) ([]ProvisionerJobLog, error) { +// WorkspaceBuildLogsBefore returns logs that occurred before a specific log ID. +func (c *Client) WorkspaceBuildLogsBefore(ctx context.Context, build uuid.UUID, before int64) ([]ProvisionerJobLog, error) { return c.provisionerJobLogsBefore(ctx, fmt.Sprintf("/api/v2/workspacebuilds/%s/logs", build), before) } -// WorkspaceBuildLogsAfter streams logs for a workspace build that occurred after a specific time. -func (c *Client) WorkspaceBuildLogsAfter(ctx context.Context, build uuid.UUID, after time.Time) (<-chan ProvisionerJobLog, io.Closer, error) { +// WorkspaceBuildLogsAfter streams logs for a workspace build that occurred after a specific log ID. +func (c *Client) WorkspaceBuildLogsAfter(ctx context.Context, build uuid.UUID, after int64) (<-chan ProvisionerJobLog, io.Closer, error) { return c.provisionerJobLogsAfter(ctx, fmt.Sprintf("/api/v2/workspacebuilds/%s/logs", build), after) } diff --git a/loadtest/workspacebuild/run.go b/loadtest/workspacebuild/run.go index d5ffd37596e1e..473a3fa9eb41f 100644 --- a/loadtest/workspacebuild/run.go +++ b/loadtest/workspacebuild/run.go @@ -41,14 +41,13 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error { req.Name = "test-" + randName } - after := time.Now() workspace, err := r.client.CreateWorkspace(ctx, r.cfg.OrganizationID, r.cfg.UserID, req) if err != nil { return xerrors.Errorf("create workspace: %w", err) } r.workspaceID = workspace.ID - err = waitForBuild(ctx, logs, r.client, workspace.LatestBuild.ID, after) + err = waitForBuild(ctx, logs, r.client, workspace.LatestBuild.ID) if err != nil { return xerrors.Errorf("wait for build: %w", err) } @@ -68,7 +67,6 @@ func (r *Runner) Cleanup(ctx context.Context, _ string) error { return nil } - after := time.Now() build, err := r.client.CreateWorkspaceBuild(ctx, r.workspaceID, codersdk.CreateWorkspaceBuildRequest{ Transition: codersdk.WorkspaceTransitionDelete, }) @@ -78,7 +76,7 @@ func (r *Runner) Cleanup(ctx context.Context, _ string) error { // TODO: capture these logs logs := io.Discard - err = waitForBuild(ctx, logs, r.client, build.ID, after) + err = waitForBuild(ctx, logs, r.client, build.ID) if err != nil { return xerrors.Errorf("wait for build: %w", err) } @@ -86,7 +84,7 @@ func (r *Runner) Cleanup(ctx context.Context, _ string) error { return nil } -func waitForBuild(ctx context.Context, w io.Writer, client *codersdk.Client, buildID uuid.UUID, after time.Time) error { +func waitForBuild(ctx context.Context, w io.Writer, client *codersdk.Client, buildID uuid.UUID) error { _, _ = fmt.Fprint(w, "Build is currently queued...") // Wait for build to start. @@ -106,7 +104,7 @@ func waitForBuild(ctx context.Context, w io.Writer, client *codersdk.Client, bui _, _ = fmt.Fprintln(w, "\nBuild started! Streaming logs below:") - logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, buildID, after) + logs, closer, err := client.WorkspaceBuildLogsAfter(ctx, buildID, 0) if err != nil { return xerrors.Errorf("start streaming build logs: %w", err) } diff --git a/provisionerd/provisionerd.go b/provisionerd/provisionerd.go index fa71ed3057a03..feb0637c62f3e 100644 --- a/provisionerd/provisionerd.go +++ b/provisionerd/provisionerd.go @@ -50,6 +50,7 @@ type Options struct { ForceCancelInterval time.Duration UpdateInterval time.Duration + LogDebounceInterval time.Duration PollInterval time.Duration Provisioners Provisioners WorkDirectory string @@ -66,6 +67,9 @@ func New(clientDialer Dialer, opts *Options) *Server { if opts.ForceCancelInterval == 0 { opts.ForceCancelInterval = time.Minute } + if opts.LogDebounceInterval == 0 { + opts.LogDebounceInterval = 50 * time.Millisecond + } if opts.Filesystem == nil { opts.Filesystem = afero.NewOsFs() } @@ -315,7 +319,7 @@ func (p *Server) acquireJob(ctx context.Context) { return } - p.activeJob = runner.NewRunner( + p.activeJob = runner.New( ctx, job, p, @@ -325,6 +329,7 @@ func (p *Server) acquireJob(ctx context.Context) { provisioner, p.opts.UpdateInterval, p.opts.ForceCancelInterval, + p.opts.LogDebounceInterval, p.tracer, p.opts.Metrics.Runner, ) diff --git a/provisionerd/provisionerd_test.go b/provisionerd/provisionerd_test.go index 05becb863ebd7..a65ae60c1150a 100644 --- a/provisionerd/provisionerd_test.go +++ b/provisionerd/provisionerd_test.go @@ -558,11 +558,17 @@ func TestProvisionerd(t *testing.T) { }, nil }, updateJob: func(ctx context.Context, update *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) { - if len(update.Logs) > 0 && update.Logs[0].Source == proto.LogSource_PROVISIONER { - // Close on a log so we know when the job is in progress! - updated.Do(func() { - close(updateChan) - }) + if len(update.Logs) > 0 { + for _, log := range update.Logs { + if log.Source != proto.LogSource_PROVISIONER { + continue + } + // Close on a log so we know when the job is in progress! + updated.Do(func() { + close(updateChan) + }) + break + } } return &proto.UpdateJobResponse{}, nil }, @@ -634,11 +640,17 @@ func TestProvisionerd(t *testing.T) { }, updateJob: func(ctx context.Context, update *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) { resp := &proto.UpdateJobResponse{} - if len(update.Logs) > 0 && update.Logs[0].Source == proto.LogSource_PROVISIONER { - // Close on a log so we know when the job is in progress! - updated.Do(func() { - close(updateChan) - }) + if len(update.Logs) > 0 { + for _, log := range update.Logs { + if log.Source != proto.LogSource_PROVISIONER { + continue + } + // Close on a log so we know when the job is in progress! + updated.Do(func() { + close(updateChan) + }) + break + } } // start returning Canceled once we've gotten at least one log. select { diff --git a/provisionerd/runner/runner.go b/provisionerd/runner/runner.go index 13aafdbb1d373..dc4fd63954cd3 100644 --- a/provisionerd/runner/runner.go +++ b/provisionerd/runner/runner.go @@ -33,6 +33,10 @@ const ( MissingParameterErrorText = "missing parameter" ) +var ( + errUpdateSkipped = xerrors.New("update skipped; job complete or failed") +) + type Runner struct { tracer trace.Tracer metrics Metrics @@ -44,6 +48,7 @@ type Runner struct { provisioner sdkproto.DRPCProvisionerClient updateInterval time.Duration forceCancelInterval time.Duration + logBufferInterval time.Duration // closed when the Runner is finished sending any updates/failed/complete. done chan struct{} @@ -57,9 +62,11 @@ type Runner struct { // mutex controls access to all the following variables. mutex *sync.Mutex // used to wait for the failedJob or completedJob to be populated - cond *sync.Cond - failedJob *proto.FailedJob - completedJob *proto.CompletedJob + cond *sync.Cond + flushLogsTimer *time.Timer + queuedLogs []*proto.Log + failedJob *proto.FailedJob + completedJob *proto.CompletedJob // setting this false signals that no more messages about this job should be sent. Usually this // means that a terminal message like FailedJob or CompletedJob has been sent, even in the case // of a Cancel(). However, when someone calls Fail() or ForceStop(), we might not send the @@ -79,7 +86,7 @@ type JobUpdater interface { CompleteJob(ctx context.Context, in *proto.CompletedJob) error } -func NewRunner( +func New( ctx context.Context, job *proto.AcquiredJob, updater JobUpdater, @@ -89,6 +96,7 @@ func NewRunner( provisioner sdkproto.DRPCProvisionerClient, updateInterval time.Duration, forceCancelInterval time.Duration, + logDebounceInterval time.Duration, tracer trace.Tracer, metrics Metrics, ) *Runner { @@ -109,6 +117,8 @@ func NewRunner( provisioner: provisioner, updateInterval: updateInterval, forceCancelInterval: forceCancelInterval, + logBufferInterval: logDebounceInterval, + queuedLogs: make([]*proto.Log, 0), mutex: m, cond: sync.NewCond(m), done: make(chan struct{}), @@ -262,7 +272,7 @@ func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto. r.mutex.Lock() defer r.mutex.Unlock() if !r.okToSend { - return nil, xerrors.New("update skipped; job complete or failed") + return nil, errUpdateSkipped } return r.sender.UpdateJob(ctx, u) } @@ -291,19 +301,12 @@ func (r *Runner) doCleanFinish(ctx context.Context) { ctx, span := r.startTrace(ctx, tracing.FuncName()) defer span.End() - _, err := r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: "Cleaning Up", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: "Cleaning Up", + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - r.logger.Warn(ctx, "failed to log cleanup") - return - } // Cleanup the work directory after execution. for attempt := 0; attempt < 5; attempt++ { @@ -320,6 +323,8 @@ func (r *Runner) doCleanFinish(ctx context.Context) { r.logger.Debug(ctx, "cleaned up work directory") break } + + r.flushQueuedLogs(ctx) }() completedJob, failedJob = r.do(ctx) @@ -335,14 +340,11 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) return nil, r.failedJobf("create work directory %q: %s", r.workDirectory, err) } - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: "Setting up", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: "Setting up", + CreatedAt: time.Now().UnixMilli(), }) if err != nil { return nil, r.failedJobf("write log: %s", err) @@ -402,7 +404,6 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) ) } } - switch jobType := r.job.Type.(type) { case *proto.AcquiredJob_TemplateImport_: r.logger.Debug(context.Background(), "acquired job is template import") @@ -489,19 +490,12 @@ func (r *Runner) runReadmeParse(ctx context.Context) *proto.FailedJob { fi, err := afero.ReadFile(r.filesystem, path.Join(r.workDirectory, ReadmeFile)) if err != nil { - _, err := r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_DEBUG, - Stage: "No README.md provided", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_DEBUG, + Stage: "No README.md provided", + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - return r.failedJobf("write log: %s", err) - } - return nil } @@ -526,18 +520,12 @@ func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *p defer span.End() // Parse parameters and update the job with the parameter specs - _, err := r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: "Parsing template parameters", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: "Parsing template parameters", + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - return nil, r.failedJobf("write log: %s", err) - } parameterSchemas, err := r.runTemplateImportParse(ctx) if err != nil { return nil, r.failedJobf("run parse: %s", err) @@ -562,18 +550,12 @@ func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *p } // Determine persistent resources - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: "Detecting persistent resources", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: "Detecting persistent resources", + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - return nil, r.failedJobf("write log: %s", err) - } startResources, err := r.runTemplateImportProvision(ctx, updateResponse.ParameterValues, &sdkproto.Provision_Metadata{ CoderUrl: r.job.GetTemplateImport().Metadata.CoderUrl, WorkspaceTransition: sdkproto.WorkspaceTransition_START, @@ -583,18 +565,12 @@ func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *p } // Determine ephemeral resources. - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: "Detecting ephemeral resources", - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: "Detecting ephemeral resources", + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - return nil, r.failedJobf("write log: %s", err) - } stopResources, err := r.runTemplateImportProvision(ctx, updateResponse.ParameterValues, &sdkproto.Provision_Metadata{ CoderUrl: r.job.GetTemplateImport().Metadata.CoderUrl, WorkspaceTransition: sdkproto.WorkspaceTransition_STOP, @@ -638,19 +614,13 @@ func (r *Runner) runTemplateImportParse(ctx context.Context) ([]*sdkproto.Parame slog.F("output", msgType.Log.Output), ) - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER, - Level: msgType.Log.Level, - CreatedAt: time.Now().UnixMilli(), - Output: msgType.Log.Output, - Stage: "Parse parameters", - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER, + Level: msgType.Log.Level, + CreatedAt: time.Now().UnixMilli(), + Output: msgType.Log.Output, + Stage: "Parse parameters", }) - if err != nil { - return nil, xerrors.Errorf("update job: %w", err) - } case *sdkproto.Parse_Response_Complete: r.logger.Info(context.Background(), "parse complete", slog.F("parameter_schemas", msgType.Complete.ParameterSchemas)) @@ -721,19 +691,13 @@ func (r *Runner) runTemplateImportProvision(ctx context.Context, values []*sdkpr slog.F("level", msgType.Log.Level), slog.F("output", msgType.Log.Output), ) - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER, - Level: msgType.Log.Level, - CreatedAt: time.Now().UnixMilli(), - Output: msgType.Log.Output, - Stage: stage, - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER, + Level: msgType.Log.Level, + CreatedAt: time.Now().UnixMilli(), + Output: msgType.Log.Output, + Stage: stage, }) - if err != nil { - return nil, xerrors.Errorf("send job update: %w", err) - } case *sdkproto.Provision_Response_Complete: if msgType.Complete.Error != "" { r.logger.Info(context.Background(), "dry-run provision failure", @@ -822,18 +786,12 @@ func (r *Runner) runWorkspaceBuild(ctx context.Context) (*proto.CompletedJob, *p stage = "Destroying workspace" } - _, err := r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER_DAEMON, - Level: sdkproto.LogLevel_INFO, - Stage: stage, - CreatedAt: time.Now().UnixMilli(), - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER_DAEMON, + Level: sdkproto.LogLevel_INFO, + Stage: stage, + CreatedAt: time.Now().UnixMilli(), }) - if err != nil { - return nil, r.failedJobf("write log: %s", err) - } // use the notStopped so that if we attempt to gracefully cancel, the stream will still be available for us // to send the cancel to the provisioner @@ -881,19 +839,13 @@ func (r *Runner) runWorkspaceBuild(ctx context.Context) (*proto.CompletedJob, *p slog.F("workspace_build_id", r.job.GetWorkspaceBuild().WorkspaceBuildId), ) - _, err = r.update(ctx, &proto.UpdateJobRequest{ - JobId: r.job.JobId, - Logs: []*proto.Log{{ - Source: proto.LogSource_PROVISIONER, - Level: msgType.Log.Level, - CreatedAt: time.Now().UnixMilli(), - Output: msgType.Log.Output, - Stage: stage, - }}, + r.queueLog(ctx, &proto.Log{ + Source: proto.LogSource_PROVISIONER, + Level: msgType.Log.Level, + CreatedAt: time.Now().UnixMilli(), + Output: msgType.Log.Output, + Stage: stage, }) - if err != nil { - return nil, r.failedJobf("send job update: %s", err) - } case *sdkproto.Provision_Response_Complete: if msgType.Complete.Error != "" { r.logger.Info(context.Background(), "provision failed; updating state", @@ -945,3 +897,41 @@ func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.Span semconv.ServiceNameKey.String("coderd.provisionerd"), ))...) } + +func (r *Runner) queueLog(ctx context.Context, log *proto.Log) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.queuedLogs = append(r.queuedLogs, log) + if r.flushLogsTimer != nil { + r.flushLogsTimer.Reset(r.logBufferInterval) + return + } + if len(r.queuedLogs) > 100 { + // Flushing logs requires a lock, so this can happen async. + go r.flushQueuedLogs(ctx) + return + } + r.flushLogsTimer = time.AfterFunc(r.logBufferInterval, func() { + r.flushQueuedLogs(ctx) + }) +} + +func (r *Runner) flushQueuedLogs(ctx context.Context) { + r.mutex.Lock() + if r.flushLogsTimer != nil { + r.flushLogsTimer.Stop() + } + logs := r.queuedLogs[:] + r.queuedLogs = make([]*proto.Log, 0) + r.mutex.Unlock() + _, err := r.update(ctx, &proto.UpdateJobRequest{ + JobId: r.job.JobId, + Logs: logs, + }) + if err != nil { + if errors.Is(err, errUpdateSkipped) { + return + } + r.logger.Error(ctx, "flush queued logs", slog.Error(err)) + } +} diff --git a/site/src/api/typesGenerated.ts b/site/src/api/typesGenerated.ts index 764aab5c6e761..26a8e79115a2b 100644 --- a/site/src/api/typesGenerated.ts +++ b/site/src/api/typesGenerated.ts @@ -538,7 +538,7 @@ export interface ProvisionerJob { // From codersdk/provisionerdaemons.go export interface ProvisionerJobLog { - readonly id: string + readonly id: number readonly created_at: string readonly log_source: LogSource readonly log_level: LogLevel diff --git a/site/src/components/WorkspaceBuildLogs/WorkspaceBuildLogs.test.ts b/site/src/components/WorkspaceBuildLogs/WorkspaceBuildLogs.test.ts index ec5d24463a3a6..e9fc6aed079c1 100644 --- a/site/src/components/WorkspaceBuildLogs/WorkspaceBuildLogs.test.ts +++ b/site/src/components/WorkspaceBuildLogs/WorkspaceBuildLogs.test.ts @@ -5,7 +5,7 @@ describe("groupLogsByStage", () => { it("should group them by stage", () => { const input: ProvisionerJobLog[] = [ { - id: "1", + id: 1, created_at: "oct 13", log_source: "provisioner", log_level: "debug", @@ -13,7 +13,7 @@ describe("groupLogsByStage", () => { output: "test", }, { - id: "2", + id: 2, created_at: "oct 13", log_source: "provisioner", log_level: "debug", @@ -21,7 +21,7 @@ describe("groupLogsByStage", () => { output: "test", }, { - id: "3", + id: 3, created_at: "oct 13", log_source: "provisioner", log_level: "debug", diff --git a/site/src/testHelpers/entities.ts b/site/src/testHelpers/entities.ts index 81cfe8f6a5041..8b93801dcd70b 100644 --- a/site/src/testHelpers/entities.ts +++ b/site/src/testHelpers/entities.ts @@ -543,7 +543,7 @@ export const MockGitSSHKey: TypesGen.GitSSHKey = { export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ { - id: "836f8ab6-5202-4711-afa5-293394ced011", + id: 1, created_at: "2022-05-19T16:45:31.005Z", log_source: "provisioner_daemon", log_level: "info", @@ -551,7 +551,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "2db0ae92-b310-4a6e-8b1f-23380b70ac7f", + id: 2, created_at: "2022-05-19T16:45:31.006Z", log_source: "provisioner_daemon", log_level: "info", @@ -559,7 +559,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "37a5b7b1-b3eb-47cf-b80b-bd16e2e08a3d", + id: 3, created_at: "2022-05-19T16:45:31.072Z", log_source: "provisioner", log_level: "debug", @@ -567,7 +567,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "5e4e37a1-c217-48bc-84f5-7f1c3efbd042", + id: 4, created_at: "2022-05-19T16:45:31.073Z", log_source: "provisioner", log_level: "debug", @@ -575,7 +575,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Initializing the backend...", }, { - id: "060ed132-5d12-4584-9005-5c9557febe2f", + id: 5, created_at: "2022-05-19T16:45:31.077Z", log_source: "provisioner", log_level: "debug", @@ -583,7 +583,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "b2e70a1c-1943-4616-8ac9-25326c9f7e7b", + id: 6, created_at: "2022-05-19T16:45:31.078Z", log_source: "provisioner", log_level: "debug", @@ -591,7 +591,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Initializing provider plugins...", }, { - id: "993107fe-6dfb-42ec-912a-b32f50e60d62", + id: 7, created_at: "2022-05-19T16:45:31.078Z", log_source: "provisioner", log_level: "debug", @@ -599,7 +599,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: '- Finding hashicorp/google versions matching "~\u003e 4.15"...', }, { - id: "2ad2e2a1-7a75-4827-8cb9-928acfc6fc07", + id: 8, created_at: "2022-05-19T16:45:31.123Z", log_source: "provisioner", log_level: "debug", @@ -607,7 +607,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: '- Finding coder/coder versions matching "0.3.4"...', }, { - id: "7c723a90-0190-4c2f-9d97-ede39ef3d55f", + id: 9, created_at: "2022-05-19T16:45:31.137Z", log_source: "provisioner", log_level: "debug", @@ -615,7 +615,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "- Using hashicorp/google v4.21.0 from the shared cache directory", }, { - id: "3910144b-411b-4a53-9900-88d406ed9bf4", + id: 10, created_at: "2022-05-19T16:45:31.344Z", log_source: "provisioner", log_level: "debug", @@ -623,7 +623,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "- Using coder/coder v0.3.4 from the shared cache directory", }, { - id: "e3a02ad4-edc0-442f-8b9a-39d01d56b43b", + id: 11, created_at: "2022-05-19T16:45:31.388Z", log_source: "provisioner", log_level: "debug", @@ -631,7 +631,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "440cceb3-aabf-4838-979b-1fd37fe2d8d8", + id: 12, created_at: "2022-05-19T16:45:31.388Z", log_source: "provisioner", log_level: "debug", @@ -640,7 +640,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "Terraform has created a lock file .terraform.lock.hcl to record the provider", }, { - id: "90e1f244-78ff-4d95-871e-b2bebcabc39a", + id: 13, created_at: "2022-05-19T16:45:31.389Z", log_source: "provisioner", log_level: "debug", @@ -649,7 +649,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "selections it made above. Include this file in your version control repository", }, { - id: "e4527d6c-2412-452b-a946-5870787caf6b", + id: 14, created_at: "2022-05-19T16:45:31.389Z", log_source: "provisioner", log_level: "debug", @@ -658,7 +658,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "so that Terraform can guarantee to make the same selections by default when", }, { - id: "02f96d19-d94b-4d0e-a1c4-313a0d2ff9e3", + id: 15, created_at: "2022-05-19T16:45:31.39Z", log_source: "provisioner", log_level: "debug", @@ -666,7 +666,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: 'you run "terraform init" in the future.', }, { - id: "667c03ca-1b24-4f36-a598-f0322cf3e2a1", + id: 16, created_at: "2022-05-19T16:45:31.39Z", log_source: "provisioner", log_level: "debug", @@ -674,7 +674,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "", }, { - id: "48039d6a-9b21-460f-9ca3-4b0e2becfd18", + id: 17, created_at: "2022-05-19T16:45:31.391Z", log_source: "provisioner", log_level: "debug", @@ -682,7 +682,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Terraform has been successfully initialized!", }, { - id: "6fe4b64f-3aa6-4850-96e9-6db8478a53be", + id: 18, created_at: "2022-05-19T16:45:31.42Z", log_source: "provisioner", log_level: "info", @@ -690,7 +690,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Terraform 1.1.9", }, { - id: "fa7b6321-7ecd-492d-a671-6366186fad08", + id: 19, created_at: "2022-05-19T16:45:33.537Z", log_source: "provisioner", log_level: "info", @@ -698,7 +698,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "coder_agent.dev: Plan to create", }, { - id: "e677e49f-c5ba-417c-8c9d-78bdad744ce1", + id: 20, created_at: "2022-05-19T16:45:33.537Z", log_source: "provisioner", log_level: "info", @@ -706,7 +706,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_disk.root: Plan to create", }, { - id: "4b0e6168-29e4-4419-bf81-b57e31087666", + id: 21, created_at: "2022-05-19T16:45:33.538Z", log_source: "provisioner", log_level: "info", @@ -714,7 +714,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_instance.dev[0]: Plan to create", }, { - id: "5902f89c-8acd-45e2-9bd6-de4d6fd8fc9c", + id: 22, created_at: "2022-05-19T16:45:33.539Z", log_source: "provisioner", log_level: "info", @@ -722,7 +722,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Plan: 3 to add, 0 to change, 0 to destroy.", }, { - id: "a8107907-7c53-4aae-bb48-9a5f9759c7d5", + id: 23, created_at: "2022-05-19T16:45:33.712Z", log_source: "provisioner", log_level: "info", @@ -730,7 +730,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "coder_agent.dev: Creating...", }, { - id: "aaf13503-2f1a-4f6c-aced-b8fc48304dc1", + id: 24, created_at: "2022-05-19T16:45:33.719Z", log_source: "provisioner", log_level: "info", @@ -739,7 +739,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "coder_agent.dev: Creation complete after 0s [id=d07f5bdc-4a8d-4919-9cdb-0ac6ba9e64d6]", }, { - id: "4ada8886-f5b3-4fee-a1a3-72064b50d5ae", + id: 25, created_at: "2022-05-19T16:45:34.139Z", log_source: "provisioner", log_level: "info", @@ -747,7 +747,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_disk.root: Creating...", }, { - id: "8ffc59e8-a4d0-4ffe-9bcc-cb84ca51cc22", + id: 26, created_at: "2022-05-19T16:45:44.14Z", log_source: "provisioner", log_level: "info", @@ -755,7 +755,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_disk.root: Still creating... [10s elapsed]", }, { - id: "063189fd-75ad-415a-ac77-8c34b9e202b2", + id: 27, created_at: "2022-05-19T16:45:47.106Z", log_source: "provisioner", log_level: "info", @@ -764,7 +764,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "google_compute_disk.root: Creation complete after 13s [id=projects/bruno-coder-v2/zones/europe-west4-b/disks/coder-developer-bruno-dev-123-root]", }, { - id: "6fd554a1-a7a2-439f-b8d8-369d6c1ead21", + id: 28, created_at: "2022-05-19T16:45:47.118Z", log_source: "provisioner", log_level: "info", @@ -772,7 +772,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_instance.dev[0]: Creating...", }, { - id: "87388f7e-ab01-44b1-b35e-8e06636164d3", + id: 29, created_at: "2022-05-19T16:45:57.122Z", log_source: "provisioner", log_level: "info", @@ -780,7 +780,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "google_compute_instance.dev[0]: Still creating... [10s elapsed]", }, { - id: "baa40120-3f18-40d2-a35c-b11f421a1ce1", + id: 30, created_at: "2022-05-19T16:46:00.837Z", log_source: "provisioner", log_level: "info", @@ -789,7 +789,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ "google_compute_instance.dev[0]: Creation complete after 14s [id=projects/bruno-coder-v2/zones/europe-west4-b/instances/coder-developer-bruno-dev-123]", }, { - id: "00e18953-fba6-4b43-97a3-ecf376553c08", + id: 31, created_at: "2022-05-19T16:46:00.846Z", log_source: "provisioner", log_level: "info", @@ -797,7 +797,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Apply complete! Resources: 3 added, 0 changed, 0 destroyed.", }, { - id: "431811da-b534-4d92-b6e5-44814548c812", + id: 32, created_at: "2022-05-19T16:46:00.847Z", log_source: "provisioner", log_level: "info", @@ -805,7 +805,7 @@ export const MockWorkspaceBuildLogs: TypesGen.ProvisionerJobLog[] = [ output: "Outputs: 0", }, { - id: "70459334-4878-4bda-a546-98eee166c4c6", + id: 33, created_at: "2022-05-19T16:46:02.283Z", log_source: "provisioner_daemon", log_level: "info", diff --git a/site/src/xServices/workspaceBuild/workspaceBuildXService.ts b/site/src/xServices/workspaceBuild/workspaceBuildXService.ts index fdd171e0bfde4..f77789c3c7567 100644 --- a/site/src/xServices/workspaceBuild/workspaceBuildXService.ts +++ b/site/src/xServices/workspaceBuild/workspaceBuildXService.ts @@ -125,11 +125,14 @@ export const workspaceBuildMachine = createMachine( API.getWorkspaceBuildLogs(ctx.buildId, ctx.timeCursor), streamWorkspaceBuildLogs: (ctx) => async (callback) => { return new Promise((resolve, reject) => { + if (!ctx.logs) { + return reject("logs must be set") + } const proto = location.protocol === "https:" ? "wss:" : "ws:" const socket = new WebSocket( `${proto}//${location.host}/api/v2/workspacebuilds/${ ctx.buildId - }/logs?follow=true&after=${ctx.timeCursor.getTime()}`, + }/logs?follow=true&after=${ctx.logs[ctx.logs.length - 1].id}`, ) socket.binaryType = "blob" socket.addEventListener("message", (event) => {