Skip to content

Commit 7be63f5

Browse files
committed
review
1 parent 4ef174a commit 7be63f5

File tree

11 files changed

+231
-128
lines changed

11 files changed

+231
-128
lines changed

cli/server.go

-7
Original file line numberDiff line numberDiff line change
@@ -728,13 +728,6 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
728728
options.Database = dbmetrics.NewDBMetrics(options.Database, options.Logger, options.PrometheusRegistry)
729729
}
730730

731-
wsUpdates, err := coderd.NewUpdatesProvider(logger.Named("workspace_updates"), options.Pubsub, options.Database, options.Authorizer)
732-
if err != nil {
733-
return xerrors.Errorf("create workspace updates provider: %w", err)
734-
}
735-
options.WorkspaceUpdatesProvider = wsUpdates
736-
defer wsUpdates.Close()
737-
738731
var deploymentID string
739732
err = options.Database.InTx(func(tx database.Store) error {
740733
// This will block until the lock is acquired, and will be

coderd/coderd.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,6 @@ type Options struct {
227227

228228
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
229229

230-
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
231-
232230
// This janky function is used in telemetry to parse fields out of the raw
233231
// JWT. It needs to be passed through like this because license parsing is
234232
// under the enterprise license, and can't be imported into AGPL.
@@ -495,6 +493,8 @@ func New(options *Options) *API {
495493
}
496494
}
497495

496+
updatesProvider := NewUpdatesProvider(options.Logger.Named("workspace_updates"), options.Pubsub, options.Database, options.Authorizer)
497+
498498
// Start a background process that rotates keys. We intentionally start this after the caches
499499
// are created to force initial requests for a key to populate the caches. This helps catch
500500
// bugs that may only occur when a key isn't precached in tests and the latency cost is minimal.
@@ -525,6 +525,7 @@ func New(options *Options) *API {
525525
metricsCache: metricsCache,
526526
Auditor: atomic.Pointer[audit.Auditor]{},
527527
TailnetCoordinator: atomic.Pointer[tailnet.Coordinator]{},
528+
UpdatesProvider: updatesProvider,
528529
TemplateScheduleStore: options.TemplateScheduleStore,
529530
UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore,
530531
AccessControlStore: options.AccessControlStore,
@@ -660,7 +661,7 @@ func New(options *Options) *API {
660661
DERPMapFn: api.DERPMap,
661662
NetworkTelemetryHandler: api.NetworkTelemetryBatcher.Handler,
662663
ResumeTokenProvider: api.Options.CoordinatorResumeTokenProvider,
663-
WorkspaceUpdatesProvider: api.Options.WorkspaceUpdatesProvider,
664+
WorkspaceUpdatesProvider: api.UpdatesProvider,
664665
})
665666
if err != nil {
666667
api.Logger.Fatal(context.Background(), "failed to initialize tailnet client service", slog.Error(err))
@@ -1415,6 +1416,8 @@ type API struct {
14151416
AccessControlStore *atomic.Pointer[dbauthz.AccessControlStore]
14161417
PortSharer atomic.Pointer[portsharing.PortSharer]
14171418

1419+
UpdatesProvider tailnet.WorkspaceUpdatesProvider
1420+
14181421
HTTPAuth *HTTPAuthorizer
14191422

14201423
// APIHandler serves "/api/v2"
@@ -1496,6 +1499,7 @@ func (api *API) Close() error {
14961499
_ = api.OIDCConvertKeyCache.Close()
14971500
_ = api.AppSigningKeyCache.Close()
14981501
_ = api.AppEncryptionKeyCache.Close()
1502+
_ = api.UpdatesProvider.Close()
14991503
return nil
15001504
}
15011505

coderd/coderdtest/coderdtest.go

-17
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,6 @@ type Options struct {
163163
APIKeyEncryptionCache cryptokeys.EncryptionKeycache
164164
OIDCConvertKeyCache cryptokeys.SigningKeycache
165165
Clock quartz.Clock
166-
167-
WorkspaceUpdatesProvider tailnet.WorkspaceUpdatesProvider
168166
}
169167

170168
// New constructs a codersdk client connected to an in-memory API instance.
@@ -256,20 +254,6 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
256254
options.NotificationsEnqueuer = new(testutil.FakeNotificationsEnqueuer)
257255
}
258256

259-
if options.WorkspaceUpdatesProvider == nil {
260-
var err error
261-
options.WorkspaceUpdatesProvider, err = coderd.NewUpdatesProvider(
262-
options.Logger.Named("workspace_updates"),
263-
options.Pubsub,
264-
options.Database,
265-
options.Authorizer,
266-
)
267-
require.NoError(t, err)
268-
t.Cleanup(func() {
269-
_ = options.WorkspaceUpdatesProvider.Close()
270-
})
271-
}
272-
273257
accessControlStore := &atomic.Pointer[dbauthz.AccessControlStore]{}
274258
var acs dbauthz.AccessControlStore = dbauthz.AGPLTemplateAccessControlStore{}
275259
accessControlStore.Store(&acs)
@@ -547,7 +531,6 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
547531
HealthcheckTimeout: options.HealthcheckTimeout,
548532
HealthcheckRefresh: options.HealthcheckRefresh,
549533
StatsBatcher: options.StatsBatcher,
550-
WorkspaceUpdatesProvider: options.WorkspaceUpdatesProvider,
551534
WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions,
552535
AllowWorkspaceRenames: options.AllowWorkspaceRenames,
553536
NewTicker: options.NewTicker,

coderd/workspaceupdates.go

+18-18
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/coder/coder/v2/coderd/database/dbauthz"
1515
"github.com/coder/coder/v2/coderd/database/pubsub"
1616
"github.com/coder/coder/v2/coderd/rbac"
17-
"github.com/coder/coder/v2/coderd/rbac/policy"
1817
"github.com/coder/coder/v2/coderd/util/slice"
1918
"github.com/coder/coder/v2/coderd/wspubsub"
2019
"github.com/coder/coder/v2/codersdk"
@@ -23,7 +22,8 @@ import (
2322
)
2423

2524
type UpdatesQuerier interface {
26-
GetAuthorizedWorkspacesAndAgentsByOwnerID(ctx context.Context, ownerID uuid.UUID, prep rbac.PreparedAuthorized) ([]database.GetWorkspacesAndAgentsByOwnerIDRow, error)
25+
// GetAuthorizedWorkspacesAndAgentsByOwnerID requires a context with an actor set
26+
GetWorkspacesAndAgentsByOwnerID(ctx context.Context, ownerID uuid.UUID) ([]database.GetWorkspacesAndAgentsByOwnerIDRow, error)
2727
GetWorkspaceByAgentID(ctx context.Context, agentID uuid.UUID) (database.Workspace, error)
2828
}
2929

@@ -42,14 +42,14 @@ func (w ownedWorkspace) Equal(other ownedWorkspace) bool {
4242
}
4343

4444
type sub struct {
45+
// ALways contains an actor
4546
ctx context.Context
4647
cancelFn context.CancelFunc
4748

48-
mu sync.RWMutex
49-
userID uuid.UUID
50-
ch chan *proto.WorkspaceUpdate
51-
prev workspacesByID
52-
readPrep rbac.PreparedAuthorized
49+
mu sync.RWMutex
50+
userID uuid.UUID
51+
ch chan *proto.WorkspaceUpdate
52+
prev workspacesByID
5353

5454
db UpdatesQuerier
5555
ps pubsub.Pubsub
@@ -76,7 +76,8 @@ func (s *sub) handleEvent(ctx context.Context, event wspubsub.WorkspaceEvent, er
7676
}
7777
}
7878

79-
rows, err := s.db.GetAuthorizedWorkspacesAndAgentsByOwnerID(ctx, s.userID, s.readPrep)
79+
// Use context containing actor
80+
rows, err := s.db.GetWorkspacesAndAgentsByOwnerID(s.ctx, s.userID)
8081
if err != nil {
8182
s.logger.Warn(ctx, "failed to get workspaces and agents by owner ID", slog.Error(err))
8283
return
@@ -97,7 +98,7 @@ func (s *sub) handleEvent(ctx context.Context, event wspubsub.WorkspaceEvent, er
9798
}
9899

99100
func (s *sub) start(ctx context.Context) (err error) {
100-
rows, err := s.db.GetAuthorizedWorkspacesAndAgentsByOwnerID(ctx, s.userID, s.readPrep)
101+
rows, err := s.db.GetWorkspacesAndAgentsByOwnerID(ctx, s.userID)
101102
if err != nil {
102103
return xerrors.Errorf("get workspaces and agents by owner ID: %w", err)
103104
}
@@ -150,7 +151,7 @@ func NewUpdatesProvider(
150151
ps pubsub.Pubsub,
151152
db UpdatesQuerier,
152153
auth rbac.Authorizer,
153-
) (tailnet.WorkspaceUpdatesProvider, error) {
154+
) tailnet.WorkspaceUpdatesProvider {
154155
ctx, cancel := context.WithCancel(context.Background())
155156
out := &updatesProvider{
156157
auth: auth,
@@ -160,25 +161,25 @@ func NewUpdatesProvider(
160161
ctx: ctx,
161162
cancelFn: cancel,
162163
}
163-
return out, nil
164+
return out
164165
}
165166

166167
func (u *updatesProvider) Close() error {
167168
u.cancelFn()
168169
return nil
169170
}
170171

172+
// Subscribe subscribes to workspace updates for a user, for the workspaces
173+
// that user is authorized to `ActionRead` on. The provided context must have
174+
// a dbauthz actor set.
171175
func (u *updatesProvider) Subscribe(ctx context.Context, userID uuid.UUID) (tailnet.Subscription, error) {
172176
actor, ok := dbauthz.ActorFromContext(ctx)
173177
if !ok {
174178
return nil, xerrors.Errorf("actor not found in context")
175179
}
176-
readPrep, err := u.auth.Prepare(ctx, actor, policy.ActionRead, rbac.ResourceWorkspace.Type)
177-
if err != nil {
178-
return nil, xerrors.Errorf("prepare read action: %w", err)
179-
}
180+
ctx, cancel := context.WithCancel(u.ctx)
181+
ctx = dbauthz.As(ctx, actor)
180182
ch := make(chan *proto.WorkspaceUpdate, 1)
181-
ctx, cancel := context.WithCancel(ctx)
182183
sub := &sub{
183184
ctx: ctx,
184185
cancelFn: cancel,
@@ -188,9 +189,8 @@ func (u *updatesProvider) Subscribe(ctx context.Context, userID uuid.UUID) (tail
188189
ps: u.ps,
189190
logger: u.logger.Named(fmt.Sprintf("workspace_updates_subscriber_%s", userID)),
190191
prev: workspacesByID{},
191-
readPrep: readPrep,
192192
}
193-
err = sub.start(ctx)
193+
err := sub.start(ctx)
194194
if err != nil {
195195
_ = sub.Close()
196196
return nil, err

coderd/workspaceupdates_test.go

+52-26
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,23 @@ import (
2525

2626
func TestWorkspaceUpdates(t *testing.T) {
2727
t.Parallel()
28-
ctx := context.Background()
2928

30-
ws1ID := uuid.New()
29+
ws1ID := uuid.UUID{0x01}
3130
ws1IDSlice := tailnet.UUIDToByteSlice(ws1ID)
32-
agent1ID := uuid.New()
31+
agent1ID := uuid.UUID{0x02}
3332
agent1IDSlice := tailnet.UUIDToByteSlice(agent1ID)
34-
ws2ID := uuid.New()
33+
ws2ID := uuid.UUID{0x03}
3534
ws2IDSlice := tailnet.UUIDToByteSlice(ws2ID)
36-
ws3ID := uuid.New()
35+
ws3ID := uuid.UUID{0x04}
3736
ws3IDSlice := tailnet.UUIDToByteSlice(ws3ID)
38-
agent2ID := uuid.New()
37+
agent2ID := uuid.UUID{0x05}
3938
agent2IDSlice := tailnet.UUIDToByteSlice(agent2ID)
40-
ws4ID := uuid.New()
39+
ws4ID := uuid.UUID{0x06}
4140
ws4IDSlice := tailnet.UUIDToByteSlice(ws4ID)
41+
agent3ID := uuid.UUID{0x07}
42+
agent3IDSlice := tailnet.UUIDToByteSlice(agent3ID)
4243

43-
ownerID := uuid.New()
44+
ownerID := uuid.UUID{0x08}
4445
memberRole, err := rbac.RoleByName(rbac.RoleMember())
4546
require.NoError(t, err)
4647
ownerSubject := rbac.Subject{
@@ -53,9 +54,11 @@ func TestWorkspaceUpdates(t *testing.T) {
5354
t.Run("Basic", func(t *testing.T) {
5455
t.Parallel()
5556

57+
ctx := testutil.Context(t, testutil.WaitShort)
58+
5659
db := &mockWorkspaceStore{
5760
orderedRows: []database.GetWorkspacesAndAgentsByOwnerIDRow{
58-
// Gains a new agent
61+
// Gains agent2
5962
{
6063
ID: ws1ID,
6164
Name: "ws1",
@@ -81,6 +84,12 @@ func TestWorkspaceUpdates(t *testing.T) {
8184
Name: "ws3",
8285
JobStatus: database.ProvisionerJobStatusSucceeded,
8386
Transition: database.WorkspaceTransitionStop,
87+
Agents: []database.AgentIDNamePair{
88+
{
89+
ID: agent3ID,
90+
Name: "agent3",
91+
},
92+
},
8493
},
8594
},
8695
}
@@ -89,21 +98,24 @@ func TestWorkspaceUpdates(t *testing.T) {
8998
cbs: map[string]pubsub.ListenerWithErr{},
9099
}
91100

92-
updateProvider, err := coderd.NewUpdatesProvider(slogtest.Make(t, nil), ps, db, &mockAuthorizer{})
93-
require.NoError(t, err)
101+
updateProvider := coderd.NewUpdatesProvider(slogtest.Make(t, nil), ps, db, &mockAuthorizer{})
94102
t.Cleanup(func() {
95103
_ = updateProvider.Close()
96104
})
97105

98106
sub, err := updateProvider.Subscribe(dbauthz.As(ctx, ownerSubject), ownerID)
99107
require.NoError(t, err)
100-
ch := sub.Updates()
108+
t.Cleanup(func() {
109+
_ = sub.Close()
110+
})
101111

102-
update, ok := <-ch
103-
require.True(t, ok)
112+
update := testutil.RequireRecvCtx(ctx, t, sub.Updates())
104113
slices.SortFunc(update.UpsertedWorkspaces, func(a, b *proto.Workspace) int {
105114
return strings.Compare(a.Name, b.Name)
106115
})
116+
slices.SortFunc(update.UpsertedAgents, func(a, b *proto.Agent) int {
117+
return strings.Compare(a.Name, b.Name)
118+
})
107119
require.Equal(t, &proto.WorkspaceUpdate{
108120
UpsertedWorkspaces: []*proto.Workspace{
109121
{
@@ -128,6 +140,11 @@ func TestWorkspaceUpdates(t *testing.T) {
128140
Name: "agent1",
129141
WorkspaceId: ws1IDSlice,
130142
},
143+
{
144+
Id: agent3IDSlice,
145+
Name: "agent3",
146+
WorkspaceId: ws3IDSlice,
147+
},
131148
},
132149
DeletedWorkspaces: []*proto.Workspace{},
133150
DeletedAgents: []*proto.Agent{},
@@ -169,8 +186,7 @@ func TestWorkspaceUpdates(t *testing.T) {
169186
WorkspaceID: ws1ID,
170187
})
171188

172-
update, ok = <-ch
173-
require.True(t, ok)
189+
update = testutil.RequireRecvCtx(ctx, t, sub.Updates())
174190
slices.SortFunc(update.UpsertedWorkspaces, func(a, b *proto.Workspace) int {
175191
return strings.Compare(a.Name, b.Name)
176192
})
@@ -203,13 +219,21 @@ func TestWorkspaceUpdates(t *testing.T) {
203219
Status: proto.Workspace_STOPPED,
204220
},
205221
},
206-
DeletedAgents: []*proto.Agent{},
222+
DeletedAgents: []*proto.Agent{
223+
{
224+
Id: agent3IDSlice,
225+
Name: "agent3",
226+
WorkspaceId: ws3IDSlice,
227+
},
228+
},
207229
}, update)
208230
})
209231

210232
t.Run("Resubscribe", func(t *testing.T) {
211233
t.Parallel()
212234

235+
ctx := testutil.Context(t, testutil.WaitShort)
236+
213237
db := &mockWorkspaceStore{
214238
orderedRows: []database.GetWorkspacesAndAgentsByOwnerIDRow{
215239
{
@@ -231,15 +255,16 @@ func TestWorkspaceUpdates(t *testing.T) {
231255
cbs: map[string]pubsub.ListenerWithErr{},
232256
}
233257

234-
updateProvider, err := coderd.NewUpdatesProvider(slogtest.Make(t, nil), ps, db, &mockAuthorizer{})
235-
require.NoError(t, err)
258+
updateProvider := coderd.NewUpdatesProvider(slogtest.Make(t, nil), ps, db, &mockAuthorizer{})
236259
t.Cleanup(func() {
237260
_ = updateProvider.Close()
238261
})
239262

240263
sub, err := updateProvider.Subscribe(dbauthz.As(ctx, ownerSubject), ownerID)
241264
require.NoError(t, err)
242-
ch := sub.Updates()
265+
t.Cleanup(func() {
266+
_ = sub.Close()
267+
})
243268

244269
expected := &proto.WorkspaceUpdate{
245270
UpsertedWorkspaces: []*proto.Workspace{
@@ -260,18 +285,19 @@ func TestWorkspaceUpdates(t *testing.T) {
260285
DeletedAgents: []*proto.Agent{},
261286
}
262287

263-
update := testutil.RequireRecvCtx(ctx, t, ch)
288+
update := testutil.RequireRecvCtx(ctx, t, sub.Updates())
264289
slices.SortFunc(update.UpsertedWorkspaces, func(a, b *proto.Workspace) int {
265290
return strings.Compare(a.Name, b.Name)
266291
})
267292
require.Equal(t, expected, update)
268293

294+
resub, err := updateProvider.Subscribe(dbauthz.As(ctx, ownerSubject), ownerID)
269295
require.NoError(t, err)
270-
sub, err = updateProvider.Subscribe(dbauthz.As(ctx, ownerSubject), ownerID)
271-
require.NoError(t, err)
272-
ch = sub.Updates()
296+
t.Cleanup(func() {
297+
_ = resub.Close()
298+
})
273299

274-
update = testutil.RequireRecvCtx(ctx, t, ch)
300+
update = testutil.RequireRecvCtx(ctx, t, resub.Updates())
275301
slices.SortFunc(update.UpsertedWorkspaces, func(a, b *proto.Workspace) int {
276302
return strings.Compare(a.Name, b.Name)
277303
})
@@ -290,7 +316,7 @@ type mockWorkspaceStore struct {
290316
}
291317

292318
// GetAuthorizedWorkspacesAndAgentsByOwnerID implements coderd.UpdatesQuerier.
293-
func (m *mockWorkspaceStore) GetAuthorizedWorkspacesAndAgentsByOwnerID(context.Context, uuid.UUID, rbac.PreparedAuthorized) ([]database.GetWorkspacesAndAgentsByOwnerIDRow, error) {
319+
func (m *mockWorkspaceStore) GetWorkspacesAndAgentsByOwnerID(context.Context, uuid.UUID) ([]database.GetWorkspacesAndAgentsByOwnerIDRow, error) {
294320
return m.orderedRows, nil
295321
}
296322

0 commit comments

Comments
 (0)