Skip to content

Commit 85d80ad

Browse files
committed
use subscribewitherr
1 parent 899c7be commit 85d80ad

File tree

5 files changed

+55
-39
lines changed

5 files changed

+55
-39
lines changed

coderd/agentapi/stats_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313
"go.uber.org/mock/gomock"
1414
"google.golang.org/protobuf/types/known/durationpb"
1515

16-
"cdr.dev/slog/sloggers/slogtest"
17-
1816
agentproto "github.com/coder/coder/v2/agent/proto"
1917
"github.com/coder/coder/v2/coderd/agentapi"
2018
"github.com/coder/coder/v2/coderd/database"
@@ -157,10 +155,12 @@ func TestUpdateStates(t *testing.T) {
157155

158156
// Ensure that pubsub notifications are sent.
159157
notifyDescription := make(chan struct{})
160-
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
158+
ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
161159
wspubsub.HandleWorkspaceEvent(
162-
slogtest.Make(t, nil),
163-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
160+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
161+
if err != nil {
162+
return
163+
}
164164
if e.Kind == wspubsub.WorkspaceEventKindStatsUpdate && e.WorkspaceID == workspace.ID {
165165
go func() {
166166
notifyDescription <- struct{}{}
@@ -503,10 +503,12 @@ func TestUpdateStates(t *testing.T) {
503503

504504
// Ensure that pubsub notifications are sent.
505505
notifyDescription := make(chan struct{})
506-
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
506+
ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
507507
wspubsub.HandleWorkspaceEvent(
508-
slogtest.Make(t, nil),
509-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
508+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
509+
if err != nil {
510+
return
511+
}
510512
if e.Kind == wspubsub.WorkspaceEventKindStatsUpdate && e.WorkspaceID == workspace.ID {
511513
go func() {
512514
notifyDescription <- struct{}{}

coderd/provisionerdserver/provisionerdserver_test.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -296,10 +296,12 @@ func TestAcquireJob(t *testing.T) {
296296

297297
startPublished := make(chan struct{})
298298
var closed bool
299-
closeStartSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
299+
closeStartSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
300300
wspubsub.HandleWorkspaceEvent(
301-
slogtest.Make(t, nil),
302-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
301+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
302+
if err != nil {
303+
return
304+
}
303305
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
304306
if !closed {
305307
close(startPublished)
@@ -404,10 +406,12 @@ func TestAcquireJob(t *testing.T) {
404406
})
405407

406408
stopPublished := make(chan struct{})
407-
closeStopSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
409+
closeStopSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
408410
wspubsub.HandleWorkspaceEvent(
409-
slogtest.Make(t, nil),
410-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
411+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
412+
if err != nil {
413+
return
414+
}
411415
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
412416
close(stopPublished)
413417
}
@@ -885,7 +889,7 @@ func TestFailJob(t *testing.T) {
885889
auditor: auditor,
886890
})
887891
org := dbgen.Organization(t, db, database.Organization{})
888-
workspace := dbgen.Workspace(t, db, database.Workspace{
892+
workspace := dbgen.Workspace(t, db, database.WorkspaceTable{
889893
ID: uuid.New(),
890894
AutomaticUpdates: database.AutomaticUpdatesNever,
891895
OrganizationID: org.ID,
@@ -925,10 +929,12 @@ func TestFailJob(t *testing.T) {
925929
require.NoError(t, err)
926930

927931
publishedWorkspace := make(chan struct{})
928-
closeWorkspaceSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
932+
closeWorkspaceSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
929933
wspubsub.HandleWorkspaceEvent(
930-
slogtest.Make(t, nil),
931-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
934+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
935+
if err != nil {
936+
return
937+
}
932938
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
933939
close(publishedWorkspace)
934940
}
@@ -1321,11 +1327,13 @@ func TestCompleteJob(t *testing.T) {
13211327
require.NoError(t, err)
13221328

13231329
publishedWorkspace := make(chan struct{})
1324-
closeWorkspaceSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
1330+
closeWorkspaceSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspaceTable.OwnerID),
13251331
wspubsub.HandleWorkspaceEvent(
1326-
slogtest.Make(t, nil),
1327-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
1328-
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
1332+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
1333+
if err != nil {
1334+
return
1335+
}
1336+
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspaceTable.ID {
13291337
close(publishedWorkspace)
13301338
}
13311339
}))

coderd/workspaceagents.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,9 @@ func (api *API) patchWorkspaceAgentLogs(rw http.ResponseWriter, r *http.Request)
252252
return
253253
}
254254

255-
api.publishWorkspaceUpdate(ctx, workspace.Workspace.OwnerID, wspubsub.WorkspaceEvent{
255+
api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
256256
Kind: wspubsub.WorkspaceEventKindAgentLogsOverflow,
257-
WorkspaceID: workspace.Workspace.ID,
257+
WorkspaceID: workspace.ID,
258258
AgentID: &workspaceAgent.ID,
259259
})
260260

@@ -284,9 +284,9 @@ func (api *API) patchWorkspaceAgentLogs(rw http.ResponseWriter, r *http.Request)
284284
return
285285
}
286286

287-
api.publishWorkspaceUpdate(ctx, workspace.Workspace.OwnerID, wspubsub.WorkspaceEvent{
287+
api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
288288
Kind: wspubsub.WorkspaceEventKindAgentFirstLogs,
289-
WorkspaceID: workspace.Workspace.ID,
289+
WorkspaceID: workspace.ID,
290290
AgentID: &workspaceAgent.ID,
291291
})
292292
}
@@ -417,10 +417,12 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
417417
notifyCh <- struct{}{}
418418

419419
// Subscribe to workspace to detect new builds.
420-
closeSubscribeWorkspace, err := api.Pubsub.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
420+
closeSubscribeWorkspace, err := api.Pubsub.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
421421
wspubsub.HandleWorkspaceEvent(
422-
logger,
423-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
422+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
423+
if err != nil {
424+
return
425+
}
424426
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
425427
select {
426428
case workspaceNotifyCh <- struct{}{}:

coderd/workspaces.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1676,10 +1676,12 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
16761676
})
16771677
}
16781678

1679-
cancelWorkspaceSubscribe, err := api.Pubsub.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
1679+
cancelWorkspaceSubscribe, err := api.Pubsub.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
16801680
wspubsub.HandleWorkspaceEvent(
1681-
api.Logger,
1682-
func(ctx context.Context, payload wspubsub.WorkspaceEvent) {
1681+
func(ctx context.Context, payload wspubsub.WorkspaceEvent, err error) {
1682+
if err != nil {
1683+
return
1684+
}
16831685
if payload.WorkspaceID != workspace.ID {
16841686
return
16851687
}

coderd/wspubsub/wspubsub.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"encoding/json"
66
"fmt"
77

8-
"cdr.dev/slog"
9-
108
"github.com/google/uuid"
119
"golang.org/x/xerrors"
1210
)
@@ -17,18 +15,22 @@ func WorkspaceEventChannel(ownerID uuid.UUID) string {
1715
return fmt.Sprintf("workspace_owner:%s", ownerID)
1816
}
1917

20-
func HandleWorkspaceEvent(logger slog.Logger, cb func(ctx context.Context, payload WorkspaceEvent)) func(ctx context.Context, message []byte) {
21-
return func(ctx context.Context, message []byte) {
18+
func HandleWorkspaceEvent(cb func(ctx context.Context, payload WorkspaceEvent, err error)) func(ctx context.Context, message []byte, err error) {
19+
return func(ctx context.Context, message []byte, err error) {
20+
if err != nil {
21+
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("workspace event pubsub: %w", err))
22+
return
23+
}
2224
var payload WorkspaceEvent
2325
if err := json.Unmarshal(message, &payload); err != nil {
24-
logger.Warn(ctx, "failed to unmarshal workspace event", slog.Error(err))
26+
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("unmarshal workspace event"))
2527
return
2628
}
2729
if err := payload.Validate(); err != nil {
28-
logger.Warn(ctx, "invalid workspace event", slog.Error(err))
30+
cb(ctx, payload, xerrors.Errorf("validate workspace event"))
2931
return
3032
}
31-
cb(ctx, payload)
33+
cb(ctx, payload, err)
3234
}
3335
}
3436

0 commit comments

Comments
 (0)