Skip to content

Commit 56b963a

Browse files
authored
feat: Make workspace watching realtime instead of polling (#4922)
* feat: Make workspace watching realtime instead of polling This was leading to performance issues on the frontend, where the page should only be rendered if changes occur. While this could be changed on the frontend, it was always the intention to make this socket ~realtime anyways. * Fix workspace tests waiting, erroring on workspace update, and add comments to workspace events
1 parent a5cc197 commit 56b963a

12 files changed

+238
-76
lines changed

coderd/activitybump.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func activityBumpWorkspace(log slog.Logger, db database.Store, workspace databas
5454

5555
newDeadline := database.Now().Add(bumpAmount)
5656

57-
if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
57+
if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
5858
ID: build.ID,
5959
UpdatedAt: database.Now(),
6060
ProvisionerState: build.ProvisionerState,

coderd/database/databasefake/databasefake.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2825,7 +2825,7 @@ func (q *fakeQuerier) UpdateWorkspaceLastUsedAt(_ context.Context, arg database.
28252825
return sql.ErrNoRows
28262826
}
28272827

2828-
func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) error {
2828+
func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.UpdateWorkspaceBuildByIDParams) (database.WorkspaceBuild, error) {
28292829
q.mutex.Lock()
28302830
defer q.mutex.Unlock()
28312831

@@ -2837,9 +2837,9 @@ func (q *fakeQuerier) UpdateWorkspaceBuildByID(_ context.Context, arg database.U
28372837
workspaceBuild.ProvisionerState = arg.ProvisionerState
28382838
workspaceBuild.Deadline = arg.Deadline
28392839
q.workspaceBuilds[index] = workspaceBuild
2840-
return nil
2840+
return workspaceBuild, nil
28412841
}
2842-
return sql.ErrNoRows
2842+
return database.WorkspaceBuild{}, sql.ErrNoRows
28432843
}
28442844

28452845
func (q *fakeQuerier) UpdateWorkspaceDeletedByID(_ context.Context, arg database.UpdateWorkspaceDeletedByIDParams) error {

coderd/database/querier.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries.sql.go

Lines changed: 20 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/database/queries/workspacebuilds.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,12 @@ INSERT INTO
124124
VALUES
125125
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *;
126126

127-
-- name: UpdateWorkspaceBuildByID :exec
127+
-- name: UpdateWorkspaceBuildByID :one
128128
UPDATE
129129
workspace_builds
130130
SET
131131
updated_at = $2,
132132
provisioner_state = $3,
133133
deadline = $4
134134
WHERE
135-
id = $1;
135+
id = $1 RETURNING *;

coderd/httpapi/httpapi.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,14 +228,20 @@ func ServerSentEventSender(rw http.ResponseWriter, r *http.Request) (sendEvent f
228228
buf := &bytes.Buffer{}
229229
enc := json.NewEncoder(buf)
230230

231-
_, err := buf.WriteString(fmt.Sprintf("event: %s\ndata: ", sse.Type))
231+
_, err := buf.WriteString(fmt.Sprintf("event: %s\n", sse.Type))
232232
if err != nil {
233233
return err
234234
}
235235

236-
err = enc.Encode(sse.Data)
237-
if err != nil {
238-
return err
236+
if sse.Data != nil {
237+
_, err = buf.WriteString("data: ")
238+
if err != nil {
239+
return err
240+
}
241+
err = enc.Encode(sse.Data)
242+
if err != nil {
243+
return err
244+
}
239245
}
240246

241247
err = buf.WriteByte('\n')

coderd/provisionerdaemons.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
223223
if err != nil {
224224
return nil, failJob(fmt.Sprintf("get owner: %s", err))
225225
}
226+
err = server.Pubsub.Publish(watchWorkspaceChannel(workspace.ID), []byte{})
227+
if err != nil {
228+
return nil, failJob(fmt.Sprintf("publish workspace update: %s", err))
229+
}
226230

227231
// Compute parameters for the workspace to consume.
228232
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
@@ -547,7 +551,7 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
547551
if err != nil {
548552
return nil, xerrors.Errorf("unmarshal workspace provision input: %w", err)
549553
}
550-
err = server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
554+
build, err := server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
551555
ID: input.WorkspaceBuildID,
552556
UpdatedAt: database.Now(),
553557
ProvisionerState: jobType.WorkspaceBuild.State,
@@ -556,6 +560,10 @@ func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.Fa
556560
if err != nil {
557561
return nil, xerrors.Errorf("update workspace build state: %w", err)
558562
}
563+
err = server.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
564+
if err != nil {
565+
return nil, xerrors.Errorf("update workspace: %w", err)
566+
}
559567
case *proto.FailedJob_TemplateImport_:
560568
}
561569

@@ -661,7 +669,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
661669
if err != nil {
662670
return xerrors.Errorf("update provisioner job: %w", err)
663671
}
664-
err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
672+
_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
665673
ID: workspaceBuild.ID,
666674
Deadline: workspaceDeadline,
667675
ProvisionerState: jobType.WorkspaceBuild.State,
@@ -696,6 +704,11 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
696704
if err != nil {
697705
return nil, xerrors.Errorf("complete job: %w", err)
698706
}
707+
708+
err = server.Pubsub.Publish(watchWorkspaceChannel(workspaceBuild.WorkspaceID), []byte{})
709+
if err != nil {
710+
return nil, xerrors.Errorf("update workspace: %w", err)
711+
}
699712
case *proto.CompletedJob_TemplateDryRun_:
700713
for _, resource := range jobType.TemplateDryRun.Resources {
701714
server.Logger.Info(ctx, "inserting template dry-run job resource",

coderd/workspaceagents.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,13 +539,15 @@ func (api *API) workspaceAgentCoordinate(rw http.ResponseWriter, r *http.Request
539539
Valid: true,
540540
}
541541
_ = updateConnectionTimes()
542+
_ = api.Pubsub.Publish(watchWorkspaceChannel(build.WorkspaceID), []byte{})
542543
}()
543544

544545
err = updateConnectionTimes()
545546
if err != nil {
546547
_ = conn.Close(websocket.StatusGoingAway, err.Error())
547548
return
548549
}
550+
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
549551

550552
// End span so we don't get long lived trace data.
551553
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))
@@ -972,6 +974,32 @@ func (api *API) postWorkspaceAppHealth(rw http.ResponseWriter, r *http.Request)
972974
}
973975
}
974976

