@@ -2,7 +2,7 @@ package coderd
2
2
3
3
import (
4
4
"context"
5
- "database/sql "
5
+ "fmt "
6
6
"sync"
7
7
8
8
"github.com/google/uuid"
@@ -16,8 +16,6 @@ import (
16
16
"github.com/coder/coder/v2/tailnet/proto"
17
17
)
18
18
19
- type workspacesByOwner map [uuid.UUID ]workspacesByID
20
-
21
19
type workspacesByID map [uuid.UUID ]ownedWorkspace
22
20
23
21
type ownedWorkspace struct {
@@ -34,133 +32,184 @@ func (w ownedWorkspace) Equal(other ownedWorkspace) bool {
34
32
w .Transition == other .Transition
35
33
}
36
34
37
- func convertRows (v []database.GetWorkspacesAndAgentsRow ) workspacesByOwner {
38
- m := make (map [uuid.UUID ]workspacesByID )
39
- for _ , ws := range v {
40
- owned := ownedWorkspace {
41
- WorkspaceName : ws .Name ,
42
- JobStatus : ws .JobStatus ,
43
- Transition : ws .Transition ,
44
- Agents : ws .Agents ,
45
- }
46
- if byID , exists := m [ws .OwnerID ]; ! exists {
47
- m [ws .OwnerID ] = map [uuid.UUID ]ownedWorkspace {ws .ID : owned }
48
- } else {
49
- byID [ws .ID ] = owned
50
- m [ws .OwnerID ] = byID
51
- }
52
- }
53
- return workspacesByOwner (m )
54
- }
55
-
56
35
func convertStatus (status database.ProvisionerJobStatus , trans database.WorkspaceTransition ) proto.Workspace_Status {
57
36
wsStatus := codersdk .ConvertWorkspaceStatus (codersdk .ProvisionerJobStatus (status ), codersdk .WorkspaceTransition (trans ))
58
37
return tailnet .WorkspaceStatusToProto (wsStatus )
59
38
}
60
39
61
40
type sub struct {
62
- mu sync.Mutex
41
+ mu sync.RWMutex
63
42
userID uuid.UUID
64
43
tx chan <- * proto.WorkspaceUpdate
65
44
prev workspacesByID
45
+
46
+ db UpdateQuerier
47
+ ps pubsub.Pubsub
48
+
49
+ cancelFns []func ()
50
+ agentSubCancelFn func ()
51
+ wsSubCancelFn func ()
66
52
}
67
53
68
- func (s * sub ) send (all workspacesByOwner ) {
54
+ func (s * sub ) ownsAgent (agentID uuid.UUID ) bool {
55
+ s .mu .RLock ()
56
+ defer s .mu .RUnlock ()
57
+
58
+ for _ , ws := range s .prev {
59
+ for _ , agent := range ws .Agents {
60
+ if agent .ID == agentID {
61
+ return true
62
+ }
63
+ }
64
+ }
65
+ return false
66
+ }
67
+
68
+ func (s * sub ) handleUpdateEvent (ctx context.Context , _ []byte ) {
69
+ s .mu .Lock ()
70
+ defer s .mu .Unlock ()
71
+
72
+ rows , err := s .db .GetWorkspacesAndAgentsByOwnerID (ctx , s .userID )
73
+ if err != nil {
74
+ return
75
+ }
76
+ s .handleRowsNoLock (rows )
77
+ }
78
+
79
+ func (s * sub ) handleInsertWorkspaceEvent (ctx context.Context , wsIDStr []byte ) {
69
80
s .mu .Lock ()
70
81
defer s .mu .Unlock ()
71
82
72
- // Filter to only the workspaces owned by the user
73
- latest := all [s .userID ]
83
+ wsID , err := uuid .Parse (string (wsIDStr ))
84
+ if err != nil {
85
+ return
86
+ }
87
+
88
+ cancel , err := s .ps .Subscribe (codersdk .WorkspaceNotifyChannel (wsID ), s .handleUpdateEvent )
89
+ if err != nil {
90
+ return
91
+ }
92
+ s .cancelFns = append (s .cancelFns , cancel )
93
+
94
+ rows , err := s .db .GetWorkspacesAndAgentsByOwnerID (ctx , s .userID )
95
+ if err != nil {
96
+ return
97
+ }
98
+ s .handleRowsNoLock (rows )
99
+ }
100
+
101
+ func (s * sub ) handleInsertAgentEvent (ctx context.Context , _ []byte ) {
102
+ s .mu .Lock ()
103
+ defer s .mu .Unlock ()
104
+
105
+ rows , err := s .db .GetWorkspacesAndAgentsByOwnerID (ctx , s .userID )
106
+ if err != nil {
107
+ return
108
+ }
109
+ s .handleRowsNoLock (rows )
110
+ }
111
+
112
+ func (s * sub ) handleRowsNoLock (rows []database.GetWorkspacesAndAgentsByOwnerIDRow ) {
113
+ latest := convertRows (rows )
74
114
update := produceUpdate (s .prev , latest )
75
115
s .prev = latest
76
116
s .tx <- update
77
117
}
78
118
119
+ // start subscribes to updates for all workspaces owned by the user
120
+ func (s * sub ) start () (err error ) {
121
+ s .mu .Lock ()
122
+ defer s .mu .Unlock ()
123
+
124
+ rows , err := s .db .GetWorkspacesAndAgentsByOwnerID (context .Background (), s .userID )
125
+ if err != nil {
126
+ return xerrors .Errorf ("get workspaces and agents by owner ID: %w" , err )
127
+ }
128
+
129
+ // Send initial state
130
+ s .handleRowsNoLock (rows )
131
+
132
+ for _ , row := range rows {
133
+ cancel , err := s .ps .Subscribe (codersdk .WorkspaceNotifyChannel (row .ID ), s .handleUpdateEvent )
134
+ if err != nil {
135
+ return xerrors .Errorf ("subscribe to workspace notify channel: %w" , err )
136
+ }
137
+ s .cancelFns = append (s .cancelFns , cancel )
138
+ }
139
+
140
+ cancel , err := s .ps .Subscribe (WorkspaceInsertChannel (s .userID ), s .handleInsertWorkspaceEvent )
141
+ if err != nil {
142
+ return xerrors .Errorf ("subscribe to new workspace channel: %w" , err )
143
+ }
144
+ s .wsSubCancelFn = cancel
145
+
146
+ cancel , err = s .ps .Subscribe (fmt .Sprintf ("new_agent:%s" , s .userID .String ()), s .handleInsertAgentEvent )
147
+ if err != nil {
148
+ return xerrors .Errorf ("subscribe to new agent channel: %w" , err )
149
+ }
150
+ s .agentSubCancelFn = cancel
151
+ return nil
152
+ }
153
+
154
+ func (s * sub ) stop () {
155
+ s .mu .Lock ()
156
+ defer s .mu .Unlock ()
157
+
158
+ for _ , cancel := range s .cancelFns {
159
+ cancel ()
160
+ }
161
+
162
+ if s .wsSubCancelFn != nil {
163
+ s .wsSubCancelFn ()
164
+ }
165
+
166
+ if s .agentSubCancelFn != nil {
167
+ s .agentSubCancelFn ()
168
+ }
169
+
170
+ close (s .tx )
171
+ }
172
+
79
173
type UpdateQuerier interface {
80
- GetWorkspacesAndAgents (ctx context.Context ) ([]database.GetWorkspacesAndAgentsRow , error )
174
+ GetWorkspacesAndAgentsByOwnerID (ctx context.Context , ownerID uuid. UUID ) ([]database.GetWorkspacesAndAgentsByOwnerIDRow , error )
81
175
}
82
176
83
177
type updatesProvider struct {
84
178
mu sync.RWMutex
85
- db UpdateQuerier
86
- ps pubsub.Pubsub
87
179
// Peer ID -> subscription
88
180
subs map [uuid.UUID ]* sub
89
- // Owner ID -> workspace ID -> workspace
90
- latest workspacesByOwner
91
- cancelFn func ()
181
+
182
+ db UpdateQuerier
183
+ ps pubsub. Pubsub
92
184
}
93
185
94
- func (u * updatesProvider ) IsOwner (userID uuid.UUID , agentID uuid.UUID ) error {
186
+ func (u * updatesProvider ) IsOwner (userID uuid.UUID , agentID uuid.UUID ) bool {
95
187
u .mu .RLock ()
96
188
defer u .mu .RUnlock ()
97
189
98
- workspaces , exists := u .latest [userID ]
99
- if ! exists {
100
- return xerrors .Errorf ("workspace agent not found or you do not have permission: %w" , sql .ErrNoRows )
101
- }
102
- for _ , workspace := range workspaces {
103
- for _ , agent := range workspace .Agents {
104
- if agent .ID == agentID {
105
- return nil
106
- }
190
+ for _ , sub := range u .subs {
191
+ if sub .userID == userID && sub .ownsAgent (agentID ) {
192
+ return true
107
193
}
108
194
}
109
- return xerrors . Errorf ( "workspace agent not found or you do not have permission: %w" , sql . ErrNoRows )
195
+ return false
110
196
}
111
197
112
198
var _ tailnet.WorkspaceUpdatesProvider = (* updatesProvider )(nil )
113
199
114
- func NewUpdatesProvider (ctx context.Context , db UpdateQuerier , ps pubsub.Pubsub ) (tailnet.WorkspaceUpdatesProvider , error ) {
115
- rows , err := db .GetWorkspacesAndAgents (ctx )
116
- if err != nil && ! xerrors .Is (err , sql .ErrNoRows ) {
117
- return nil , err
118
- }
200
+ func NewUpdatesProvider (_ context.Context , db UpdateQuerier , ps pubsub.Pubsub ) (tailnet.WorkspaceUpdatesProvider , error ) {
119
201
out := & updatesProvider {
120
- db : db ,
121
- ps : ps ,
122
- subs : map [uuid.UUID ]* sub {},
123
- latest : convertRows (rows ),
124
- }
125
- cancel , err := ps .Subscribe (codersdk .AllWorkspacesNotifyChannel , out .handleUpdate )
126
- if err != nil {
127
- return nil , err
202
+ db : db ,
203
+ ps : ps ,
204
+ subs : map [uuid.UUID ]* sub {},
128
205
}
129
- out .cancelFn = cancel
130
206
return out , nil
131
207
}
132
208
133
209
func (u * updatesProvider ) Stop () {
134
210
for _ , sub := range u .subs {
135
- close (sub .tx )
136
- }
137
- u .cancelFn ()
138
- }
139
-
140
- func (u * updatesProvider ) handleUpdate (ctx context.Context , _ []byte ) {
141
- rows , err := u .db .GetWorkspacesAndAgents (ctx )
142
- if err != nil && ! xerrors .Is (err , sql .ErrNoRows ) {
143
- // TODO: Log
144
- return
145
- }
146
-
147
- wg := & sync.WaitGroup {}
148
- latest := convertRows (rows )
149
- u .mu .RLock ()
150
- for _ , sub := range u .subs {
151
- sub := sub
152
- wg .Add (1 )
153
- go func () {
154
- sub .send (latest )
155
- defer wg .Done ()
156
- }()
211
+ sub .stop ()
157
212
}
158
- u .mu .RUnlock ()
159
-
160
- u .mu .Lock ()
161
- u .latest = latest
162
- u .mu .Unlock ()
163
- wg .Wait ()
164
213
}
165
214
166
215
func (u * updatesProvider ) Subscribe (peerID uuid.UUID , userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate , error ) {
@@ -169,13 +218,20 @@ func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan
169
218
170
219
tx := make (chan * proto.WorkspaceUpdate , 1 )
171
220
sub := & sub {
172
- userID : userID ,
173
- tx : tx ,
174
- prev : make (workspacesByID ),
221
+ userID : userID ,
222
+ tx : tx ,
223
+ db : u .db ,
224
+ ps : u .ps ,
225
+ prev : workspacesByID {},
226
+ cancelFns : []func (){},
227
+ }
228
+ err := sub .start ()
229
+ if err != nil {
230
+ sub .stop ()
231
+ return nil , err
175
232
}
233
+
176
234
u .subs [peerID ] = sub
177
- // Write initial state
178
- sub .send (u .latest )
179
235
return tx , nil
180
236
}
181
237
@@ -263,3 +319,24 @@ func produceUpdate(old, new workspacesByID) *proto.WorkspaceUpdate {
263
319
264
320
return out
265
321
}
322
+
323
+ func convertRows (rows []database.GetWorkspacesAndAgentsByOwnerIDRow ) workspacesByID {
324
+ out := make (workspacesByID )
325
+ for _ , row := range rows {
326
+ out [row .ID ] = ownedWorkspace {
327
+ WorkspaceName : row .Name ,
328
+ JobStatus : row .JobStatus ,
329
+ Transition : row .Transition ,
330
+ Agents : row .Agents ,
331
+ }
332
+ }
333
+ return out
334
+ }
335
+
336
+ func WorkspaceInsertChannel (userID uuid.UUID ) string {
337
+ return fmt .Sprintf ("new_workspace:%s" , userID .String ())
338
+ }
339
+
340
+ func AgentInsertChannel (userID uuid.UUID ) string {
341
+ return fmt .Sprintf ("new_agent:%s" , userID .String ())
342
+ }
0 commit comments