Skip to content

Commit d57f3e6

Browse files
committed
move core impl to coderd
1 parent 9faa940 commit d57f3e6

File tree

8 files changed

+358
-277
lines changed

8 files changed

+358
-277
lines changed

cli/server.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,14 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
719719
options.Database = dbmetrics.New(options.Database, options.PrometheusRegistry)
720720
}
721721

722+
wsUpdates, err := coderd.NewUpdatesProvider(ctx, &coderd.WorkspaceUpdateStore{
723+
Store: options.Database,
724+
}, options.Pubsub)
725+
if err != nil {
726+
return xerrors.Errorf("create workspace updates provider: %w", err)
727+
}
728+
options.WorkspaceUpdatesProvider = wsUpdates
729+
722730
var deploymentID string
723731
err = options.Database.InTx(func(tx database.Store) error {
724732
// This will block until the lock is acquired, and will be

coderd/coderd.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ type Options struct {
228228

229229
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
230230

231+
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
232+
231233
// This janky function is used in telemetry to parse fields out of the raw
232234
// JWT. It needs to be passed through like this because license parsing is
233235
// under the enterprise license, and can't be imported into AGPL.
@@ -591,12 +593,13 @@ func New(options *Options) *API {
591593
panic("CoordinatorResumeTokenProvider is nil")
592594
}
593595
api.TailnetClientService, err = tailnet.NewClientService(tailnet.ClientServiceOptions{
594-
Logger: api.Logger.Named("tailnetclient"),
595-
CoordPtr: &api.TailnetCoordinator,
596-
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
597-
DERPMapFn: api.DERPMap,
598-
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
599-
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
596+
Logger: api.Logger.Named("tailnetclient"),
597+
CoordPtr: &api.TailnetCoordinator,
598+
DERPMapUpdateFrequency: api.Options.DERPMapUpdateFrequency,
599+
DERPMapFn: api.DERPMap,
600+
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
601+
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
602+
WorkspaceUpdatesProvider: api.Options.WorkspaceUpdatesProvider,
600603
})
601604
if err != nil {
602605
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))

coderd/workspaces.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2146,11 +2146,12 @@ func (api *API) tailnet(rw http.ResponseWriter, r *http.Request) {
21462146

21472147
go httpapi.Heartbeat(ctx, conn)
21482148
err = api.TailnetClientService.ServeUserClient(ctx, version, wsNetConn, tailnet.ServeUserClientOptions{
2149-
PeerID: peerID,
2150-
UserID: owner.ID,
2151-
Subject: &ownerRoles,
2152-
Authz: api.Authorizer,
2153-
Database: api.Database,
2149+
PeerID: peerID,
2150+
UserID: owner.ID,
2151+
Subject: &ownerRoles,
2152+
Authz: api.Authorizer,
2153+
// TODO:
2154+
// Database: api.Database,
21542155
})
21552156
if err != nil && !xerrors.Is(err, io.EOF) && !xerrors.Is(err, context.Canceled) {
21562157
_ = conn.Close(websocket.StatusInternalError, err.Error())

coderd/workspaceupdates.go

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
package coderd
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"sync"
7+
8+
"github.com/google/uuid"
9+
"golang.org/x/xerrors"
10+
11+
"github.com/coder/coder/v2/coderd/database"
12+
"github.com/coder/coder/v2/coderd/database/db2sdk"
13+
"github.com/coder/coder/v2/coderd/database/pubsub"
14+
"github.com/coder/coder/v2/coderd/rbac"
15+
"github.com/coder/coder/v2/coderd/util/slice"
16+
"github.com/coder/coder/v2/codersdk"
17+
"github.com/coder/coder/v2/tailnet"
18+
"github.com/coder/coder/v2/tailnet/proto"
19+
)
20+
21+
type workspacesByOwner map[uuid.UUID]workspacesByID
22+
23+
type workspacesByID map[uuid.UUID]ownedWorkspace
24+
25+
type ownedWorkspace struct {
26+
WorkspaceName string
27+
JobStatus codersdk.ProvisionerJobStatus
28+
Transition codersdk.WorkspaceTransition
29+
Agents []tailnet.AgentIDNamePair
30+
}
31+
32+
type WorkspaceUpdateStore struct {
33+
database.Store
34+
}
35+
36+
var _ tailnet.UpdateQuerier = (*WorkspaceUpdateStore)(nil)
37+
38+
func (u *WorkspaceUpdateStore) GetWorkspaceRBACByAgentID(ctx context.Context, agentID uuid.UUID) (rbac.Objecter, error) {
39+
ws, err := u.Store.GetWorkspaceByAgentID(ctx, agentID)
40+
if err != nil {
41+
return nil, err
42+
}
43+
return ws, nil
44+
}
45+
46+
func (u *WorkspaceUpdateStore) GetWorkspacesAndAgents(ctx context.Context) ([]tailnet.WorkspacesAndAgents, error) {
47+
rows, err := u.Store.GetWorkspacesAndAgents(ctx)
48+
if err != nil {
49+
return nil, err
50+
}
51+
52+
out := db2sdk.List(rows, func(v database.GetWorkspacesAndAgentsRow) tailnet.WorkspacesAndAgents {
53+
return tailnet.WorkspacesAndAgents{
54+
ID: v.ID,
55+
Name: v.Name,
56+
OwnerID: v.OwnerID,
57+
JobStatus: codersdk.ProvisionerJobStatus(v.JobStatus),
58+
Transition: codersdk.WorkspaceTransition(v.Transition),
59+
Agents: db2sdk.List(v.Agents, func(database.AgentIDNamePair) tailnet.AgentIDNamePair {
60+
return tailnet.AgentIDNamePair{
61+
ID: v.ID,
62+
Name: v.Name,
63+
}
64+
}),
65+
}
66+
})
67+
return out, nil
68+
}
69+
70+
// Equal does not compare agents
71+
func (w ownedWorkspace) Equal(other ownedWorkspace) bool {
72+
return w.WorkspaceName == other.WorkspaceName &&
73+
w.JobStatus == other.JobStatus &&
74+
w.Transition == other.Transition
75+
}
76+
77+
func convertRows(v []tailnet.WorkspacesAndAgents) workspacesByOwner {
78+
m := make(map[uuid.UUID]workspacesByID)
79+
for _, ws := range v {
80+
owned := ownedWorkspace{
81+
WorkspaceName: ws.Name,
82+
JobStatus: ws.JobStatus,
83+
Transition: ws.Transition,
84+
Agents: ws.Agents,
85+
}
86+
if byID, exists := m[ws.OwnerID]; !exists {
87+
m[ws.OwnerID] = map[uuid.UUID]ownedWorkspace{ws.ID: owned}
88+
} else {
89+
byID[ws.ID] = owned
90+
m[ws.OwnerID] = byID
91+
}
92+
}
93+
return workspacesByOwner(m)
94+
}
95+
96+
func convertStatus(status codersdk.ProvisionerJobStatus, trans codersdk.WorkspaceTransition) proto.Workspace_Status {
97+
wsStatus := codersdk.ConvertWorkspaceStatus(status, trans)
98+
return tailnet.WorkspaceStatusToProto(wsStatus)
99+
}
100+
101+
type sub struct {
102+
mu sync.Mutex
103+
userID uuid.UUID
104+
tx chan<- *proto.WorkspaceUpdate
105+
prev workspacesByID
106+
}
107+
108+
func (s *sub) send(all workspacesByOwner) {
109+
s.mu.Lock()
110+
defer s.mu.Unlock()
111+
112+
// Filter to only the workspaces owned by the user
113+
latest := all[s.userID]
114+
update := produceUpdate(s.prev, latest)
115+
s.prev = latest
116+
s.tx <- update
117+
}
118+
119+
type updatesProvider struct {
120+
mu sync.RWMutex
121+
db tailnet.UpdateQuerier
122+
ps pubsub.Pubsub
123+
// Peer ID -> subscription
124+
subs map[uuid.UUID]*sub
125+
latest workspacesByOwner
126+
cancelFn func()
127+
}
128+
129+
var _ tailnet.WorkspaceUpdatesProvider = (*updatesProvider)(nil)
130+
131+
func NewUpdatesProvider(ctx context.Context, db tailnet.UpdateQuerier, ps pubsub.Pubsub) (tailnet.WorkspaceUpdatesProvider, error) {
132+
rows, err := db.GetWorkspacesAndAgents(ctx)
133+
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
134+
return nil, err
135+
}
136+
out := &updatesProvider{
137+
db: db,
138+
ps: ps,
139+
subs: map[uuid.UUID]*sub{},
140+
latest: convertRows(rows),
141+
}
142+
cancel, err := ps.Subscribe(codersdk.AllWorkspacesNotifyChannel, out.handleUpdate)
143+
if err != nil {
144+
return nil, err
145+
}
146+
out.cancelFn = cancel
147+
return out, nil
148+
}
149+
150+
func (u *updatesProvider) Stop() {
151+
for _, sub := range u.subs {
152+
close(sub.tx)
153+
}
154+
u.cancelFn()
155+
}
156+
157+
func (u *updatesProvider) handleUpdate(ctx context.Context, _ []byte) {
158+
rows, err := u.db.GetWorkspacesAndAgents(ctx)
159+
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
160+
// TODO: Log
161+
return
162+
}
163+
164+
wg := &sync.WaitGroup{}
165+
latest := convertRows(rows)
166+
u.mu.RLock()
167+
for _, sub := range u.subs {
168+
sub := sub
169+
wg.Add(1)
170+
go func() {
171+
sub.send(latest)
172+
defer wg.Done()
173+
}()
174+
}
175+
u.mu.RUnlock()
176+
177+
u.mu.Lock()
178+
u.latest = latest
179+
u.mu.Unlock()
180+
wg.Wait()
181+
}
182+
183+
func (u *updatesProvider) Subscribe(peerID uuid.UUID, userID uuid.UUID) (<-chan *proto.WorkspaceUpdate, error) {
184+
u.mu.Lock()
185+
defer u.mu.Unlock()
186+
187+
tx := make(chan *proto.WorkspaceUpdate, 1)
188+
sub := &sub{
189+
userID: userID,
190+
tx: tx,
191+
prev: make(workspacesByID),
192+
}
193+
u.subs[peerID] = sub
194+
// Write initial state
195+
sub.send(u.latest)
196+
return tx, nil
197+
}
198+
199+
func (u *updatesProvider) Unsubscribe(peerID uuid.UUID) {
200+
u.mu.Lock()
201+
defer u.mu.Unlock()
202+
203+
sub, exists := u.subs[peerID]
204+
if !exists {
205+
return
206+
}
207+
close(sub.tx)
208+
delete(u.subs, peerID)
209+
}
210+
211+
func produceUpdate(old, new workspacesByID) *proto.WorkspaceUpdate {
212+
out := &proto.WorkspaceUpdate{
213+
UpsertedWorkspaces: []*proto.Workspace{},
214+
UpsertedAgents: []*proto.Agent{},
215+
DeletedWorkspaces: []*proto.Workspace{},
216+
DeletedAgents: []*proto.Agent{},
217+
}
218+
219+
for wsID, newWorkspace := range new {
220+
oldWorkspace, exists := old[wsID]
221+
// Upsert both workspace and agents if the workspace is new
222+
if !exists {
223+
out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &proto.Workspace{
224+
Id: tailnet.UUIDToByteSlice(wsID),
225+
Name: newWorkspace.WorkspaceName,
226+
Status: convertStatus(newWorkspace.JobStatus, newWorkspace.Transition),
227+
})
228+
for _, agent := range newWorkspace.Agents {
229+
out.UpsertedAgents = append(out.UpsertedAgents, &proto.Agent{
230+
Id: tailnet.UUIDToByteSlice(agent.ID),
231+
Name: agent.Name,
232+
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
233+
})
234+
}
235+
continue
236+
}
237+
// Upsert workspace if the workspace is updated
238+
if !newWorkspace.Equal(oldWorkspace) {
239+
out.UpsertedWorkspaces = append(out.UpsertedWorkspaces, &proto.Workspace{
240+
Id: tailnet.UUIDToByteSlice(wsID),
241+
Name: newWorkspace.WorkspaceName,
242+
Status: convertStatus(newWorkspace.JobStatus, newWorkspace.Transition),
243+
})
244+
}
245+
246+
add, remove := slice.SymmetricDifference(oldWorkspace.Agents, newWorkspace.Agents)
247+
for _, agent := range add {
248+
out.UpsertedAgents = append(out.UpsertedAgents, &proto.Agent{
249+
Id: tailnet.UUIDToByteSlice(agent.ID),
250+
Name: agent.Name,
251+
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
252+
})
253+
}
254+
for _, agent := range remove {
255+
out.DeletedAgents = append(out.DeletedAgents, &proto.Agent{
256+
Id: tailnet.UUIDToByteSlice(agent.ID),
257+
Name: agent.Name,
258+
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
259+
})
260+
}
261+
}
262+
263+
// Delete workspace and agents if the workspace is deleted
264+
for wsID, oldWorkspace := range old {
265+
if _, exists := new[wsID]; !exists {
266+
out.DeletedWorkspaces = append(out.DeletedWorkspaces, &proto.Workspace{
267+
Id: tailnet.UUIDToByteSlice(wsID),
268+
Name: oldWorkspace.WorkspaceName,
269+
Status: convertStatus(oldWorkspace.JobStatus, oldWorkspace.Transition),
270+
})
271+
for _, agent := range oldWorkspace.Agents {
272+
out.DeletedAgents = append(out.DeletedAgents, &proto.Agent{
273+
Id: tailnet.UUIDToByteSlice(agent.ID),
274+
Name: agent.Name,
275+
WorkspaceId: tailnet.UUIDToByteSlice(wsID),
276+
})
277+
}
278+
}
279+
}
280+
281+
return out
282+
}

0 commit comments

Comments
 (0)