977+
resource, err := api.Database.GetWorkspaceResourceByID(r.Context(), workspaceAgent.ResourceID)
978+
if err != nil {
979+
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
980+
Message: "Internal error fetching workspace resource.",
981+
Detail: err.Error(),
982+
})
983+
return
984+
}
985+
job, err := api.Database.GetWorkspaceBuildByJobID(r.Context(), resource.JobID)
986+
if err != nil {
987+
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
988+
Message: "Internal error fetching workspace build.",
989+
Detail: err.Error(),
990+
})
991+
return
992+
}
993+
workspace, err := api.Database.GetWorkspaceByID(r.Context(), job.WorkspaceID)
994+
if err != nil {
995+
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
996+
Message: "Internal error fetching workspace.",
997+
Detail: err.Error(),
998+
})
999+
return
1000+
}
1001+
api.publishWorkspaceUpdate(r.Context(), workspace.ID)
1002+
9751003
httpapi.Write(r.Context(), rw, http.StatusOK, nil)
9761004
}
9771005

coderd/workspacebuilds.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,8 @@ func (api *API) postWorkspaceBuilds(rw http.ResponseWriter, r *http.Request) {
574574
return
575575
}
576576

577+
api.publishWorkspaceUpdate(ctx, workspace.ID)
578+
577579
httpapi.Write(ctx, rw, http.StatusCreated, apiBuild)
578580
}
579581

@@ -632,6 +634,9 @@ func (api *API) patchCancelWorkspaceBuild(rw http.ResponseWriter, r *http.Reques
632634
})
633635
return
634636
}
637+
638+
api.publishWorkspaceUpdate(ctx, workspace.ID)
639+
635640
httpapi.Write(ctx, rw, http.StatusOK, codersdk.Response{
636641
Message: "Job has been marked as canceled...",
637642
})

coderd/workspaces.go

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,8 @@ func (api *API) patchWorkspace(rw http.ResponseWriter, r *http.Request) {
634634
return
635635
}
636636

637+
api.publishWorkspaceUpdate(ctx, workspace.ID)
638+
637639
aReq.New = newWorkspace
638640
rw.WriteHeader(http.StatusNoContent)
639641
}
@@ -839,7 +841,7 @@ func (api *API) putExtendWorkspace(rw http.ResponseWriter, r *http.Request) {
839841
return err
840842
}
841843

