diff --git a/coderd/activitybump.go b/coderd/activitybump.go index fa90f7d275daf..0dc9db8adb0ad 100644 --- a/coderd/activitybump.go +++ b/coderd/activitybump.go @@ -54,7 +54,7 @@ func activityBumpWorkspace(log slog.Logger, db database.Store, workspace databas newDeadline := database.Now().Add(bumpAmount) - if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ + if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ ID: build.ID, UpdatedAt: database.Now(), ProvisionerState: build.ProvisionerState, diff --git a/coderd/database/databasefake/databasefake.go b/coderd/database/databasefake/databasefake.go index a9117f2b2efca..660b04b0d3a13 100644 --- a/coderd/database/databasefake/databasefake.go +++ b/coderd/database/databasefake/databasefake.go @@ -2823,7 +2823,7 @@ func (q *fakeQuerier) UpdateWorkspaceLastUsedAt(_ context.Context, arg database. return sql.ErrNoRows } -func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) error { +func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) (database.WorkspaceBuild, error) { q.mutex.Lock() defer q.mutex.Unlock() @@ -2835,9 +2835,9 @@ func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.U workspaceBuild.ProvisionerState = arg.ProvisionerState workspaceBuild.Deadline = arg.Deadline q.workspaceBuilds[index] = workspaceBuild - return nil + return workspaceBuild, nil } - return sql.ErrNoRows + return database.WorkspaceBuild{}, sql.ErrNoRows } func (q *fakeQuerier) UpdateWorkspaceDeletedByID(_ context.Context, arg database.UpdateWorkspaceDeletedByIDParams) error { diff --git a/coderd/database/querier.go b/coderd/database/querier.go index 25ab45c21df96..7a4fa43ffc5a5 100644 --- a/coderd/database/querier.go +++ b/coderd/database/querier.go @@ -187,7 +187,7 @@ type sqlcQuerier interface { UpdateWorkspaceAgentVersionByID(ctx context.Context, arg UpdateWorkspaceAgentVersionByIDParams) error UpdateWorkspaceAppHealthByID(ctx context.Context, arg UpdateWorkspaceAppHealthByIDParams) error UpdateWorkspaceAutostart(ctx context.Context, arg UpdateWorkspaceAutostartParams) error - UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) error + UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) (WorkspaceBuild, error) UpdateWorkspaceDeletedByID(ctx context.Context, arg UpdateWorkspaceDeletedByIDParams) error UpdateWorkspaceLastUsedAt(ctx context.Context, arg UpdateWorkspaceLastUsedAtParams) error UpdateWorkspaceTTL(ctx context.Context, arg UpdateWorkspaceTTLParams) error diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index 016110e045b05..bfb65b30dc9d5 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -5492,7 +5492,7 @@ func (q *sqlQuerier) InsertWorkspaceBuild(ctx context.Context, arg InsertWorkspa return i, err } -const updateWorkspaceBuildByID = `-- name: UpdateWorkspaceBuildByID :exec +const updateWorkspaceBuildByID = `-- name: UpdateWorkspaceBuildByID :one UPDATE workspace_builds SET @@ -5500,7 +5500,7 @@ SET provisioner_state = $3, deadline = $4 WHERE - id = $1 + id = $1 RETURNING id, created_at, updated_at, workspace_id, template_version_id, build_number, transition, initiator_id, provisioner_state, job_id, deadline, reason ` type UpdateWorkspaceBuildByIDParams struct { @@ -5510,14 +5510,29 @@ type UpdateWorkspaceBuildByIDParams struct { Deadline time.Time `db:"deadline" json:"deadline"` } -func (q *sqlQuerier) UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) error { - _, err := q.db.ExecContext(ctx, updateWorkspaceBuildByID, +func (q *sqlQuerier) UpdateWorkspaceBuildByID(ctx context.Context, arg UpdateWorkspaceBuildByIDParams) (WorkspaceBuild, error) { + row := q.db.QueryRowContext(ctx, updateWorkspaceBuildByID, arg.ID, arg.UpdatedAt, arg.ProvisionerState, arg.Deadline, ) - return err + var i WorkspaceBuild + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.WorkspaceID, + &i.TemplateVersionID, + &i.BuildNumber, + &i.Transition, + &i.InitiatorID, + &i.ProvisionerState, + &i.JobID, + &i.Deadline, + &i.Reason, + ) + return i, err } const getWorkspaceResourceByID = `-- name: GetWorkspaceResourceByID :one diff --git a/coderd/database/queries/workspacebuilds.sql b/coderd/database/queries/workspacebuilds.sql index cdb7d0e130f07..d560a91d996f5 100644 --- a/coderd/database/queries/workspacebuilds.sql +++ b/coderd/database/queries/workspacebuilds.sql @@ -124,7 +124,7 @@ INSERT INTO VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *; --- name: UpdateWorkspaceBuildByID :exec +-- name: UpdateWorkspaceBuildByID :one UPDATE workspace_builds SET @@ -132,4 +132,4 @@ SET provisioner_state = $3, deadline = $4 WHERE - id = $1; + id = $1 RETURNING *; diff --git a/coderd/httpapi/httpapi.go b/coderd/httpapi/httpapi.go index 5e4648828fd37..d2f1c07de0277 100644 --- a/coderd/httpapi/httpapi.go +++ b/coderd/httpapi/httpapi.go @@ -228,14 +228,20 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (sendEvent f buf := &bytes.Buffer{} enc := json.NewEncoder(buf) - _, err := buf.WriteString(fmt.Sprintf("event: %s\ndata: ", sse.Type)) + _, err := buf.WriteString(fmt.Sprintf("event: %s\n", sse.Type)) if err != nil { return err } - err = enc.Encode(sse.Data) - if err != nil { - return err + if sse.Data != nil { + _, err = buf.WriteString("data: ") + if err != nil { + return err + } + err = enc.Encode(sse.Data) + if err != nil { + return err + } } err = buf.WriteByte('\n') diff --git a/coderd/provisionerdaemons.go b/coderd/provisionerdaemons.go index 87a9b9a98320a..1399d4a7c1eb1 100644 --- a/coderd/provisionerdaemons.go +++ b/coderd/provisionerdaemons.go @@ -223,6 +223,10 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty if err != nil { return nil, failJob(fmt.Sprintf("get owner: %s", err)) } + err = server.Pubsub.Publish(watchWorkspaceChannel(workspace.ID), []byte{}) + if err != nil { + return nil, failJob(fmt.Sprintf("publish workspace update: %s", err)) + } // Compute parameters for the workspace to consume. parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{ @@ -543,7 +547,7 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa if err != nil { return nil, xerrors.Errorf("unmarshal workspace provision input: %w", err) } - err = server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ + build, err := server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ ID: input.WorkspaceBuildID, UpdatedAt: database.Now(), ProvisionerState: jobType.WorkspaceBuild.State, @@ -552,6 +556,10 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa if err != nil { return nil, xerrors.Errorf("update workspace build state: %w", err) } + err = server.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{}) + if err != nil { + return nil, xerrors.Errorf("update workspace: %w", err) + } case *proto.FailedJob_TemplateImport_: } @@ -657,7 +665,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr if err != nil { return xerrors.Errorf("update provisioner job: %w", err) } - err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ + _, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ ID: workspaceBuild.ID, Deadline: workspaceDeadline, ProvisionerState: jobType.WorkspaceBuild.State, @@ -692,6 +700,11 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr if err != nil { return nil, xerrors.Errorf("complete job: %w", err) } + + err = server.Pubsub.Publish(watchWorkspaceChannel(workspaceBuild.WorkspaceID), []byte{}) + if err != nil { + return nil, xerrors.Errorf("update workspace: %w", err) + } case *proto.CompletedJob_TemplateDryRun_: for _, resource := range jobType.TemplateDryRun.Resources { server.Logger.Info(ctx, "inserting template dry-run job resource", diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index f754b7a2c162e..b101789e15145 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -539,6 +539,7 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request Valid: true, } _ = updateConnectionTimes() + _ = api.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{}) }() err = updateConnectionTimes() @@ -546,6 +547,7 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request _ = conn.Close(websocket.StatusGoingAway, err.Error()) return } + api.publishWorkspaceUpdate(ctx, build.WorkspaceID) // End span so we don't get long lived trace data. tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx)) @@ -972,6 +974,32 @@ func (api *API) postWorkspaceAppHealth(rw http.ResponseWriter, r *http.Request) } } + resource, err := api.Database.GetWorkspaceResourceByID(r.Context(), workspaceAgent.ResourceID) + if err != nil { + httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error fetching workspace resource.", + Detail: err.Error(), + }) + return + } + job, err := api.Database.GetWorkspaceBuildByJobID(r.Context(), resource.JobID) + if err != nil { + httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error fetching workspace build.", + Detail: err.Error(), + }) + return + } + workspace, err := api.Database.GetWorkspaceByID(r.Context(), job.WorkspaceID) + if err != nil { + httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error fetching workspace.", + Detail: err.Error(), + }) + return + } + api.publishWorkspaceUpdate(r.Context(), workspace.ID) + httpapi.Write(r.Context(), rw, http.StatusOK, nil) } diff --git a/coderd/workspacebuilds.go b/coderd/workspacebuilds.go index 999b7317b4da7..2373556cb8417 100644 --- a/coderd/workspacebuilds.go +++ b/coderd/workspacebuilds.go @@ -574,6 +574,8 @@ func (api *API) postWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) { return } + api.publishWorkspaceUpdate(ctx, workspace.ID) + httpapi.Write(ctx, rw, http.StatusCreated, apiBuild) } @@ -632,6 +634,9 @@ func (api *API) patchCancelWorkspaceBuild(rw http.ResponseWriter, r *http.Reques }) return } + + api.publishWorkspaceUpdate(ctx, workspace.ID) + httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{ Message: "Job has been marked as canceled...", }) diff --git a/coderd/workspaces.go b/coderd/workspaces.go index 2b3d077d106ac..53e56be63fda4 100644 --- a/coderd/workspaces.go +++ b/coderd/workspaces.go @@ -634,6 +634,8 @@ func (api *API) patchWorkspace(rw http.ResponseWriter, r *http.Request) { return } + api.publishWorkspaceUpdate(ctx, workspace.ID) + aReq.New = newWorkspace rw.WriteHeader(http.StatusNoContent) } @@ -839,7 +841,7 @@ func (api *API) putExtendWorkspace(rw http.ResponseWriter, r *http.Request) { return err } - if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ + if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{ ID: build.ID, UpdatedAt: build.UpdatedAt, ProvisionerState: build.ProvisionerState, @@ -883,48 +885,65 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) { // Ignore all trace spans after this, they're not too useful. ctx = trace.ContextWithSpan(ctx, tracing.NoopSpan) - t := time.NewTicker(time.Second * 1) - defer t.Stop() + cancelSubscribe, err := api.Pubsub.Subscribe(watchWorkspaceChannel(workspace.ID), func(_ context.Context, _ []byte) { + workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID) + if err != nil { + _ = sendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypeError, + Data: codersdk.Response{ + Message: "Internal error fetching workspace.", + Detail: err.Error(), + }, + }) + return + } + + data, err := api.workspaceData(ctx, []database.Workspace{workspace}) + if err != nil { + _ = sendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypeError, + Data: codersdk.Response{ + Message: "Internal error fetching workspace data.", + Detail: err.Error(), + }, + }) + return + } + + _ = sendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypeData, + Data: convertWorkspace( + workspace, + data.builds[0], + data.templates[0], + findUser(workspace.OwnerID, data.users), + ), + }) + }) + if err != nil { + _ = sendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypeError, + Data: codersdk.Response{ + Message: "Internal error subscribing to workspace events.", + Detail: err.Error(), + }, + }) + return + } + defer cancelSubscribe() + + // An initial ping signals to the request that the server is now ready + // and the client can begin servicing a channel with data. + _ = sendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypePing, + }) + for { select { case <-ctx.Done(): return case <-senderClosed: return - case <-t.C: - workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID) - if err != nil { - _ = sendEvent(ctx, codersdk.ServerSentEvent{ - Type: codersdk.ServerSentEventTypeError, - Data: codersdk.Response{ - Message: "Internal error fetching workspace.", - Detail: err.Error(), - }, - }) - return - } - - data, err := api.workspaceData(ctx, []database.Workspace{workspace}) - if err != nil { - _ = sendEvent(ctx, codersdk.ServerSentEvent{ - Type: codersdk.ServerSentEventTypeError, - Data: codersdk.Response{ - Message: "Internal error fetching workspace data.", - Detail: err.Error(), - }, - }) - return - } - - _ = sendEvent(ctx, codersdk.ServerSentEvent{ - Type: codersdk.ServerSentEventTypeData, - Data: convertWorkspace( - workspace, - data.builds[0], - data.templates[0], - findUser(workspace.OwnerID, data.users), - ), - }) } } } @@ -1213,3 +1232,15 @@ func splitQueryParameterByDelimiter(query string, delimiter rune, maintainQuotes return parts } + +func watchWorkspaceChannel(id uuid.UUID) string { + return fmt.Sprintf("workspace:%s", id) +} + +func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUID) { + err := api.Pubsub.Publish(watchWorkspaceChannel(workspaceID), []byte{}) + if err != nil { + api.Logger.Warn(ctx, "failed to publish workspace update", + slog.F("workspace_id", workspaceID), slog.Error(err)) + } +} diff --git a/coderd/workspaces_test.go b/coderd/workspaces_test.go index 06d98ce87352d..d3de52fd2f623 100644 --- a/coderd/workspaces_test.go +++ b/coderd/workspaces_test.go @@ -12,6 +12,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + + "github.com/coder/coder/agent" "github.com/coder/coder/coderd/audit" "github.com/coder/coder/coderd/autobuild/schedule" "github.com/coder/coder/coderd/coderdtest" @@ -1347,27 +1351,86 @@ func TestWorkspaceExtend(t *testing.T) { func TestWorkspaceWatcher(t *testing.T) { t.Parallel() - client := coderdtest.New(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) + client, closeFunc := coderdtest.NewWithProvisionerCloser(t, &coderdtest.Options{IncludeProvisionerDaemon: true}) user := coderdtest.CreateFirstUser(t, client) - version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, nil) + authToken := uuid.NewString() + version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ + Parse: echo.ParseComplete, + ProvisionDryRun: echo.ProvisionComplete, + Provision: []*proto.Provision_Response{{ + Type: &proto.Provision_Response_Complete{ + Complete: &proto.Provision_Complete{ + Resources: []*proto.Resource{{ + Name: "example", + Type: "aws_instance", + Agents: []*proto.Agent{{ + Id: uuid.NewString(), + Auth: &proto.Agent_Token{ + Token: authToken, + }, + }}, + }}, + }, + }, + }}, + }) coderdtest.AwaitTemplateVersionJob(t, client, version.ID) template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) - + coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID) ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong) defer cancel() - w, err := client.Workspace(ctx, workspace.ID) + wc, err := client.WatchWorkspace(ctx, workspace.ID) require.NoError(t, err) + wait := func() { + select { + case <-ctx.Done(): + t.Fail() + case <-wc: + } + } - wc, err := client.WatchWorkspace(ctx, w.ID) + coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart) + // the workspace build being created + wait() + // the workspace build being acquired + wait() + // the workspace build completing + wait() + + agentClient := codersdk.New(client.URL) + agentClient.SessionToken = authToken + agentCloser := agent.New(agent.Options{ + Client: agentClient, + Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug), + }) + defer func() { + _ = agentCloser.Close() + }() + + // the agent connected + wait() + agentCloser.Close() + // the agent disconnected + wait() + + closeFunc.Close() + build := coderdtest.CreateWorkspaceBuild(t, client, workspace, database.WorkspaceTransitionStart) + // First is for the workspace build itself + wait() + err = client.CancelWorkspaceBuild(ctx, build.ID) require.NoError(t, err) - for i := 0; i < 3; i++ { - _, more := <-wc - require.True(t, more) - } + // Second is for the build cancel + wait() + + err = client.UpdateWorkspace(ctx, workspace.ID, codersdk.UpdateWorkspaceRequest{ + Name: "another", + }) + require.NoError(t, err) + wait() + cancel() - require.EqualValues(t, codersdk.Workspace{}, <-wc) } func mustLocation(t *testing.T, location string) *time.Location { diff --git a/codersdk/workspaces.go b/codersdk/workspaces.go index 69d287a595ca6..bf8279a016d87 100644 --- a/codersdk/workspaces.go +++ b/codersdk/workspaces.go @@ -161,18 +161,19 @@ func (c *Client) WatchWorkspace(ctx context.Context, id uuid.UUID) (<-chan Works if err != nil { return } - if sse.Type == ServerSentEventTypeData { - var ws Workspace - b, ok := sse.Data.([]byte) - if !ok { - return - } - err = json.Unmarshal(b, &ws) - if err != nil { - return - } - wc <- ws + if sse.Type != ServerSentEventTypeData { + continue } + var ws Workspace + b, ok := sse.Data.([]byte) + if !ok { + return + } + err = json.Unmarshal(b, &ws) + if err != nil { + return + } + wc <- ws } } }()