Skip to content

Commit 31506e6

Browse files
chore: send workspace pubsub events by owner id (coder#14964)
We currently send empty payloads to pubsub channels of the form `workspace:<workspace_id>` to notify listeners of updates to workspaces (such as for refreshing the workspace dashboard). To support coder#14716, we'll instead send `WorkspaceEvent` payloads to pubsub channels of the form `workspace_owner:<owner_id>`. This enables a listener to receive events for all workspaces owned by a user. This PR replaces the usage of the old channels without modifying any existing behaviors. ``` type WorkspaceEvent struct { Kind WorkspaceEventKind `json:"kind"` WorkspaceID uuid.UUID `json:"workspace_id" format:"uuid"` // AgentID is only set for WorkspaceEventKindAgent* events // (excluding AgentTimeout) AgentID *uuid.UUID `json:"agent_id,omitempty" format:"uuid"` } ``` We've defined `WorkspaceEventKind`s based on how the old channel was used, but it's not yet necessary to inspect the types of any of the events, as the existing listeners are designed to fire off any of them. ``` WorkspaceEventKindStateChange WorkspaceEventKind = "state_change" WorkspaceEventKindStatsUpdate WorkspaceEventKind = "stats_update" WorkspaceEventKindMetadataUpdate WorkspaceEventKind = "mtd_update" WorkspaceEventKindAppHealthUpdate WorkspaceEventKind = "app_health" WorkspaceEventKindAgentLifecycleUpdate WorkspaceEventKind = "agt_lifecycle_update" WorkspaceEventKindAgentLogsUpdate WorkspaceEventKind = "agt_logs_update" WorkspaceEventKindAgentConnectionUpdate WorkspaceEventKind = "agt_connection_update" WorkspaceEventKindAgentLogsOverflow WorkspaceEventKind = "agt_logs_overflow" WorkspaceEventKindAgentTimeout WorkspaceEventKind = "agt_timeout" ```
1 parent 088f219 commit 31506e6

21 files changed

+396
-259
lines changed

coderd/agentapi/api.go

Lines changed: 16 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/coder/coder/v2/coderd/prometheusmetrics"
2525
"github.com/coder/coder/v2/coderd/tracing"
2626
"github.com/coder/coder/v2/coderd/workspacestats"
27+
"github.com/coder/coder/v2/coderd/wspubsub"
2728
"github.com/coder/coder/v2/codersdk"
2829
"github.com/coder/coder/v2/codersdk/agentsdk"
2930
"github.com/coder/coder/v2/tailnet"
@@ -45,14 +46,15 @@ type API struct {
4546
*ScriptsAPI
4647
*tailnet.DRPCService
4748

48-
mu sync.Mutex
49-
cachedWorkspaceID uuid.UUID
49+
mu sync.Mutex
5050
}
5151

5252
var _ agentproto.DRPCAgentServer = &API{}
5353

5454
type Options struct {
55-
AgentID uuid.UUID
55+
AgentID uuid.UUID
56+
OwnerID uuid.UUID
57+
WorkspaceID uuid.UUID
5658

5759
Ctx context.Context
5860
Log slog.Logger
@@ -62,7 +64,7 @@ type Options struct {
6264
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
6365
StatsReporter *workspacestats.Reporter
6466
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
65-
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
67+
PublishWorkspaceUpdateFn func(ctx context.Context, userID uuid.UUID, event wspubsub.WorkspaceEvent)
6668
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
6769
NetworkTelemetryHandler func(batch []*tailnetproto.TelemetryEvent)
6870

@@ -75,18 +77,13 @@ type Options struct {
7577
ExternalAuthConfigs []*externalauth.Config
7678
Experiments codersdk.Experiments
7779

78-
// Optional:
79-
// WorkspaceID avoids a future lookup to find the workspace ID by setting
80-
// the cache in advance.
81-
WorkspaceID uuid.UUID
8280
UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric)
8381
}
8482

8583
func New(opts Options) *API {
8684
api := &API{
87-
opts: opts,
88-
mu: sync.Mutex{},
89-
cachedWorkspaceID: opts.WorkspaceID,
85+
opts: opts,
86+
mu: sync.Mutex{},
9087
}
9188

9289
api.ManifestAPI = &ManifestAPI{
@@ -98,16 +95,7 @@ func New(opts Options) *API {
9895
AgentFn: api.agent,
9996
Database: opts.Database,
10097
DerpMapFn: opts.DerpMapFn,
101-
WorkspaceIDFn: func(ctx context.Context, wa *database.WorkspaceAgent) (uuid.UUID, error) {
102-
if opts.WorkspaceID != uuid.Nil {
103-
return opts.WorkspaceID, nil
104-
}
105-
ws, err := opts.Database.GetWorkspaceByAgentID(ctx, wa.ID)
106-
if err != nil {
107-
return uuid.Nil, err
108-
}
109-
return ws.ID, nil
110-
},
98+
WorkspaceID: opts.WorkspaceID,
11199
}
112100

113101
api.AnnouncementBannerAPI = &AnnouncementBannerAPI{
@@ -125,7 +113,7 @@ func New(opts Options) *API {
125113

126114
api.LifecycleAPI = &LifecycleAPI{
127115
AgentFn: api.agent,
128-
WorkspaceIDFn: api.workspaceID,
116+
WorkspaceID: opts.WorkspaceID,
129117
Database: opts.Database,
130118
Log: opts.Log,
131119
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
@@ -209,39 +197,11 @@ func (a *API) agent(ctx context.Context) (database.WorkspaceAgent, error) {
209197
return agent, nil
210198
}
211199

212-
func (a *API) workspaceID(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
213-
a.mu.Lock()
214-
if a.cachedWorkspaceID != uuid.Nil {
215-
id := a.cachedWorkspaceID
216-
a.mu.Unlock()
217-
return id, nil
218-
}
219-
220-
if agent == nil {
221-
agnt, err := a.agent(ctx)
222-
if err != nil {
223-
return uuid.Nil, err
224-
}
225-
agent = &agnt
226-
}
227-
228-
getWorkspaceAgentByIDRow, err := a.opts.Database.GetWorkspaceByAgentID(ctx, agent.ID)
229-
if err != nil {
230-
return uuid.Nil, xerrors.Errorf("get workspace by agent id %q: %w", agent.ID, err)
231-
}
232-
233-
a.mu.Lock()
234-
a.cachedWorkspaceID = getWorkspaceAgentByIDRow.ID
235-
a.mu.Unlock()
236-
return getWorkspaceAgentByIDRow.ID, nil
237-
}
238-
239-
func (a *API) publishWorkspaceUpdate(ctx context.Context, agent *database.WorkspaceAgent) error {
240-
workspaceID, err := a.workspaceID(ctx, agent)
241-
if err != nil {
242-
return err
243-
}
244-
245-
a.opts.PublishWorkspaceUpdateFn(ctx, workspaceID)
200+
func (a *API) publishWorkspaceUpdate(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
201+
a.opts.PublishWorkspaceUpdateFn(ctx, a.opts.OwnerID, wspubsub.WorkspaceEvent{
202+
Kind: kind,
203+
WorkspaceID: a.opts.WorkspaceID,
204+
AgentID: &agent.ID,
205+
})
246206
return nil
247207
}

coderd/agentapi/apps.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ import (
99
"cdr.dev/slog"
1010
agentproto "github.com/coder/coder/v2/agent/proto"
1111
"github.com/coder/coder/v2/coderd/database"
12+
"github.com/coder/coder/v2/coderd/wspubsub"
1213
)
1314

1415
type AppsAPI struct {
1516
AgentFn func(context.Context) (database.WorkspaceAgent, error)
1617
Database database.Store
1718
Log slog.Logger
18-
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent) error
19+
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent, wspubsub.WorkspaceEventKind) error
1920
}
2021

2122
func (a *AppsAPI) BatchUpdateAppHealths(ctx context.Context, req *agentproto.BatchUpdateAppHealthRequest) (*agentproto.BatchUpdateAppHealthResponse, error) {
@@ -96,7 +97,7 @@ func (a *AppsAPI) BatchUpdateAppHealths(ctx context.Context, req *agentproto.Bat
9697
}
9798

9899
if a.PublishWorkspaceUpdateFn != nil && len(newApps) > 0 {
99-
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent)
100+
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent, wspubsub.WorkspaceEventKindAppHealthUpdate)
100101
if err != nil {
101102
return nil, xerrors.Errorf("publish workspace update: %w", err)
102103
}

coderd/agentapi/apps_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/coder/coder/v2/coderd/agentapi"
1515
"github.com/coder/coder/v2/coderd/database"
1616
"github.com/coder/coder/v2/coderd/database/dbmock"
17+
"github.com/coder/coder/v2/coderd/wspubsub"
1718
)
1819

1920
func TestBatchUpdateAppHealths(t *testing.T) {
@@ -62,7 +63,7 @@ func TestBatchUpdateAppHealths(t *testing.T) {
6263
},
6364
Database: dbM,
6465
Log: slogtest.Make(t, nil),
65-
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
66+
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
6667
publishCalled = true
6768
return nil
6869
},
@@ -100,7 +101,7 @@ func TestBatchUpdateAppHealths(t *testing.T) {
100101
},
101102
Database: dbM,
102103
Log: slogtest.Make(t, nil),
103-
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
104+
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
104105
publishCalled = true
105106
return nil
106107
},
@@ -139,7 +140,7 @@ func TestBatchUpdateAppHealths(t *testing.T) {
139140
},
140141
Database: dbM,
141142
Log: slogtest.Make(t, nil),
142-
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
143+
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
143144
publishCalled = true
144145
return nil
145146
},

coderd/agentapi/lifecycle.go

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
agentproto "github.com/coder/coder/v2/agent/proto"
1616
"github.com/coder/coder/v2/coderd/database"
1717
"github.com/coder/coder/v2/coderd/database/dbtime"
18+
"github.com/coder/coder/v2/coderd/wspubsub"
1819
)
1920

2021
type contextKeyAPIVersion struct{}
@@ -25,10 +26,10 @@ func WithAPIVersion(ctx context.Context, version string) context.Context {
2526

2627
type LifecycleAPI struct {
2728
AgentFn func(context.Context) (database.WorkspaceAgent, error)
28-
WorkspaceIDFn func(context.Context, *database.WorkspaceAgent) (uuid.UUID, error)
29+
WorkspaceID uuid.UUID
2930
Database database.Store
3031
Log slog.Logger
31-
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent) error
32+
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent, wspubsub.WorkspaceEventKind) error
3233

3334
TimeNowFn func() time.Time // defaults to dbtime.Now()
3435
}
@@ -45,13 +46,9 @@ func (a *LifecycleAPI) UpdateLifecycle(ctx context.Context, req *agentproto.Upda
4546
if err != nil {
4647
return nil, err
4748
}
48-
workspaceID, err := a.WorkspaceIDFn(ctx, &workspaceAgent)
49-
if err != nil {
50-
return nil, err
51-
}
5249

5350
logger := a.Log.With(
54-
slog.F("workspace_id", workspaceID),
51+
slog.F("workspace_id", a.WorkspaceID),
5552
slog.F("payload", req),
5653
)
5754
logger.Debug(ctx, "workspace agent state report")
@@ -122,7 +119,7 @@ func (a *LifecycleAPI) UpdateLifecycle(ctx context.Context, req *agentproto.Upda
122119
}
123120

124121
if a.PublishWorkspaceUpdateFn != nil {
125-
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent)
122+
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent, wspubsub.WorkspaceEventKindAgentLifecycleUpdate)
126123
if err != nil {
127124
return nil, xerrors.Errorf("publish workspace update: %w", err)
128125
}
@@ -140,15 +137,11 @@ func (a *LifecycleAPI) UpdateStartup(ctx context.Context, req *agentproto.Update
140137
if err != nil {
141138
return nil, err
142139
}
143-
workspaceID, err := a.WorkspaceIDFn(ctx, &workspaceAgent)
144-
if err != nil {
145-
return nil, err
146-
}
147140

148141
a.Log.Debug(
149142
ctx,
150143
"post workspace agent version",
151-
slog.F("workspace_id", workspaceID),
144+
slog.F("workspace_id", a.WorkspaceID),
152145
slog.F("agent_version", req.Startup.Version),
153146
)
154147

coderd/agentapi/lifecycle_test.go

Lines changed: 30 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/coder/coder/v2/coderd/database"
2020
"github.com/coder/coder/v2/coderd/database/dbmock"
2121
"github.com/coder/coder/v2/coderd/database/dbtime"
22+
"github.com/coder/coder/v2/coderd/wspubsub"
2223
)
2324

2425
func TestUpdateLifecycle(t *testing.T) {
@@ -69,12 +70,10 @@ func TestUpdateLifecycle(t *testing.T) {
6970
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
7071
return agentCreated, nil
7172
},
72-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
73-
return workspaceID, nil
74-
},
75-
Database: dbM,
76-
Log: slogtest.Make(t, nil),
77-
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
73+
WorkspaceID: workspaceID,
74+
Database: dbM,
75+
Log: slogtest.Make(t, nil),
76+
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
7877
publishCalled = true
7978
return nil
8079
},
@@ -111,11 +110,9 @@ func TestUpdateLifecycle(t *testing.T) {
111110
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
112111
return agentStarting, nil
113112
},
114-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
115-
return workspaceID, nil
116-
},
117-
Database: dbM,
118-
Log: slogtest.Make(t, nil),
113+
WorkspaceID: workspaceID,
114+
Database: dbM,
115+
Log: slogtest.Make(t, nil),
119116
// Test that nil publish fn works.
120117
PublishWorkspaceUpdateFn: nil,
121118
}
@@ -156,12 +153,10 @@ func TestUpdateLifecycle(t *testing.T) {
156153
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
157154
return agentCreated, nil
158155
},
159-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
160-
return workspaceID, nil
161-
},
162-
Database: dbM,
163-
Log: slogtest.Make(t, nil),
164-
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
156+
WorkspaceID: workspaceID,
157+
Database: dbM,
158+
Log: slogtest.Make(t, nil),
159+
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
165160
publishCalled = true
166161
return nil
167162
},
@@ -204,9 +199,7 @@ func TestUpdateLifecycle(t *testing.T) {
204199
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
205200
return agentCreated, nil
206201
},
207-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
208-
return workspaceID, nil
209-
},
202+
WorkspaceID: workspaceID,
210203
Database: dbM,
211204
Log: slogtest.Make(t, nil),
212205
PublishWorkspaceUpdateFn: nil,
@@ -239,12 +232,10 @@ func TestUpdateLifecycle(t *testing.T) {
239232
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
240233
return agent, nil
241234
},
242-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
243-
return workspaceID, nil
244-
},
245-
Database: dbM,
246-
Log: slogtest.Make(t, nil),
247-
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
235+
WorkspaceID: workspaceID,
236+
Database: dbM,
237+
Log: slogtest.Make(t, nil),
238+
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
248239
atomic.AddInt64(&publishCalled, 1)
249240
return nil
250241
},
@@ -314,12 +305,10 @@ func TestUpdateLifecycle(t *testing.T) {
314305
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
315306
return agentCreated, nil
316307
},
317-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
318-
return workspaceID, nil
319-
},
320-
Database: dbM,
321-
Log: slogtest.Make(t, nil),
322-
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
308+
WorkspaceID: workspaceID,
309+
Database: dbM,
310+
Log: slogtest.Make(t, nil),
311+
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
323312
publishCalled = true
324313
return nil
325314
},
@@ -354,11 +343,9 @@ func TestUpdateStartup(t *testing.T) {
354343
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
355344
return agent, nil
356345
},
357-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
358-
return workspaceID, nil
359-
},
360-
Database: dbM,
361-
Log: slogtest.Make(t, nil),
346+
WorkspaceID: workspaceID,
347+
Database: dbM,
348+
Log: slogtest.Make(t, nil),
362349
// Not used by UpdateStartup.
363350
PublishWorkspaceUpdateFn: nil,
364351
}
@@ -402,11 +389,9 @@ func TestUpdateStartup(t *testing.T) {
402389
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
403390
return agent, nil
404391
},
405-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
406-
return workspaceID, nil
407-
},
408-
Database: dbM,
409-
Log: slogtest.Make(t, nil),
392+
WorkspaceID: workspaceID,
393+
Database: dbM,
394+
Log: slogtest.Make(t, nil),
410395
// Not used by UpdateStartup.
411396
PublishWorkspaceUpdateFn: nil,
412397
}
@@ -435,11 +420,9 @@ func TestUpdateStartup(t *testing.T) {
435420
AgentFn: func(ctx context.Context) (database.WorkspaceAgent, error) {
436421
return agent, nil
437422
},
438-
WorkspaceIDFn: func(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
439-
return workspaceID, nil
440-
},
441-
Database: dbM,
442-
Log: slogtest.Make(t, nil),
423+
WorkspaceID: workspaceID,
424+
Database: dbM,
425+
Log: slogtest.Make(t, nil),
443426
// Not used by UpdateStartup.
444427
PublishWorkspaceUpdateFn: nil,
445428
}

0 commit comments

Comments
 (0)