Skip to content

Commit c5c1316

Browse files
committed
chore: send workspace pubsub events by owner id
1 parent b22bd81 commit c5c1316

17 files changed

+363
-175
lines changed

coderd/agentapi/api.go

+13-18
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/coder/coder/v2/coderd/prometheusmetrics"
2525
"github.com/coder/coder/v2/coderd/tracing"
2626
"github.com/coder/coder/v2/coderd/workspacestats"
27+
"github.com/coder/coder/v2/coderd/wspubsub"
2728
"github.com/coder/coder/v2/codersdk"
2829
"github.com/coder/coder/v2/codersdk/agentsdk"
2930
"github.com/coder/coder/v2/tailnet"
@@ -52,7 +53,9 @@ type API struct {
5253
var _ agentproto.DRPCAgentServer = &API{}
5354

5455
type Options struct {
55-
AgentID uuid.UUID
56+
AgentID uuid.UUID
57+
OwnerID uuid.UUID
58+
WorkspaceID uuid.UUID
5659

5760
Ctx context.Context
5861
Log slog.Logger
@@ -62,7 +65,7 @@ type Options struct {
6265
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
6366
StatsReporter *workspacestats.Reporter
6467
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
65-
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
68+
PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event wspubsub.WorkspaceEvent)
6669
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
6770
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
6871

@@ -75,10 +78,6 @@ type Options struct {
7578
ExternalAuthConfigs []*externalauth.Config
7679
Experiments codersdk.Experiments
7780

78-
// Optional:
79-
// WorkspaceID avoids a future lookup to find the workspace ID by setting
80-
// the cache in advance.
81-
WorkspaceID uuid.UUID
8281
UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric)
8382
}
8483

@@ -98,16 +97,7 @@ func New(opts Options) *API {
9897
AgentFn: api.agent,
9998
Database: opts.Database,
10099
DerpMapFn: opts.DerpMapFn,
101-
WorkspaceIDFn: func(ctx context.Context, wa *database.WorkspaceAgent) (uuid.UUID, error) {
102-
if opts.WorkspaceID != uuid.Nil {
103-
return opts.WorkspaceID, nil
104-
}
105-
ws, err := opts.Database.GetWorkspaceByAgentID(ctx, wa.ID)
106-
if err != nil {
107-
return uuid.Nil, err
108-
}
109-
return ws.Workspace.ID, nil
110-
},
100+
WorkspaceID: opts.WorkspaceID,
111101
}
112102

113103
api.AnnouncementBannerAPI = &AnnouncementBannerAPI{
@@ -125,7 +115,7 @@ func New(opts Options) *API {
125115

126116
api.LifecycleAPI = &LifecycleAPI{
127117
AgentFn: api.agent,
128-
WorkspaceIDFn: api.workspaceID,
118+
WorkspaceID: opts.WorkspaceID,
129119
Database: opts.Database,
130120
Log: opts.Log,
131121
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
@@ -242,6 +232,11 @@ func (a *API) publishWorkspaceUpdate(ctx context.Context, agent *database.Worksp
242232
return err
243233
}
244234

245-
a.opts.PublishWorkspaceUpdateFn(ctx, workspaceID)
235+
a.opts.PublishWorkspaceUpdateFn(ctx, a.opts.OwnerID, wspubsub.WorkspaceEvent{
236+
Kind: wspubsub.WorkspaceEventKindAgentUpdate,
237+
WorkspaceID: workspaceID,
238+
AgentID: &agent.ID,
239+
AgentName: &agent.Name,
240+
})
246241
return nil
247242
}

coderd/agentapi/lifecycle.go

+3-11
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func WithAPIVersion(ctx context.Context, version string) context.Context {
2525

2626
type LifecycleAPI struct {
2727
AgentFn func(context.Context) (database.WorkspaceAgent, error)
28-
WorkspaceIDFn func(context.Context, *database.WorkspaceAgent) (uuid.UUID, error)
28+
WorkspaceID uuid.UUID
2929
Database database.Store
3030
Log slog.Logger
3131
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent) error
@@ -45,13 +45,9 @@ func (a *LifecycleAPI) UpdateLifecycle(ctx context.Context, req *agentproto.Upda
4545
if err != nil {
4646
return nil, err
4747
}
48-
workspaceID, err := a.WorkspaceIDFn(ctx, &workspaceAgent)
49-
if err != nil {
50-
return nil, err
51-
}
5248

5349
logger := a.Log.With(
54-
slog.F("workspace_id", workspaceID),
50+
slog.F("workspace_id", a.WorkspaceID),
5551
slog.F("payload", req),
5652
)
5753
logger.Debug(ctx, "workspace agent state report")
@@ -140,15 +136,11 @@ func (a *LifecycleAPI) UpdateStartup(ctx context.Context, req *agentproto.Update
140136
if err != nil {
141137
return nil, err
142138
}
143-
workspaceID, err := a.WorkspaceIDFn(ctx, &workspaceAgent)
144-
if err != nil {
145-
return nil, err
146-
}
147139

148140
a.Log.Debug(
149141
ctx,
150142
"post workspace agent version",
151-
slog.F("workspace_id", workspaceID),
143+
slog.F("workspace_id", a.WorkspaceID),
152144
slog.F("agent_version", req.Startup.Version),
153145
)
154146

coderd/agentapi/lifecycle_test.go

+25-43
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,9 @@ func TestUpdateLifecycle(t *testing.T) {
6969
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
7070
return agentCreated, nil
7171
},
72-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
73-
return workspaceID, nil
74-
},
75-
Database: dbM,
76-
Log: slogtest.Make(t, nil),
72+
WorkspaceID: workspaceID,
73+
Database: dbM,
74+
Log: slogtest.Make(t, nil),
7775
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
7876
publishCalled = true
7977
return nil
@@ -111,11 +109,9 @@ func TestUpdateLifecycle(t *testing.T) {
111109
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
112110
return agentStarting, nil
113111
},
114-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
115-
return workspaceID, nil
116-
},
117-
Database: dbM,
118-
Log: slogtest.Make(t, nil),
112+
WorkspaceID: workspaceID,
113+
Database: dbM,
114+
Log: slogtest.Make(t, nil),
119115
// Test that nil publish fn works.
120116
PublishWorkspaceUpdateFn: nil,
121117
}
@@ -156,11 +152,9 @@ func TestUpdateLifecycle(t *testing.T) {
156152
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
157153
return agentCreated, nil
158154
},
159-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
160-
return workspaceID, nil
161-
},
162-
Database: dbM,
163-
Log: slogtest.Make(t, nil),
155+
WorkspaceID: workspaceID,
156+
Database: dbM,
157+
Log: slogtest.Make(t, nil),
164158
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
165159
publishCalled = true
166160
return nil
@@ -204,9 +198,7 @@ func TestUpdateLifecycle(t *testing.T) {
204198
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
205199
return agentCreated, nil
206200
},
207-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
208-
return workspaceID, nil
209-
},
201+
WorkspaceID: workspaceID,
210202
Database: dbM,
211203
Log: slogtest.Make(t, nil),
212204
PublishWorkspaceUpdateFn: nil,
@@ -239,11 +231,9 @@ func TestUpdateLifecycle(t *testing.T) {
239231
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
240232
return agent, nil
241233
},
242-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
243-
return workspaceID, nil
244-
},
245-
Database: dbM,
246-
Log: slogtest.Make(t, nil),
234+
WorkspaceID: workspaceID,
235+
Database: dbM,
236+
Log: slogtest.Make(t, nil),
247237
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
248238
atomic.AddInt64(&publishCalled, 1)
249239
return nil
@@ -314,11 +304,9 @@ func TestUpdateLifecycle(t *testing.T) {
314304
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
315305
return agentCreated, nil
316306
},
317-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
318-
return workspaceID, nil
319-
},
320-
Database: dbM,
321-
Log: slogtest.Make(t, nil),
307+
WorkspaceID: workspaceID,
308+
Database: dbM,
309+
Log: slogtest.Make(t, nil),
322310
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
323311
publishCalled = true
324312
return nil
@@ -354,11 +342,9 @@ func TestUpdateStartup(t *testing.T) {
354342
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
355343
return agent, nil
356344
},
357-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
358-
return workspaceID, nil
359-
},
360-
Database: dbM,
361-
Log: slogtest.Make(t, nil),
345+
WorkspaceID: workspaceID,
346+
Database: dbM,
347+
Log: slogtest.Make(t, nil),
362348
// Not used by UpdateStartup.
363349
PublishWorkspaceUpdateFn: nil,
364350
}
@@ -402,11 +388,9 @@ func TestUpdateStartup(t *testing.T) {
402388
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
403389
return agent, nil
404390
},
405-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
406-
return workspaceID, nil
407-
},
408-
Database: dbM,
409-
Log: slogtest.Make(t, nil),
391+
WorkspaceID: workspaceID,
392+
Database: dbM,
393+
Log: slogtest.Make(t, nil),
410394
// Not used by UpdateStartup.
411395
PublishWorkspaceUpdateFn: nil,
412396
}
@@ -435,11 +419,9 @@ func TestUpdateStartup(t *testing.T) {
435419
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
436420
return agent, nil
437421
},
438-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
439-
return workspaceID, nil
440-
},
441-
Database: dbM,
442-
Log: slogtest.Make(t, nil),
422+
WorkspaceID: workspaceID,
423+
Database: dbM,
424+
Log: slogtest.Make(t, nil),
443425
// Not used by UpdateStartup.
444426
PublishWorkspaceUpdateFn: nil,
445427
}

