Skip to content

Commit ab7f678

Browse files
committed
redesign
1 parent b01d1cb commit ab7f678

File tree

6 files changed

+177
-113
lines changed

6 files changed

+177
-113
lines changed

coderd/workspaces.go

-5
Original file line numberDiff line numberDiff line change
@@ -2068,11 +2068,6 @@ func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUI
20682068
api.Logger.Warn(ctx, "failed to publish workspace update",
20692069
slog.F("workspace_id", workspaceID), slog.Error(err))
20702070
}
2071-
err = api.Pubsub.Publish(codersdk.AllWorkspacesNotifyChannel, []byte(workspaceID.String()))
2072-
if err != nil {
2073-
api.Logger.Warn(ctx, "failed to publish all workspaces update",
2074-
slog.F("workspace_id", workspaceID), slog.Error(err))
2075-
}
20762071
}
20772072

20782073
func (api *API) publishWorkspaceAgentLogsUpdate(ctx context.Context, workspaceAgentID uuid.UUID, m agentsdk.LogsNotifyMessage) {

coderd/workspaceupdates.go

+167-90
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package coderd
22

33
import (
44
"context"
5-
"database/sql"
5+
"fmt"
66
"sync"
77

88
"github.com/google/uuid"
@@ -16,8 +16,6 @@ import (
1616
"github.com/coder/coder/v2/tailnet/proto"
1717
)
1818

19-
type workspacesByOwner map[uuid.UUID]workspacesByID
20-
2119
type workspacesByID map[uuid.UUID]ownedWorkspace
2220

2321
type ownedWorkspace struct {
@@ -34,133 +32,184 @@ func (w ownedWorkspace) Equal(other ownedWorkspace) bool {
3432
w.Transition == other.Transition
3533
}
3634

37-
func convertRows(v []database.GetWorkspacesAndAgentsRow) workspacesByOwner {
38-
m := make(map[uuid.UUID]workspacesByID)
39-
for _, ws := range v {
40-
owned := ownedWorkspace{
41-
WorkspaceName: ws.Name,
42-
JobStatus: ws.JobStatus,
43-
Transition: ws.Transition,
44-
Agents: ws.Agents,
45-
}
46-
if byID, exists := m[ws.OwnerID]; !exists {
47-
m[ws.OwnerID] = map[uuid.UUID]ownedWorkspace{ws.ID: owned}
48-
} else {
49-
byID[ws.ID] = owned
50-
m[ws.OwnerID] = byID
51-
}
52-
}
53-
return workspacesByOwner(m)
54-
}
55-
5635
func convertStatus(status database.ProvisionerJobStatus, trans database.WorkspaceTransition) proto.Workspace_Status {
5736
wsStatus := codersdk.ConvertWorkspaceStatus(codersdk.ProvisionerJobStatus(status), codersdk.WorkspaceTransition(trans))
5837
return tailnet.WorkspaceStatusToProto(wsStatus)
5938
}
6039

6140
type sub struct {
62-
mu sync.Mutex
41+
mu sync.RWMutex
6342
userID uuid.UUID
6443
tx chan<- *proto.WorkspaceUpdate
6544
prev workspacesByID
45+
46+
db UpdateQuerier
47+
ps pubsub.Pubsub
48+
49+
cancelFns []func()
50+
agentSubCancelFn func()
51+
wsSubCancelFn func()
6652
}
6753

68-
func (s *sub) send(all workspacesByOwner) {
54+
func (s *sub) ownsAgent(agentID uuid.UUID) bool {
55+
s.mu.RLock()
56+
defer s.mu.RUnlock()
57+
58+
for _, ws := range s.prev {
59+
for _, agent := range ws.Agents {
60+
if agent.ID == agentID {
61+
return true
62+
}
63+
}
64+
}
65+
return false
66+
}
67+
68+
func (s *sub) handleUpdateEvent(ctx context.Context, _ []byte) {
69+
s.mu.Lock()
70+
defer s.mu.Unlock()
71+
72+
rows, err := s.db.GetWorkspacesAndAgentsByOwnerID(ctx, s.userID)
73+
if err != nil {
74+
return
75+
}
76+
s.handleRowsNoLock(rows)
77+
}
78+
79+
func (s *sub) handleInsertWorkspaceEvent(ctx context.Context, wsIDStr []byte) {
6980
s.mu.Lock()
7081
defer s.mu.Unlock()
7182

72-
// Filter to only the workspaces owned by the user
73-
latest := all[s.userID]
83+
wsID, err := uuid.Parse(string(wsIDStr))
84+
if err != nil {
85+
return
86+
}
87+
88+
cancel, err := s.ps.Subscribe(codersdk.WorkspaceNotifyChannel(wsID), s.handleUpdateEvent)
89+
if err != nil {
90+
return
91+
}
92+
s.cancelFns = append(s.cancelFns, cancel)
93+
94+
rows, err := s.db.GetWorkspacesAndAgentsByOwnerID(ctx, s.userID)
95+
if err != nil {
96+
return
97+
}
98+
s.handleRowsNoLock(rows)
99+
}
100+
101+
func (s *sub) handleInsertAgentEvent(ctx context.Context, _ []byte) {
102+
s.mu.Lock()
103+
defer s.mu.Unlock()
104+
105+
rows, err := s.db.GetWorkspacesAndAgentsByOwnerID(ctx, s.userID)
106+
if err != nil {
107+
return
108+
}
109+
s.handleRowsNoLock(rows)
110+
}
111+
112+
func (s *sub) handleRowsNoLock(rows []database.GetWorkspacesAndAgentsByOwnerIDRow) {
113+
latest := convertRows(rows)
74114
update := produceUpdate(s.prev, latest)
75115
s.prev = latest
76116
s.tx <- update
77117
}
78118

119+
// start subscribes to updates for all workspaces owned by the user
120+
func (s *sub) start() (err error) {
121+
s.mu.Lock()
122+
defer s.mu.Unlock()
123+
124+
rows, err := s.db.GetWorkspacesAndAgentsByOwnerID(context.Background(), s.userID)
125+
if err != nil {
126+
return xerrors.Errorf("get workspaces and agents by owner ID: %w", err)
127+
}
128+
129+
// Send initial state
130+
s.handleRowsNoLock(rows)
131+
132+
for _, row := range rows {
133+
cancel, err := s.ps.Subscribe(codersdk.WorkspaceNotifyChannel(row.ID), s.handleUpdateEvent)
134+
if err != nil {
135+
return xerrors.Errorf("subscribe to workspace notify channel: %w", err)
136+
}
137+
s.cancelFns = append(s.cancelFns, cancel)
138+
}
139+
140+
cancel, err := s.ps.Subscribe(WorkspaceInsertChannel(s.userID), s.handleInsertWorkspaceEvent)
141+
if err != nil {
142+
return xerrors.Errorf("subscribe to new workspace channel: %w", err)
143+
}
144+
s.wsSubCancelFn = cancel
145+
146+
cancel, err = s.ps.Subscribe(fmt.Sprintf("new_agent:%s", s.userID.String()), s.handleInsertAgentEvent)
147+
if err != nil {
148+
return xerrors.Errorf("subscribe to new agent channel: %w", err)
149+
}
150+
s.agentSubCancelFn = cancel
151+
return nil
152+
}
153+
154+
func (s *sub) stop() {
155+
s.mu.Lock()
156+
defer s.mu.Unlock()
157+
158+
for _, cancel := range s.cancelFns {
159+
cancel()
160+
}
161+
162+
if s.wsSubCancelFn != nil {
163+
s.wsSubCancelFn()
164+
}
165+
166+
if s.agentSubCancelFn != nil {
167+
s.agentSubCancelFn()
168+
}
169+
170+
close(s.tx)
171+
}
172+
79173
type UpdateQuerier interface {
80-
GetWorkspacesAndAgents(ctx context.Context) ([]database.GetWorkspacesAndAgentsRow, error)
174+
GetWorkspacesAndAgentsByOwnerID(ctx context.Context, ownerID uuid.UUID) ([]database.GetWorkspacesAndAgentsByOwnerIDRow, error)
81175
}
82176

83177
type updatesProvider struct {
84178
mu sync.RWMutex
85-
db UpdateQuerier
86-
ps pubsub.Pubsub
87179
// Peer ID -> subscription
88180
subs map[uuid.UUID]*sub
89-
// Owner ID -> workspace ID -> workspace
90-
latest workspacesByOwner
91-
cancelFn func()
181+
182+
db UpdateQuerier
183+
ps pubsub.Pubsub
92184
}
93185

94-
func (u *updatesProvider) IsOwner(userID uuid.UUID, agentID uuid.UUID) error {
186+
func (u *updatesProvider) IsOwner(userID uuid.UUID, agentID uuid.UUID) bool {
95187
u.mu.RLock()
96188
defer u.mu.RUnlock()
97189

98-
workspaces, exists := u.latest[userID]
99-
if !exists {
100-
return xerrors.Errorf("workspace agent not found or you do not have permission: %w", sql.ErrNoRows)
101-
}
102-
for _, workspace := range workspaces {
103-
for _, agent := range workspace.Agents {
104-
if agent.ID == agentID {
105-
return nil
106-
}
190+
for _, sub := range u.subs {
191+
if sub.userID == userID && sub.ownsAgent(agentID) {
192+
return true
107193
}
108194
}
109-
return xerrors.Errorf("workspace agent not found or you do not have permission: %w", sql.ErrNoRows)
195+
return false
110196
}
111197

112198
var _ tailnet.WorkspaceUpdatesProvider = (*updatesProvider)(nil)
113199

114-
func NewUpdatesProvider(ctx context.Context, db UpdateQuerier, ps pubsub.Pubsub) (tailnet.WorkspaceUpdatesProvider, error) {
115-
rows, err := db.GetWorkspacesAndAgents(ctx)
116-
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
117-
return nil, err
118-
}
200+
func NewUpdatesProvider(_ context.Context, db UpdateQuerier, ps pubsub.Pubsub) (tailnet.WorkspaceUpdatesProvider, error) {
119201
out := &updatesProvider{
120-
db: db,
121-
ps: ps,
122-
subs: map[uuid.UUID]*sub{},
123-
latest: convertRows(rows),
124-
}
125-
cancel, err := ps.Subscribe(codersdk.AllWorkspacesNotifyChannel, out.handleUpdate)
126-
if err != nil {
127-
return nil, err
202+
db: db,
203+
ps: ps,
204+
subs: map[uuid.UUID]*sub{},
128205
}
129-
out.cancelFn = cancel
130206
return out, nil
131207
}
132208

133209
func (u *updatesProvider) Stop() {
134210
for _, sub := range u.subs {
135-
close(sub.tx)
136-
}
137-
u.cancelFn()
138-
}
139-
140-
func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
141-
rows, err := u.db.GetWorkspacesAndAgents(ctx)
142-
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
143-
// TODO: Log
144-
return
145-
}
146-
147-
wg := &sync.WaitGroup{}
148-
latest := convertRows(rows)
149-
u.mu.RLock()
150-
for _, sub := range u.subs {
151-
sub := sub
152-
wg.Add(1)
153-
go func() {
154-
sub.send(latest)
155-
defer wg.Done()
156-
}()
211+
sub.stop()
157212
}
158-
u.mu.RUnlock()
159-
160-
u.mu.Lock()
161-
u.latest = latest
162-
u.mu.Unlock()
163-
wg.Wait()
164213
}
165214

166215
func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan *proto.WorkspaceUpdate, error) {
@@ -169,13 +218,20 @@ func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan
169218

170219
tx := make(chan *proto.WorkspaceUpdate, 1)
171220
sub := &sub{
172-
userID: userID,
173-
tx: tx,
174-
prev: make(workspacesByID),
221+
userID: userID,
222+
tx: tx,
223+
db: u.db,
224+
ps: u.ps,
225+
prev: workspacesByID{},
226+
cancelFns: []func(){},
227+
}
228+
err := sub.start()
229+
if err != nil {
230+
sub.stop()
231+
return nil, err
175232
}
233+
176234
u.subs[peerID] = sub
177-
// Write initial state
178-
sub.send(u.latest)
179235
return tx, nil
180236
}
181237

@@ -263,3 +319,24 @@ func produceUpdate(old, new workspacesByID) *proto.WorkspaceUpdate {
263319

264320
return out
265321
}
322+
323+
func convertRows(rows []database.GetWorkspacesAndAgentsByOwnerIDRow) workspacesByID {
324+
out := make(workspacesByID)
325+
for _, row := range rows {
326+
out[row.ID] = ownedWorkspace{
327+
WorkspaceName: row.Name,
328+
JobStatus: row.JobStatus,
329+
Transition: row.Transition,
330+
Agents: row.Agents,
331+
}
332+
}
333+
return out
334+
}
335+
336+
func WorkspaceInsertChannel(userID uuid.UUID) string {
337+
return fmt.Sprintf("new_workspace:%s", userID.String())
338+
}
339+
340+
func AgentInsertChannel(userID uuid.UUID) string {
341+
return fmt.Sprintf("new_agent:%s", userID.String())
342+
}

0 commit comments

Comments
 (0)