Skip to content

Commit de1435d

Browse files
committed
redesign
1 parent 13d9896 commit de1435d

File tree

5 files changed

+141
-184
lines changed

5 files changed

+141
-184
lines changed

coderd/workspaces.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2089,11 +2089,6 @@ func (api *API) publishWorkspaceUpdate(ctx context.Context, ownerID uuid.UUID, e
20892089
api.Logger.Warn(ctx, "failed to publish workspace update",
20902090
slog.F("workspace_id", event.WorkspaceID), slog.Error(err))
20912091
}
2092-
err = api.Pubsub.Publish(codersdk.AllWorkspacesNotifyChannel, []byte(workspaceID.String()))
2093-
if err != nil {
2094-
api.Logger.Warn(ctx, "failed to publish all workspaces update",
2095-
slog.F("workspace_id", workspaceID), slog.Error(err))
2096-
}
20972092
}
20982093

20992094
func (api *API) publishWorkspaceAgentLogsUpdate(ctx context.Context, workspaceAgentID uuid.UUID, m agentsdk.LogsNotifyMessage) {

coderd/workspaceupdates.go

Lines changed: 128 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,18 @@ package coderd
22

33
import (
44
"context"
5-
"database/sql"
65
"sync"
76

87
"github.com/google/uuid"
98
"golang.org/x/xerrors"
109

1110
"github.com/coder/coder/v2/coderd/database"
1211
"github.com/coder/coder/v2/coderd/database/pubsub"
13-
"github.com/coder/coder/v2/coderd/util/slice"
1412
"github.com/coder/coder/v2/codersdk"
1513
"github.com/coder/coder/v2/tailnet"
1614
"github.com/coder/coder/v2/tailnet/proto"
1715
)
1816

19-
type workspacesByOwner map[uuid.UUID]workspacesByID
20-
2117
type workspacesByID map[uuid.UUID]ownedWorkspace
2218

2319
type ownedWorkspace struct {
@@ -27,140 +23,147 @@ type ownedWorkspace struct {
2723
Agents []database.AgentIDNamePair
2824
}
2925

30-
// Equal does not compare agents
31-
func (w ownedWorkspace) Equal(other ownedWorkspace) bool {
32-
return w.WorkspaceName == other.WorkspaceName &&
33-
w.JobStatus == other.JobStatus &&
34-
w.Transition == other.Transition
35-
}
36-
37-
func convertRows(v []database.GetWorkspacesAndAgentsRow) workspacesByOwner {
38-
m := make(map[uuid.UUID]workspacesByID)
39-
for _, ws := range v {
40-
owned := ownedWorkspace{
41-
WorkspaceName: ws.Name,
42-
JobStatus: ws.JobStatus,
43-
Transition: ws.Transition,
44-
Agents: ws.Agents,
45-
}
46-
if byID, exists := m[ws.OwnerID]; !exists {
47-
m[ws.OwnerID] = map[uuid.UUID]ownedWorkspace{ws.ID: owned}
48-
} else {
49-
byID[ws.ID] = owned
50-
m[ws.OwnerID] = byID
51-
}
52-
}
53-
return workspacesByOwner(m)
54-
}
55-
5626
func convertStatus(status database.ProvisionerJobStatus, trans database.WorkspaceTransition) proto.Workspace_Status {
5727
wsStatus := codersdk.ConvertWorkspaceStatus(codersdk.ProvisionerJobStatus(status), codersdk.WorkspaceTransition(trans))
5828
return tailnet.WorkspaceStatusToProto(wsStatus)
5929
}
6030

6131
type sub struct {
62-
mu sync.Mutex
32+
mu sync.RWMutex
6333
userID uuid.UUID
6434
tx chan<- *proto.WorkspaceUpdate
65-
prev workspacesByID
35+
state workspacesByID
36+
37+
db UpdateQuerier
38+
ps pubsub.Pubsub
39+
40+
cancelFn func()
41+
}
42+
43+
func (s *sub) ownsAgent(agentID uuid.UUID) bool {
44+
s.mu.RLock()
45+
defer s.mu.RUnlock()
46+
47+
for _, ws := range s.state {
48+
for _, agent := range ws.Agents {
49+
if agent.ID == agentID {
50+
return true
51+
}
52+
}
53+
}
54+
return false
55+
}
56+
57+
func (s *sub) handleEvent(_ context.Context, event codersdk.WorkspaceEvent) {
58+
s.mu.Lock()
59+
defer s.mu.Unlock()
60+
61+
out := &proto.WorkspaceUpdate{
62+
UpsertedWorkspaces: []*proto.Workspace{},
63+
UpsertedAgents: []*proto.Agent{},
64+
DeletedWorkspaces: []*proto.Workspace{},
65+
DeletedAgents: []*proto.Agent{},
66+
}
67+
68+
switch event.Kind {
69+
case codersdk.WorkspaceEventKindNewAgent:
70+
out.UpsertedAgents = append(out.UpsertedAgents, &proto.Agent{
71+
WorkspaceId: tailnet.UUIDToByteSlice(event.WorkspaceID),
72+
Id: tailnet.UUIDToByteSlice(*event.AgentID),
73+
Name: *event.AgentName,
74+
})
75+
ws, ok := s.state[event.WorkspaceID]
76+
if !ok {
77+
break
78+
}
79+
ws.Agents = append(ws.Agents, database.AgentIDNamePair{
80+
ID: *event.AgentID,
81+
Name: *event.AgentName,
82+
})
83+
s.state[event.WorkspaceID] = ws
84+
case codersdk.WorkspaceEventKindStateChange:
85+
// TODO: One event for both upsertions and deletions
86+
default:
87+
return
88+
}
89+
s.tx <- out
90+
}
91+
92+
// start subscribes to updates for all workspaces owned by the user
93+
func (s *sub) start() (err error) {
94+
s.mu.Lock()
95+
defer s.mu.Unlock()
96+
97+
rows, err := s.db.GetWorkspacesAndAgentsByOwnerID(context.Background(), s.userID)
98+
if err != nil {
99+
return xerrors.Errorf("get workspaces and agents by owner ID: %w", err)
100+
}
101+
102+
initUpdate := produceInitialUpdate(rows)
103+
s.tx <- initUpdate
104+
initState := convertRows(rows)
105+
s.state = initState
106+
107+
cancel, err := s.ps.Subscribe(codersdk.WorkspaceEventChannel(s.userID), codersdk.HandleWorkspaceEvent(s.handleEvent))
108+
if err != nil {
109+
return xerrors.Errorf("subscribe to workspace event channel: %w", err)
110+
}
111+
112+
s.cancelFn = cancel
113+
return nil
66114
}
67115

68-
func (s *sub) send(all workspacesByOwner) {
116+
func (s *sub) stop() {
69117
s.mu.Lock()
70118
defer s.mu.Unlock()
71119

72-
// Filter to only the workspaces owned by the user
73-
latest := all[s.userID]
74-
update := produceUpdate(s.prev, latest)
75-
s.prev = latest
76-
s.tx <- update
120+
if s.cancelFn != nil {
121+
s.cancelFn()
122+
}
123+
124+
close(s.tx)
77125
}
78126

79127
type UpdateQuerier interface {
80-
GetWorkspacesAndAgents(ctx context.Context) ([]database.GetWorkspacesAndAgentsRow, error)
128+
GetWorkspacesAndAgentsByOwnerID(ctx context.Context, ownerID uuid.UUID) ([]database.GetWorkspacesAndAgentsByOwnerIDRow, error)
81129
}
82130

83131
type updatesProvider struct {
84132
mu sync.RWMutex
85-
db UpdateQuerier
86-
ps pubsub.Pubsub
87133
// Peer ID -> subscription
88134
subs map[uuid.UUID]*sub
89-
// Owner ID -> workspace ID -> workspace
90-
latest workspacesByOwner
91-
cancelFn func()
135+
136+
db UpdateQuerier
137+
ps pubsub.Pubsub
92138
}
93139

94-
func (u *updatesProvider) IsOwner(userID uuid.UUID, agentID uuid.UUID) error {
140+
func (u *updatesProvider) IsOwner(userID uuid.UUID, agentID uuid.UUID) bool {
95141
u.mu.RLock()
96142
defer u.mu.RUnlock()
97143

98-
workspaces, exists := u.latest[userID]
99-
if !exists {
100-
return xerrors.Errorf("workspace agent not found or you do not have permission: %w", sql.ErrNoRows)
101-
}
102-
for _, workspace := range workspaces {
103-
for _, agent := range workspace.Agents {
104-
if agent.ID == agentID {
105-
return nil
106-
}
144+
for _, sub := range u.subs {
145+
if sub.userID == userID && sub.ownsAgent(agentID) {
146+
return true
107147
}
108148
}
109-
return xerrors.Errorf("workspace agent not found or you do not have permission: %w", sql.ErrNoRows)
149+
return false
110150
}
111151

112152
var _ tailnet.WorkspaceUpdatesProvider = (*updatesProvider)(nil)
113153

114-
func NewUpdatesProvider(ctx context.Context, db UpdateQuerier, ps pubsub.Pubsub) (tailnet.WorkspaceUpdatesProvider, error) {
115-
rows, err := db.GetWorkspacesAndAgents(ctx)
116-
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
117-
return nil, err
118-
}
154+
func NewUpdatesProvider(_ context.Context, db UpdateQuerier, ps pubsub.Pubsub) (tailnet.WorkspaceUpdatesProvider, error) {
119155
out := &updatesProvider{
120-
db: db,
121-
ps: ps,
122-
subs: map[uuid.UUID]*sub{},
123-
latest: convertRows(rows),
124-
}
125-
cancel, err := ps.Subscribe(codersdk.AllWorkspacesNotifyChannel, out.handleUpdate)
126-
if err != nil {
127-
return nil, err
156+
db: db,
157+
ps: ps,
158+
subs: map[uuid.UUID]*sub{},
128159
}
129-
out.cancelFn = cancel
130160
return out, nil
131161
}
132162

133163
func (u *updatesProvider) Stop() {
134164
for _, sub := range u.subs {
135-
close(sub.tx)
165+
sub.stop()
136166
}
137-
u.cancelFn()
138-
}
139-
140-
func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
141-
rows, err := u.db.GetWorkspacesAndAgents(ctx)
142-
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
143-
// TODO: Log
144-
return
145-
}
146-
147-
wg := &sync.WaitGroup{}
148-
latest := convertRows(rows)
149-
u.mu.RLock()
150-
for _, sub := range u.subs {
151-
sub := sub
152-
wg.Add(1)
153-
go func() {
154-
sub.send(latest)
155-
defer wg.Done()
156-
}()
157-
}
158-
u.mu.RUnlock()
159-
160-
u.mu.Lock()
161-
u.latest = latest
162-
u.mu.Unlock()
163-
wg.Wait()
164167
}
165168

166169
func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan *proto.WorkspaceUpdate, error) {
@@ -171,11 +174,17 @@ func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan
171174
sub := &sub{
172175
userID: userID,
173176
tx: tx,
174-
prev: make(workspacesByID),
177+
db: u.db,
178+
ps: u.ps,
179+
state: workspacesByID{},
180+
}
181+
err := sub.start()
182+
if err != nil {
183+
sub.stop()
184+
return nil, err
175185
}
186+
176187
u.subs[peerID] = sub
177-
// Write initial state
178-
sub.send(u.latest)
179188
return tx, nil
180189
}
181190

@@ -191,75 +200,40 @@ func (u *updatesProvider) Unsubscribe(peerID uuid.UUID) {
191200
delete(u.subs, peerID)
192201
}
193202

194-
func produceUpdate(old, new workspacesByID) *proto.WorkspaceUpdate {
203+
func produceInitialUpdate(rows []database.GetWorkspacesAndAgentsByOwnerIDRow) *proto.WorkspaceUpdate {
195204
out := &proto.WorkspaceUpdate{
196205
UpsertedWorkspaces: []*proto.Workspace{},
197206
UpsertedAgents: []*proto.Agent{},
198207
DeletedWorkspaces: []*proto.Workspace{},
199208
DeletedAgents: []*proto.Agent{},
200209
}
201210

202-
for wsID, newWorkspace := range new {
203-
oldWorkspace, exists := old[wsID]
204-
// Upsert both workspace and agents if the workspace is new
205-
if !exists {
206-
out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &proto.Workspace{
207-
Id: tailnet.UUIDToByteSlice(wsID),
208-
Name: newWorkspace.WorkspaceName,
209-
Status: convertStatus(newWorkspace.JobStatus, newWorkspace.Transition),
210-
})
211-
for _, agent := range newWorkspace.Agents {
212-
out.UpsertedAgents = append(out.UpsertedAgents, &proto.Agent{
213-
Id: tailnet.UUIDToByteSlice(agent.ID),
214-
Name: agent.Name,
215-
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
216-
})
217-
}
218-
continue
219-
}
220-
// Upsert workspace if the workspace is updated
221-
if !newWorkspace.Equal(oldWorkspace) {
222-
out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &proto.Workspace{
223-
Id: tailnet.UUIDToByteSlice(wsID),
224-
Name: newWorkspace.WorkspaceName,
225-
Status: convertStatus(newWorkspace.JobStatus, newWorkspace.Transition),
226-
})
227-
}
228-
229-
add, remove := slice.SymmetricDifference(oldWorkspace.Agents, newWorkspace.Agents)
230-
for _, agent := range add {
211+
for _, row := range rows {
212+
out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &proto.Workspace{
213+
Id: tailnet.UUIDToByteSlice(row.ID),
214+
Name: row.Name,
215+
Status: convertStatus(row.JobStatus, row.Transition),
216+
})
217+
for _, agent := range row.Agents {
231218
out.UpsertedAgents = append(out.UpsertedAgents, &proto.Agent{
232219
Id: tailnet.UUIDToByteSlice(agent.ID),
233220
Name: agent.Name,
234-
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
235-
})
236-
}
237-
for _, agent := range remove {
238-
out.DeletedAgents = append(out.DeletedAgents, &proto.Agent{
239-
Id: tailnet.UUIDToByteSlice(agent.ID),
240-
Name: agent.Name,
241-
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
221+
WorkspaceId: tailnet.UUIDToByteSlice(row.ID),
242222
})
243223
}
244224
}
225+
return out
226+
}
245227

246-
// Delete workspace and agents if the workspace is deleted
247-
for wsID, oldWorkspace := range old {
248-
if _, exists := new[wsID]; !exists {
249-
out.DeletedWorkspaces = append(out.DeletedWorkspaces, &proto.Workspace{
250-
Id: tailnet.UUIDToByteSlice(wsID),
251-
Name: oldWorkspace.WorkspaceName,
252-
Status: convertStatus(oldWorkspace.JobStatus, oldWorkspace.Transition),
253-
})
254-
for _, agent := range oldWorkspace.Agents {
255-
out.DeletedAgents = append(out.DeletedAgents, &proto.Agent{
256-
Id: tailnet.UUIDToByteSlice(agent.ID),
257-
Name: agent.Name,
258-
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
259-
})
260-
}
228+
func convertRows(rows []database.GetWorkspacesAndAgentsByOwnerIDRow) workspacesByID {
229+
out := make(workspacesByID)
230+
for _, row := range rows {
231+
out[row.ID] = ownedWorkspace{
232+
WorkspaceName: row.Name,
233+
JobStatus: row.JobStatus,
234+
Transition: row.Transition,
235+
Agents: row.Agents,
261236
}
262237
}
263-
264238
return out
265239
}

0 commit comments

Comments
 (0)