coderd/agentapi/manifest.go

+5-10
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,18 @@ type ManifestAPI struct {
2929
ExternalAuthConfigs []*externalauth.Config
3030
DisableDirectConnections bool
3131
DerpForceWebSockets bool
32+
WorkspaceID uuid.UUID
3233

33-
AgentFn func(context.Context) (database.WorkspaceAgent, error)
34-
WorkspaceIDFn func(context.Context, *database.WorkspaceAgent) (uuid.UUID, error)
35-
Database database.Store
36-
DerpMapFn func() *tailcfg.DERPMap
34+
AgentFn func(context.Context) (database.WorkspaceAgent, error)
35+
Database database.Store
36+
DerpMapFn func() *tailcfg.DERPMap
3737
}
3838

3939
func (a *ManifestAPI) GetManifest(ctx context.Context, _ *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {
4040
workspaceAgent, err := a.AgentFn(ctx)
4141
if err != nil {
4242
return nil, err
4343
}
44-
workspaceID, err := a.WorkspaceIDFn(ctx, &workspaceAgent)
45-
if err != nil {
46-
return nil, err
47-
}
48-
4944
var (
5045
dbApps []database.WorkspaceApp
5146
scripts []database.WorkspaceAgentScript
@@ -75,7 +70,7 @@ func (a *ManifestAPI) GetManifest(ctx context.Context, _ *agentproto.GetManifest
7570
return err
7671
})
7772
eg.Go(func() (err error) {
78-
workspace, err = a.Database.GetWorkspaceByID(ctx, workspaceID)
73+
workspace, err = a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
7974
if err != nil {
8075
return xerrors.Errorf("getting workspace by id: %w", err)
8176
}

coderd/agentapi/manifest_test.go

+6-10
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,9 @@ func TestGetManifest(t *testing.T) {
288288
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
289289
return agent, nil
290290
},
291-
WorkspaceIDFn: func(ctx context.Context, _ *database.WorkspaceAgent) (uuid.UUID, error) {
292-
return workspace.ID, nil
293-
},
294-
Database: mDB,
295-
DerpMapFn: derpMapFn,
291+
WorkspaceID: workspace.ID,
292+
Database: mDB,
293+
DerpMapFn: derpMapFn,
296294
}
297295

298296
mDB.EXPECT().GetWorkspaceAppsByAgentID(gomock.Any(), agent.ID).Return(apps, nil)
@@ -355,11 +353,9 @@ func TestGetManifest(t *testing.T) {
355353
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
356354
return agent, nil
357355
},
358-
WorkspaceIDFn: func(ctx context.Context, _ *database.WorkspaceAgent) (uuid.UUID, error) {
359-
return workspace.ID, nil
360-
},
361-
Database: mDB,
362-
DerpMapFn: derpMapFn,
356+
WorkspaceID: workspace.ID,
357+
Database: mDB,
358+
DerpMapFn: derpMapFn,
363359
}
364360

365361
mDB.EXPECT().GetWorkspaceAppsByAgentID(gomock.Any(), agent.ID).Return(apps, nil)

coderd/agentapi/stats_test.go

+21-14
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/coder/coder/v2/coderd/schedule"
2424
"github.com/coder/coder/v2/coderd/workspacestats"
2525
"github.com/coder/coder/v2/coderd/workspacestats/workspacestatstest"
26+
"github.com/coder/coder/v2/coderd/wspubsub"
2627
"github.com/coder/coder/v2/codersdk"
2728
"github.com/coder/coder/v2/testutil"
2829
)
@@ -148,12 +149,15 @@ func TestUpdateStates(t *testing.T) {
148149
dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil)
149150

150151
// Ensure that pubsub notifications are sent.
151-
notifyDescription := make(chan []byte)
152-
ps.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, description []byte) {
153-
go func() {
154-
notifyDescription <- description
155-
}()
156-
})
152+
notifyDescription := make(chan struct{})
153+
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
154+
wspubsub.HandleWorkspaceEvent(func(_ context.Context, e wspubsub.WorkspaceEvent) {
155+
if e.Kind == wspubsub.WorkspaceEventKindUpdatedStats && e.WorkspaceID == workspace.ID {
156+
go func() {
157+
notifyDescription <- struct{}{}
158+
}()
159+
}
160+
}))
157161

