@@ -40,6 +40,7 @@ func convertRows(v []database.GetWorkspacesAndAgentsRow) workspacesByOwner {
40
40
WorkspaceName : ws .Name ,
41
41
JobStatus : ws .JobStatus ,
42
42
Transition : ws .Transition ,
43
+ Agents : ws .Agents ,
43
44
}
44
45
if byID , exists := m [ws .OwnerID ]; ! exists {
45
46
m [ws .OwnerID ] = map [uuid.UUID ]ownedWorkspace {ws .ID : owned }
@@ -68,14 +69,16 @@ func (s *sub) send(all workspacesByOwner) {
68
69
defer s .mu .Unlock ()
69
70
70
71
// Filter to only the workspaces owned by the user
71
- own := all [s .userID ]
72
- update := produceUpdate (s .prev , own )
73
- s .prev = own
72
+ latest := all [s .userID ]
73
+ update := produceUpdate (s .prev , latest )
74
+ s .prev = latest
74
75
s .tx <- update
75
76
}
76
77
77
78
type WorkspaceUpdatesProvider interface {
78
- Subscribe (userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate , error )
79
+ Subscribe (peerID uuid.UUID , userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate , error )
80
+ Unsubscribe (peerID uuid.UUID )
81
+ Stop ()
79
82
}
80
83
81
84
type WorkspaceStore interface {
@@ -84,25 +87,40 @@ type WorkspaceStore interface {
84
87
}
85
88
86
89
type updatesProvider struct {
87
- mu sync.RWMutex
88
- db WorkspaceStore
89
- ps pubsub.Pubsub
90
- subs []* sub
90
+ mu sync.RWMutex
91
+ db WorkspaceStore
92
+ ps pubsub.Pubsub
93
+ // Peer ID -> subscription
94
+ subs map [uuid.UUID ]* sub
95
+ latest workspacesByOwner
91
96
cancelFn func ()
92
97
}
93
98
94
99
var _ WorkspaceUpdatesProvider = (* updatesProvider )(nil )
95
100
96
- func (u * updatesProvider ) Start () error {
97
- cancel , err := u .ps .Subscribe (codersdk .AllWorkspacesNotifyChannel , u .handleUpdate )
101
+ func NewUpdatesProvider (ctx context.Context , db WorkspaceStore , ps pubsub.Pubsub ) (WorkspaceUpdatesProvider , error ) {
102
+ rows , err := db .GetWorkspacesAndAgents (ctx )
103
+ if err != nil && ! xerrors .Is (err , sql .ErrNoRows ) {
104
+ return nil , err
105
+ }
106
+ out := & updatesProvider {
107
+ db : db ,
108
+ ps : ps ,
109
+ subs : map [uuid.UUID ]* sub {},
110
+ latest : convertRows (rows ),
111
+ }
112
+ cancel , err := ps .Subscribe (codersdk .AllWorkspacesNotifyChannel , out .handleUpdate )
98
113
if err != nil {
99
- return err
114
+ return nil , err
100
115
}
101
- u .cancelFn = cancel
102
- return nil
116
+ out .cancelFn = cancel
117
+ return out , nil
103
118
}
104
119
105
120
func (u * updatesProvider ) Stop () {
121
+ for _ , sub := range u .subs {
122
+ close (sub .tx )
123
+ }
106
124
u .cancelFn ()
107
125
}
108
126
@@ -116,7 +134,6 @@ func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
116
134
wg := & sync.WaitGroup {}
117
135
latest := convertRows (rows )
118
136
u .mu .RLock ()
119
- defer u .mu .RUnlock ()
120
137
for _ , sub := range u .subs {
121
138
sub := sub
122
139
wg .Add (1 )
@@ -125,18 +142,15 @@ func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
125
142
defer wg .Done ()
126
143
}()
127
144
}
128
- wg .Wait ()
129
- }
145
+ u .mu .RUnlock ()
130
146
131
- func NewUpdatesProvider (db WorkspaceStore , ps pubsub.Pubsub ) WorkspaceUpdatesProvider {
132
- return & updatesProvider {
133
- db : db ,
134
- ps : ps ,
135
- subs : make ([]* sub , 0 ),
136
- }
147
+ u .mu .Lock ()
148
+ u .latest = latest
149
+ u .mu .Unlock ()
150
+ wg .Wait ()
137
151
}
138
152
139
- func (u * updatesProvider ) Subscribe (userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate , error ) {
153
+ func (u * updatesProvider ) Subscribe (peerID uuid. UUID , userID uuid.UUID ) (<- chan * proto.WorkspaceUpdate , error ) {
140
154
u .mu .Lock ()
141
155
defer u .mu .Unlock ()
142
156
@@ -146,10 +160,25 @@ func (u *updatesProvider) Subscribe(userID uuid.UUID) (<-chan *proto.WorkspaceUp
146
160
tx : tx ,
147
161
prev : make (workspacesByID ),
148
162
}
149
- u .subs = append (u .subs , sub )
163
+ u .subs [peerID ] = sub
164
+ // Write initial state
165
+ sub .send (u .latest )
150
166
return tx , nil
151
167
}
152
168
169
+ func (u * updatesProvider ) Unsubscribe (peerID uuid.UUID ) {
170
+ u .mu .Lock ()
171
+ defer u .mu .Unlock ()
172
+
173
+ sub , exists := u .subs [peerID ]
174
+ if ! exists {
175
+ return
176
+ }
177
+ close (sub .tx )
178
+ delete (u .subs , peerID )
179
+ return
180
+ }
181
+
153
182
func produceUpdate (old , new workspacesByID ) * proto.WorkspaceUpdate {
154
183
out := & proto.WorkspaceUpdate {
155
184
UpsertedWorkspaces : []* proto.Workspace {},
0 commit comments