842-
if err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
844+
if _, err := s.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
843845
ID: build.ID,
844846
UpdatedAt: build.UpdatedAt,
845847
ProvisionerState: build.ProvisionerState,
@@ -883,48 +885,65 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
883885
// Ignore all trace spans after this, they're not too useful.
884886
ctx = trace.ContextWithSpan(ctx, tracing.NoopSpan)
885887

886-
t := time.NewTicker(time.Second * 1)
887-
defer t.Stop()
888+
cancelSubscribe, err := api.Pubsub.Subscribe(watchWorkspaceChannel(workspace.ID), func(_ context.Context, _ []byte) {
889+
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
890+
if err != nil {
891+
_ = sendEvent(ctx, codersdk.ServerSentEvent{
892+
Type: codersdk.ServerSentEventTypeError,
893+
Data: codersdk.Response{
894+
Message: "Internal error fetching workspace.",
895+
Detail: err.Error(),
896+
},
897+
})
898+
return
899+
}
900+
901+
data, err := api.workspaceData(ctx, []database.Workspace{workspace})
902+
if err != nil {
903+
_ = sendEvent(ctx, codersdk.ServerSentEvent{
904+
Type: codersdk.ServerSentEventTypeError,
905+
Data: codersdk.Response{
906+
Message: "Internal error fetching workspace data.",
907+
Detail: err.Error(),
908+
},
909+
})
910+
return
911+
}
912+
913+
_ = sendEvent(ctx, codersdk.ServerSentEvent{
914+
Type: codersdk.ServerSentEventTypeData,
915+
Data: convertWorkspace(
916+
workspace,
917+
data.builds[0],
918+
data.templates[0],
919+
findUser(workspace.OwnerID, data.users),
920+
),
921+
})
922+
})
923+
if err != nil {
924+
_ = sendEvent(ctx, codersdk.ServerSentEvent{
925+
Type: codersdk.ServerSentEventTypeError,
926+
Data: codersdk.Response{
927+
Message: "Internal error subscribing to workspace events.",
928+
Detail: err.Error(),
929+
},
930+
})
931+
return
932+
}
933+
defer cancelSubscribe()
934+
935+
// An initial ping signals to the request that the server is now ready
936+
// and the client can begin servicing a channel with data.
937+
_ = sendEvent(ctx, codersdk.ServerSentEvent{
938+
Type: codersdk.ServerSentEventTypePing,
939+
})
940+
888941
for {
889942
select {
890943
case <-ctx.Done():
891944
return
892945
case <-senderClosed:
893946
return
894-
case <-t.C:
895-
workspace, err := api.Database.GetWorkspaceByID(ctx, workspace.ID)
896-
if err != nil {
897-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
898-
Type: codersdk.ServerSentEventTypeError,
899-
Data: codersdk.Response{
900-
Message: "Internal error fetching workspace.",
901-
Detail: err.Error(),
902-
},
903-
})
904-
return
905-
}
906-
907-
data, err := api.workspaceData(ctx, []database.Workspace{workspace})
908-
if err != nil {
909-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
910-
Type: codersdk.ServerSentEventTypeError,
911-
Data: codersdk.Response{
912-
Message: "Internal error fetching workspace data.",
913-
Detail: err.Error(),
914-
},
915-
})
916-
return
917-
}
918-
919-
_ = sendEvent(ctx, codersdk.ServerSentEvent{
920-
Type: codersdk.ServerSentEventTypeData,
921-
Data: convertWorkspace(
922-
workspace,
923-
data.builds[0],
924-
data.templates[0],
925-
findUser(workspace.OwnerID, data.users),
926-
),
927-
})
928947
}
929948
}
930949
}
@@ -1213,3 +1232,15 @@ func splitQueryParameterByDelimiter(query string, delimiter rune, maintainQuotes
12131232

12141233
return parts
12151234
}
1235+
1236+
func watchWorkspaceChannel(id uuid.UUID) string {
1237+
return fmt.Sprintf("workspace:%s", id)
1238+
}
1239+
1240+
func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUID) {
1241+
err := api.Pubsub.Publish(watchWorkspaceChannel(workspaceID), []byte{})
1242+
if err != nil {
1243+
api.Logger.Warn(ctx, "failed to publish workspace update",
1244+
slog.F("workspace_id", workspaceID), slog.Error(err))
1245+
}
1246+
}

0 commit comments

Comments
 (0)