From f2cce85f66a41268b65aa22c6cf4ad7e544e29ea Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Sun, 6 Nov 2022 22:29:44 +0000 Subject: [PATCH 1/2] feat: Make workspace watching realtime instead of polling This was leading to performance issues on the frontend, where the page should only be rendered if changes occur. While this could be changed on the frontend, it was always the intention to make this socket ~realtime anyways. --- coderd/activitybump.go | 2 +- coderd/database/databasefake/databasefake.go | 6 +- coderd/database/querier.go | 2 +- coderd/database/queries.sql.go | 25 ++++- coderd/database/queries/workspacebuilds.sql | 4 +- coderd/httpapi/httpapi.go | 14 ++- coderd/provisionerdaemons.go | 17 ++- coderd/workspaceagents.go | 32 ++++++ coderd/workspacebuilds.go | 9 ++ coderd/workspaces.go | 106 ++++++++++++------- coderd/workspaces_test.go | 76 +++++++++++-- codersdk/workspaces.go | 23 ++-- 12 files changed, 240 insertions(+), 76 deletions(-) diff --git a/coderd/activitybump.go b/coderd/activitybump.go index fa90f7d275daf..0dc9db8adb0ad 100644 --- a/coderd/activitybump.go +++ b/coderd/activitybump.go @@ -54,7 +54,7 @@ func activityBumpWorkspace(log slog.Logger, db database.Store, workspace databas newDeadline := database.Now().Add(bumpAmount) - if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ + if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ ID: build.ID, UpdatedAt: database.Now(), ProvisionerState: build.ProvisionerState, diff --git a/coderd/database/databasefake/databasefake.go b/coderd/database/databasefake/databasefake.go index a9117f2b2efca..660b04b0d3a13 100644 --- a/coderd/database/databasefake/databasefake.go +++ b/coderd/database/databasefake/databasefake.go @@ -2823,7 +2823,7 @@ func (q *fakeQuerier) UpdateWorkspaceLastUsedAt(_ context.Context, arg database. return sql.ErrNoRows } -func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) error { +func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) (database.WorkspaceBuild, error) { q.mutex.Lock() defer q.mutex.Unlock() @@ -2835,9 +2835,9 @@ func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.U workspaceBuild.ProvisionerState = arg.ProvisionerState workspaceBuild.Deadline = arg.Deadline q.workspaceBuilds[index] = workspaceBuild - return nil + return workspaceBuild, nil } - return sql.ErrNoRows + return database.WorkspaceBuild{}, sql.ErrNoRows } func (q *fakeQuerier) UpdateWorkspaceDeletedByID(_ context.Context, arg database.UpdateWorkspaceDeletedByIDParams) error { diff --git a/coderd/database/querier.go b/coderd/database/querier.go index 25ab45c21df96..7a4fa43ffc5a5 100644 --- a/coderd/database/querier.go +++ b/coderd/database/querier.go @@ -187,7 +187,7 @@ type sqlcQuerier interface { UpdateWorkspaceAgentVersionByID(ctx context.Context, arg UpdateWorkspaceAgentVersionByIDParams) error UpdateWorkspaceAppHealthByID(ctx context.Context, arg UpdateWorkspaceAppHealthByIDParams) error UpdateWorkspaceAutostart(ctx context.Context, arg UpdateWorkspaceAutostartParams) error - UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) error + UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) (WorkspaceBuild, error) UpdateWorkspaceDeletedByID(ctx context.Context, arg UpdateWorkspaceDeletedByIDParams) error UpdateWorkspaceLastUsedAt(ctx context.Context, arg UpdateWorkspaceLastUsedAtParams) error UpdateWorkspaceTTL(ctx context.Context, arg UpdateWorkspaceTTLParams) error diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 016110e045b05..bfb65b30dc9d5 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -5492,7 +5492,7 @@ func (q *sqlQuerier) InsertWorkspaceBuild(ctx context.Context, arg InsertWorkspa return i, err } -const updateWorkspaceBuildByID = `-- name: UpdateWorkspaceBuildByID :exec +const updateWorkspaceBuildByID = `-- name: UpdateWorkspaceBuildByID :one UPDATE workspace_builds SET @@ -5500,7 +5500,7 @@ SET provisioner_state = $3, deadline = $4 WHERE - id = $1 + id = $1 RETURNING id, created_at, updated_at, workspace_id, template_version_id, build_number, transition, initiator_id, provisioner_state, job_id, deadline, reason ` type UpdateWorkspaceBuildByIDParams struct { @@ -5510,14 +5510,29 @@ type UpdateWorkspaceBuildByIDParams struct { Deadline time.Time `db:"deadline" json:"deadline"` } -func (q *sqlQuerier) UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) error { - _, err := q.db.ExecContext(ctx, updateWorkspaceBuildByID, +func (q *sqlQuerier) UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) (WorkspaceBuild, error) { + row := q.db.QueryRowContext(ctx, updateWorkspaceBuildByID, arg.ID, arg.UpdatedAt, arg.ProvisionerState, arg.Deadline, ) - return err + var i WorkspaceBuild + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.WorkspaceID, + &i.TemplateVersionID, + &i.BuildNumber, + &i.Transition, + &i.InitiatorID, + &i.ProvisionerState, + &i.JobID, + &i.Deadline, + &i.Reason, + ) + return i, err } const getWorkspaceResourceByID = `-- name: GetWorkspaceResourceByID :one diff --git a/coderd/database/queries/workspacebuilds.sql b/coderd/database/queries/workspacebuilds.sql index cdb7d0e130f07..d560a91d996f5 100644 --- a/coderd/database/queries/workspacebuilds.sql +++ b/coderd/database/queries/workspacebuilds.sql @@ -124,7 +124,7 @@ INSERT INTO VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *; --- name: UpdateWorkspaceBuildByID :exec +-- name: UpdateWorkspaceBuildByID :one UPDATE workspace_builds SET @@ -132,4 +132,4 @@ SET provisioner_state = $3, deadline = $4 WHERE - id = $1; + id = $1 RETURNING *; diff --git a/coderd/httpapi/httpapi.go b/coderd/httpapi/httpapi.go index 5e4648828fd37..d2f1c07de0277 100644 --- a/coderd/httpapi/httpapi.go +++ b/coderd/httpapi/httpapi.go @@ -228,14 +228,20 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (sendEvent f buf := &bytes.Buffer{} enc := json.NewEncoder(buf) - _, err := buf.WriteString(fmt.Sprintf("event: %s\ndata: ", sse.Type)) + _, err := buf.WriteString(fmt.Sprintf("event: %s\n", sse.Type)) if err != nil { return err } - err = enc.Encode(sse.Data) - if err != nil { - return err + if sse.Data != nil { + _, err = buf.WriteString("data: ") + if err != nil { + return err + } + err = enc.Encode(sse.Data) + if err != nil { + return err + } } err = buf.WriteByte('\n') diff --git a/coderd/provisionerdaemons.go b/coderd/provisionerdaemons.go index 87a9b9a98320a..1399d4a7c1eb1 100644 --- a/coderd/provisionerdaemons.go +++ b/coderd/provisionerdaemons.go @@ -223,6 +223,10 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty if err != nil { return nil, failJob(fmt.Sprintf("get owner: %s", err)) } + err = server.Pubsub.Publish(watchWorkspaceChannel(workspace.ID), []byte{}) + if err != nil { + return nil, failJob(fmt.Sprintf("publish workspace update: %s", err)) + } // Compute parameters for the workspace to consume. parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{ @@ -543,7 +547,7 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa if err != nil { return nil, xerrors.Errorf("unmarshal workspace provision input: %w", err) } - err = server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ + build, err := server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ ID: input.WorkspaceBuildID, UpdatedAt: database.Now(), ProvisionerState: jobType.WorkspaceBuild.State, @@ -552,6 +556,10 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa if err != nil { return nil, xerrors.Errorf("update workspace build state: %w", err) } + err = server.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{}) + if err != nil { + return nil, xerrors.Errorf("update workspace: %w", err) + } case *proto.FailedJob_TemplateImport_: } @@ -657,7 +665,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr if err != nil { return xerrors.Errorf("update provisioner job: %w", err) } - err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ + _, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ ID: workspaceBuild.ID, Deadline: workspaceDeadline, ProvisionerState: jobType.WorkspaceBuild.State, @@ -692,6 +700,11 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr if err != nil { return nil, xerrors.Errorf("complete job: %w", err) } + + err = server.Pubsub.Publish(watchWorkspaceChannel(workspaceBuild.WorkspaceID), []byte{}) + if err != nil { + return nil, xerrors.Errorf("update workspace: %w", err) + } case *proto.CompletedJob_TemplateDryRun_: for _, resource := range jobType.TemplateDryRun.Resources { server.Logger.Info(ctx, "inserting template dry-run job resource", diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index f754b7a2c162e..429184aea1c7d 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -539,6 +539,7 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request Valid: true, } _ = updateConnectionTimes() + _ = api.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{}) }() err = updateConnectionTimes() @@ -546,6 +547,9 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request _ = conn.Close(websocket.StatusGoingAway, err.Error()) return } + if !api.publishWorkspaceUpdate(ctx, rw, build.WorkspaceID) { + return + } // End span so we don't get long lived trace data. tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx)) @@ -972,6 +976,34 @@ func (api *API) postWorkspaceAppHealth(rw http.ResponseWriter, r *http.Request) } } + resource, err := api.Database.GetWorkspaceResourceByID(r.Context(), workspaceAgent.ResourceID) + if err != nil { + httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error fetching workspace resource.", + Detail: err.Error(), + }) + return + } + job, err := api.Database.GetWorkspaceBuildByJobID(r.Context(), resource.JobID) + if err != nil { + httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error fetching workspace build.", + Detail: err.Error(), + }) + return + } + workspace, err := api.Database.GetWorkspaceByID(r.Context(), job.WorkspaceID) + if err != nil { + httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error fetching workspace.", + Detail: err.Error(), + }) + return + } + if !api.publishWorkspaceUpdate(r.Context(), rw, workspace.ID) { + return + } + httpapi.Write(r.Context(), rw, http.StatusOK, nil) } diff --git a/coderd/workspacebuilds.go b/coderd/workspacebuilds.go index 999b7317b4da7..baaae39b9be44 100644 --- a/coderd/workspacebuilds.go +++ b/coderd/workspacebuilds.go @@ -574,6 +574,10 @@ func (api *API) postWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) { return } + if !api.publishWorkspaceUpdate(ctx, rw, workspace.ID) { + return + } + httpapi.Write(ctx, rw, http.StatusCreated, apiBuild) } @@ -632,6 +636,11 @@ func (api *API) patchCancelWorkspaceBuild(rw http.ResponseWriter, r *http.Reques }) return } + + if !api.publishWorkspaceUpdate(ctx, rw, workspace.ID) { + return + } + httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{ Message: "Job has been marked as canceled...", }) diff --git a/coderd/workspaces.go b/coderd/workspaces.go index 2b3d077d106ac..b68f7bcf04047 100644 --- a/coderd/workspaces.go +++ b/coderd/workspaces.go @@ -634,6 +634,10 @@ func (api *API) patchWorkspace(rw http.ResponseWriter, r *http.Request) { return } + if !api.publishWorkspaceUpdate(ctx, rw, workspace.ID) { + return + } + aReq.New = newWorkspace rw.WriteHeader(http.StatusNoContent) } @@ -839,7 +843,7 @@ func (api *API) putExtendWorkspace(rw http.ResponseWriter, r *http.Request) { return err } - if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ + if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ ID: build.ID, UpdatedAt: build.UpdatedAt, ProvisionerState: build.ProvisionerState, @@ -883,48 +887,60 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) { // Ignore all trace spans after this, they're not too useful. ctx = trace.ContextWithSpan(ctx, tracing.NoopSpan) - t := time.NewTicker(time.Second * 1) - defer t.Stop() + cancelSubscribe, err := api.Pubsub.Subscribe(watchWorkspaceChannel(workspace.ID), func(_ context.Context, _ []byte) { + workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID) + if err != nil { + _ = sendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypeError, + Data: codersdk.Response{ + Message: "Internal error fetching workspace.", + Detail: err.Error(), + }, + }) + return + } + + data, err := api.workspaceData(ctx, []database.Workspace{workspace}) + if err != nil { + _ = sendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypeError, + Data: codersdk.Response{ + Message: "Internal error fetching workspace data.", + Detail: err.Error(), + }, + }) + return + } + + _ = sendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypeData, + Data: convertWorkspace( + workspace, + data.builds[0], + data.templates[0], + findUser(workspace.OwnerID, data.users), + ), + }) + }) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error subscribing to workspace events.", + Detail: err.Error(), + }) + return + } + defer cancelSubscribe() + + _ = sendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypePing, + }) + for { select { case <-ctx.Done(): return case <-senderClosed: return - case <-t.C: - workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID) - if err != nil { - _ = sendEvent(ctx, codersdk.ServerSentEvent{ - Type: codersdk.ServerSentEventTypeError, - Data: codersdk.Response{ - Message: "Internal error fetching workspace.", - Detail: err.Error(), - }, - }) - return - } - - data, err := api.workspaceData(ctx, []database.Workspace{workspace}) - if err != nil { - _ = sendEvent(ctx, codersdk.ServerSentEvent{ - Type: codersdk.ServerSentEventTypeError, - Data: codersdk.Response{ - Message: "Internal error fetching workspace data.", - Detail: err.Error(), - }, - }) - return - } - - _ = sendEvent(ctx, codersdk.ServerSentEvent{ - Type: codersdk.ServerSentEventTypeData, - Data: convertWorkspace( - workspace, - data.builds[0], - data.templates[0], - findUser(workspace.OwnerID, data.users), - ), - }) } } } @@ -1213,3 +1229,19 @@ func splitQueryParameterByDelimiter(query string, delimiter rune, maintainQuotes return parts } + +func watchWorkspaceChannel(id uuid.UUID) string { + return fmt.Sprintf("workspace:%s", id) +} + +func (api *API) publishWorkspaceUpdate(ctx context.Context, rw http.ResponseWriter, workspaceID uuid.UUID) bool { + err := api.Pubsub.Publish(watchWorkspaceChannel(workspaceID), []byte{}) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error publishing workspace update.", + Detail: err.Error(), + }) + return false + } + return true +} diff --git a/coderd/workspaces_test.go b/coderd/workspaces_test.go index 06d98ce87352d..6dedc38931e60 100644 --- a/coderd/workspaces_test.go +++ b/coderd/workspaces_test.go @@ -12,6 +12,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + + "github.com/coder/coder/agent" "github.com/coder/coder/coderd/audit" "github.com/coder/coder/coderd/autobuild/schedule" "github.com/coder/coder/coderd/coderdtest" @@ -1347,27 +1351,79 @@ func TestWorkspaceExtend(t *testing.T) { func TestWorkspaceWatcher(t *testing.T) { t.Parallel() - client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) + client, closeFunc := coderdtest.NewWithProvisionerCloser(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) user := coderdtest.CreateFirstUser(t, client) - version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, nil) + authToken := uuid.NewString() + version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ + Parse: echo.ParseComplete, + ProvisionDryRun: echo.ProvisionComplete, + Provision: []*proto.Provision_Response{{ + Type: &proto.Provision_Response_Complete{ + Complete: &proto.Provision_Complete{ + Resources: []*proto.Resource{{ + Name: "example", + Type: "aws_instance", + Agents: []*proto.Agent{{ + Id: uuid.NewString(), + Auth: &proto.Agent_Token{ + Token: authToken, + }, + }}, + }}, + }, + }, + }}, + }) coderdtest.AwaitTemplateVersionJob(t, client, version.ID) template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) - + coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID) ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - w, err := client.Workspace(ctx, workspace.ID) + wc, err := client.WatchWorkspace(ctx, workspace.ID) require.NoError(t, err) - wc, err := client.WatchWorkspace(ctx, w.ID) + coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart) + // the workspace build being created + <-wc + // the workspace build being acquired + <-wc + // the workspace build completing + <-wc + + agentClient := codersdk.New(client.URL) + agentClient.SessionToken = authToken + agentCloser := agent.New(agent.Options{ + Client: agentClient, + Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug), + }) + defer func() { + _ = agentCloser.Close() + }() + + // the agent connected + <-wc + agentCloser.Close() + // the agent disconnected + <-wc + + closeFunc.Close() + build := coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart) + // First is for the workspace build itself + <-wc + err = client.CancelWorkspaceBuild(ctx, build.ID) require.NoError(t, err) - for i := 0; i < 3; i++ { - _, more := <-wc - require.True(t, more) - } + // Second is for the build cancel + <-wc + + err = client.UpdateWorkspace(ctx, workspace.ID, codersdk.UpdateWorkspaceRequest{ + Name: "another", + }) + require.NoError(t, err) + <-wc + cancel() - require.EqualValues(t, codersdk.Workspace{}, <-wc) } func mustLocation(t *testing.T, location string) *time.Location { diff --git a/codersdk/workspaces.go b/codersdk/workspaces.go index 69d287a595ca6..bf8279a016d87 100644 --- a/codersdk/workspaces.go +++ b/codersdk/workspaces.go @@ -161,18 +161,19 @@ func (c *Client) WatchWorkspace(ctx context.Context, id uuid.UUID) (<-chan Works if err != nil { return } - if sse.Type == ServerSentEventTypeData { - var ws Workspace - b, ok := sse.Data.([]byte) - if !ok { - return - } - err = json.Unmarshal(b, &ws) - if err != nil { - return - } - wc <- ws + if sse.Type != ServerSentEventTypeData { + continue } + var ws Workspace + b, ok := sse.Data.([]byte) + if !ok { + return + } + err = json.Unmarshal(b, &ws) + if err != nil { + return + } + wc <- ws } } }() From febe12dcd9ae105ae3297d827ca943be297c0108 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Mon, 7 Nov 2022 15:00:56 +0000 Subject: [PATCH 2/2] Fix workspace tests waiting, erroring on workspace update, and add comments to workspace events --- coderd/workspaceagents.go | 8 ++------ coderd/workspacebuilds.go | 8 ++------ coderd/workspaces.go | 25 ++++++++++++------------- coderd/workspaces_test.go | 23 +++++++++++++++-------- 4 files changed, 31 insertions(+), 33 deletions(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 429184aea1c7d..b101789e15145 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -547,9 +547,7 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request _ = conn.Close(websocket.StatusGoingAway, err.Error()) return } - if !api.publishWorkspaceUpdate(ctx, rw, build.WorkspaceID) { - return - } + api.publishWorkspaceUpdate(ctx, build.WorkspaceID) // End span so we don't get long lived trace data. tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx)) @@ -1000,9 +998,7 @@ func (api *API) postWorkspaceAppHealth(rw http.ResponseWriter, r *http.Request) }) return } - if !api.publishWorkspaceUpdate(r.Context(), rw, workspace.ID) { - return - } + api.publishWorkspaceUpdate(r.Context(), workspace.ID) httpapi.Write(r.Context(), rw, http.StatusOK, nil) } diff --git a/coderd/workspacebuilds.go b/coderd/workspacebuilds.go index baaae39b9be44..2373556cb8417 100644 --- a/coderd/workspacebuilds.go +++ b/coderd/workspacebuilds.go @@ -574,9 +574,7 @@ func (api *API) postWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) { return } - if !api.publishWorkspaceUpdate(ctx, rw, workspace.ID) { - return - } + api.publishWorkspaceUpdate(ctx, workspace.ID) httpapi.Write(ctx, rw, http.StatusCreated, apiBuild) } @@ -637,9 +635,7 @@ func (api *API) patchCancelWorkspaceBuild(rw http.ResponseWriter, r *http.Reques return } - if !api.publishWorkspaceUpdate(ctx, rw, workspace.ID) { - return - } + api.publishWorkspaceUpdate(ctx, workspace.ID) httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{ Message: "Job has been marked as canceled...", diff --git a/coderd/workspaces.go b/coderd/workspaces.go index b68f7bcf04047..53e56be63fda4 100644 --- a/coderd/workspaces.go +++ b/coderd/workspaces.go @@ -634,9 +634,7 @@ func (api *API) patchWorkspace(rw http.ResponseWriter, r *http.Request) { return } - if !api.publishWorkspaceUpdate(ctx, rw, workspace.ID) { - return - } + api.publishWorkspaceUpdate(ctx, workspace.ID) aReq.New = newWorkspace rw.WriteHeader(http.StatusNoContent) @@ -923,14 +921,19 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) { }) }) if err != nil { - httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ - Message: "Internal error subscribing to workspace events.", - Detail: err.Error(), + _ = sendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypeError, + Data: codersdk.Response{ + Message: "Internal error subscribing to workspace events.", + Detail: err.Error(), + }, }) return } defer cancelSubscribe() + // An initial ping signals to the request that the server is now ready + // and the client can begin servicing a channel with data. _ = sendEvent(ctx, codersdk.ServerSentEvent{ Type: codersdk.ServerSentEventTypePing, }) @@ -1234,14 +1237,10 @@ func watchWorkspaceChannel(id uuid.UUID) string { return fmt.Sprintf("workspace:%s", id) } -func (api *API) publishWorkspaceUpdate(ctx context.Context, rw http.ResponseWriter, workspaceID uuid.UUID) bool { +func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUID) { err := api.Pubsub.Publish(watchWorkspaceChannel(workspaceID), []byte{}) if err != nil { - httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ - Message: "Internal error publishing workspace update.", - Detail: err.Error(), - }) - return false + api.Logger.Warn(ctx, "failed to publish workspace update", + slog.F("workspace_id", workspaceID), slog.Error(err)) } - return true } diff --git a/coderd/workspaces_test.go b/coderd/workspaces_test.go index 6dedc38931e60..d3de52fd2f623 100644 --- a/coderd/workspaces_test.go +++ b/coderd/workspaces_test.go @@ -1383,14 +1383,21 @@ func TestWorkspaceWatcher(t *testing.T) { wc, err := client.WatchWorkspace(ctx, workspace.ID) require.NoError(t, err) + wait := func() { + select { + case <-ctx.Done(): + t.Fail() + case <-wc: + } + } coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart) // the workspace build being created - <-wc + wait() // the workspace build being acquired - <-wc + wait() // the workspace build completing - <-wc + wait() agentClient := codersdk.New(client.URL) agentClient.SessionToken = authToken @@ -1403,25 +1410,25 @@ func TestWorkspaceWatcher(t *testing.T) { }() // the agent connected - <-wc + wait() agentCloser.Close() // the agent disconnected - <-wc + wait() closeFunc.Close() build := coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart) // First is for the workspace build itself - <-wc + wait() err = client.CancelWorkspaceBuild(ctx, build.ID) require.NoError(t, err) // Second is for the build cancel - <-wc + wait() err = client.UpdateWorkspace(ctx, workspace.ID, codersdk.UpdateWorkspaceRequest{ Name: "another", }) require.NoError(t, err) - <-wc + wait() cancel() }