Skip to content

Commit 9faa940

Browse files
committed
use peer ids as subscription key + tests
1 parent 8b55da8 commit 9faa940

File tree

4 files changed

+358
-215
lines changed

4 files changed

+358
-215
lines changed

tailnet/service.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,10 +259,12 @@ func (s *DRPCService) WorkspaceUpdates(_ *proto.WorkspaceUpdatesRequest, stream
259259
)
260260
switch auth := streamID.Auth.(type) {
261261
case ClientUserCoordinateeAuth:
262-
updatesCh, err = s.WorkspaceUpdatesProvider.Subscribe(auth.UserID)
262+
// Stream ID is the peer ID
263+
updatesCh, err = s.WorkspaceUpdatesProvider.Subscribe(streamID.ID, auth.UserID)
263264
if err != nil {
264265
err = xerrors.Errorf("subscribe to workspace updates: %w", err)
265266
}
267+
defer s.WorkspaceUpdatesProvider.Unsubscribe(streamID.ID)
266268
default:
267269
err = xerrors.Errorf("workspace updates not supported by auth name %T", auth)
268270
}
@@ -273,6 +275,9 @@ func (s *DRPCService) WorkspaceUpdates(_ *proto.WorkspaceUpdatesRequest, stream
273275
for {
274276
select {
275277
case updates := <-updatesCh:
278+
if updates == nil {
279+
return nil
280+
}
276281
err := stream.Send(updates)
277282
if err != nil {
278283
return xerrors.Errorf("send workspace update: %w", err)

tailnet/workspaceupdates.go

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func convertRows(v []database.GetWorkspacesAndAgentsRow) workspacesByOwner {
4040
WorkspaceName: ws.Name,
4141
JobStatus: ws.JobStatus,
4242
Transition: ws.Transition,
43+
Agents: ws.Agents,
4344
}
4445
if byID, exists := m[ws.OwnerID]; !exists {
4546
m[ws.OwnerID] = map[uuid.UUID]ownedWorkspace{ws.ID: owned}
@@ -68,14 +69,16 @@ func (s *sub) send(all workspacesByOwner) {
6869
defer s.mu.Unlock()
6970

7071
// Filter to only the workspaces owned by the user
71-
own := all[s.userID]
72-
update := produceUpdate(s.prev, own)
73-
s.prev = own
72+
latest := all[s.userID]
73+
update := produceUpdate(s.prev, latest)
74+
s.prev = latest
7475
s.tx <- update
7576
}
7677

7778
type WorkspaceUpdatesProvider interface {
78-
Subscribe(userID uuid.UUID) (<-chan *proto.WorkspaceUpdate, error)
79+
Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan *proto.WorkspaceUpdate, error)
80+
Unsubscribe(peerID uuid.UUID)
81+
Stop()
7982
}
8083

8184
type WorkspaceStore interface {
@@ -84,25 +87,40 @@ type WorkspaceStore interface {
8487
}
8588

8689
type updatesProvider struct {
87-
mu sync.RWMutex
88-
db WorkspaceStore
89-
ps pubsub.Pubsub
90-
subs []*sub
90+
mu sync.RWMutex
91+
db WorkspaceStore
92+
ps pubsub.Pubsub
93+
// Peer ID -> subscription
94+
subs map[uuid.UUID]*sub
95+
latest workspacesByOwner
9196
cancelFn func()
9297
}
9398

9499
var _ WorkspaceUpdatesProvider = (*updatesProvider)(nil)
95100

96-
func (u *updatesProvider) Start() error {
97-
cancel, err := u.ps.Subscribe(codersdk.AllWorkspacesNotifyChannel, u.handleUpdate)
101+
func NewUpdatesProvider(ctx context.Context, db WorkspaceStore, ps pubsub.Pubsub) (WorkspaceUpdatesProvider, error) {
102+
rows, err := db.GetWorkspacesAndAgents(ctx)
103+
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
104+
return nil, err
105+
}
106+
out := &updatesProvider{
107+
db: db,
108+
ps: ps,
109+
subs: map[uuid.UUID]*sub{},
110+
latest: convertRows(rows),
111+
}
112+
cancel, err := ps.Subscribe(codersdk.AllWorkspacesNotifyChannel, out.handleUpdate)
98113
if err != nil {
99-
return err
114+
return nil, err
100115
}
101-
u.cancelFn = cancel
102-
return nil
116+
out.cancelFn = cancel
117+
return out, nil
103118
}
104119

105120
func (u *updatesProvider) Stop() {
121+
for _, sub := range u.subs {
122+
close(sub.tx)
123+
}
106124
u.cancelFn()
107125
}
108126

@@ -116,7 +134,6 @@ func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
116134
wg := &sync.WaitGroup{}
117135
latest := convertRows(rows)
118136
u.mu.RLock()
119-
defer u.mu.RUnlock()
120137
for _, sub := range u.subs {
121138
sub := sub
122139
wg.Add(1)
@@ -125,18 +142,15 @@ func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
125142
defer wg.Done()
126143
}()
127144
}
128-
wg.Wait()
129-
}
145+
u.mu.RUnlock()
130146

131-
func NewUpdatesProvider(db WorkspaceStore, ps pubsub.Pubsub) WorkspaceUpdatesProvider {
132-
return &updatesProvider{
133-
db: db,
134-
ps: ps,
135-
subs: make([]*sub, 0),
136-
}
147+
u.mu.Lock()
148+
u.latest = latest
149+
u.mu.Unlock()
150+
wg.Wait()
137151
}
138152

139-
func (u *updatesProvider) Subscribe(userID uuid.UUID) (<-chan *proto.WorkspaceUpdate, error) {
153+
func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan *proto.WorkspaceUpdate, error) {
140154
u.mu.Lock()
141155
defer u.mu.Unlock()
142156

@@ -146,10 +160,25 @@ func (u *updatesProvider) Subscribe(userID uuid.UUID) (<-chan *proto.WorkspaceUp
146160
tx: tx,
147161
prev: make(workspacesByID),
148162
}
149-
u.subs = append(u.subs, sub)
163+
u.subs[peerID] = sub
164+
// Write initial state
165+
sub.send(u.latest)
150166
return tx, nil
151167
}
152168

169+
func (u *updatesProvider) Unsubscribe(peerID uuid.UUID) {
170+
u.mu.Lock()
171+
defer u.mu.Unlock()
172+
173+
sub, exists := u.subs[peerID]
174+
if !exists {
175+
return
176+
}
177+
close(sub.tx)
178+
delete(u.subs, peerID)
179+
return
180+
}
181+
153182
func produceUpdate(old, new workspacesByID) *proto.WorkspaceUpdate {
154183
out := &proto.WorkspaceUpdate{
155184
UpsertedWorkspaces: []*proto.Workspace{},

tailnet/workspaceupdates_internal_test.go

Lines changed: 0 additions & 190 deletions
This file was deleted.

0 commit comments

Comments
 (0)