158162
resp, err := api.UpdateStats(context.Background(), req)
159163
require.NoError(t, err)
@@ -175,7 +179,7 @@ func TestUpdateStates(t *testing.T) {
175179
case <-ctx.Done():
176180
t.Error("timed out while waiting for pubsub notification")
177181
case description := <-notifyDescription:
178-
require.Equal(t, description, []byte{})
182+
require.Equal(t, description, struct{}{})
179183
}
180184
require.True(t, updateAgentMetricsFnCalled)
181185
})
@@ -482,12 +486,15 @@ func TestUpdateStates(t *testing.T) {
482486
dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil)
483487

484488
// Ensure that pubsub notifications are sent.
485-
notifyDescription := make(chan []byte)
486-
ps.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, description []byte) {
487-
go func() {
488-
notifyDescription <- description
489-
}()
490-
})
489+
notifyDescription := make(chan struct{})
490+
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
491+
wspubsub.HandleWorkspaceEvent(func(_ context.Context, e wspubsub.WorkspaceEvent) {
492+
if e.Kind == wspubsub.WorkspaceEventKindUpdatedStats && e.WorkspaceID == workspace.ID {
493+
go func() {
494+
notifyDescription <- struct{}{}
495+
}()
496+
}
497+
}))
491498

492499
resp, err := api.UpdateStats(context.Background(), req)
493500
require.NoError(t, err)
@@ -507,7 +514,7 @@ func TestUpdateStates(t *testing.T) {
507514
case <-ctx.Done():
508515
t.Error("timed out while waiting for pubsub notification")
509516
case description := <-notifyDescription:
510-
require.Equal(t, description, []byte{})
517+
require.Equal(t, description, struct{}{})
511518
}
512519
require.True(t, updateAgentMetricsFnCalled)
513520
})

0 commit comments

Comments
 (0)