Skip to content

Commit 31a3d36

Browse files
committed
use subscribewitherr
1 parent d8cd584 commit 31a3d36

File tree

5 files changed

+49
-33
lines changed

5 files changed

+49
-33
lines changed

coderd/agentapi/stats_test.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313
"go.uber.org/mock/gomock"
1414
"google.golang.org/protobuf/types/known/durationpb"
1515

16-
"cdr.dev/slog/sloggers/slogtest"
17-
1816
agentproto "github.com/coder/coder/v2/agent/proto"
1917
"github.com/coder/coder/v2/coderd/agentapi"
2018
"github.com/coder/coder/v2/coderd/database"
@@ -152,10 +150,12 @@ func TestUpdateStates(t *testing.T) {
152150

153151
// Ensure that pubsub notifications are sent.
154152
notifyDescription := make(chan struct{})
155-
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
153+
ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
156154
wspubsub.HandleWorkspaceEvent(
157-
slogtest.Make(t, nil),
158-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
155+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
156+
if err != nil {
157+
return
158+
}
159159
if e.Kind == wspubsub.WorkspaceEventKindStatsUpdate && e.WorkspaceID == workspace.ID {
160160
go func() {
161161
notifyDescription <- struct{}{}
@@ -490,10 +490,12 @@ func TestUpdateStates(t *testing.T) {
490490

491491
// Ensure that pubsub notifications are sent.
492492
notifyDescription := make(chan struct{})
493-
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
493+
ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
494494
wspubsub.HandleWorkspaceEvent(
495-
slogtest.Make(t, nil),
496-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
495+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
496+
if err != nil {
497+
return
498+
}
497499
if e.Kind == wspubsub.WorkspaceEventKindStatsUpdate && e.WorkspaceID == workspace.ID {
498500
go func() {
499501
notifyDescription <- struct{}{}

coderd/provisionerdserver/provisionerdserver_test.go

+20-12
Original file line numberDiff line numberDiff line change
@@ -296,10 +296,12 @@ func TestAcquireJob(t *testing.T) {
296296

297297
startPublished := make(chan struct{})
298298
var closed bool
299-
closeStartSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
299+
closeStartSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
300300
wspubsub.HandleWorkspaceEvent(
301-
slogtest.Make(t, nil),
302-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
301+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
302+
if err != nil {
303+
return
304+
}
303305
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
304306
if !closed {
305307
close(startPublished)
@@ -404,10 +406,12 @@ func TestAcquireJob(t *testing.T) {
404406
})
405407

406408
stopPublished := make(chan struct{})
407-
closeStopSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
409+
closeStopSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
408410
wspubsub.HandleWorkspaceEvent(
409-
slogtest.Make(t, nil),
410-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
411+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
412+
if err != nil {
413+
return
414+
}
411415
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
412416
close(stopPublished)
413417
}
@@ -925,10 +929,12 @@ func TestFailJob(t *testing.T) {
925929
require.NoError(t, err)
926930

927931
publishedWorkspace := make(chan struct{})
928-
closeWorkspaceSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
932+
closeWorkspaceSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
929933
wspubsub.HandleWorkspaceEvent(
930-
slogtest.Make(t, nil),
931-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
934+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
935+
if err != nil {
936+
return
937+
}
932938
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
933939
close(publishedWorkspace)
934940
}
@@ -1321,10 +1327,12 @@ func TestCompleteJob(t *testing.T) {
13211327
require.NoError(t, err)
13221328

13231329
publishedWorkspace := make(chan struct{})
1324-
closeWorkspaceSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
1330+
closeWorkspaceSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
13251331
wspubsub.HandleWorkspaceEvent(
1326-
slogtest.Make(t, nil),
1327-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
1332+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
1333+
if err != nil {
1334+
return
1335+
}
13281336
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
13291337
close(publishedWorkspace)
13301338
}

coderd/workspaceagents.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -417,10 +417,12 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
417417
notifyCh <- struct{}{}
418418

419419
// Subscribe to workspace to detect new builds.
420-
closeSubscribeWorkspace, err := api.Pubsub.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
420+
closeSubscribeWorkspace, err := api.Pubsub.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
421421
wspubsub.HandleWorkspaceEvent(
422-
logger,
423-
func(_ context.Context, e wspubsub.WorkspaceEvent) {
422+
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
423+
if err != nil {
424+
return
425+
}
424426
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
425427
select {
426428
case workspaceNotifyCh <- struct{}{}:

coderd/workspaces.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -1702,10 +1702,12 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
17021702
})
17031703
}
17041704

1705-
cancelWorkspaceSubscribe, err := api.Pubsub.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
1705+
cancelWorkspaceSubscribe, err := api.Pubsub.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
17061706
wspubsub.HandleWorkspaceEvent(
1707-
api.Logger,
1708-
func(ctx context.Context, payload wspubsub.WorkspaceEvent) {
1707+
func(ctx context.Context, payload wspubsub.WorkspaceEvent, err error) {
1708+
if err != nil {
1709+
return
1710+
}
17091711
if payload.WorkspaceID != workspace.ID {
17101712
return
17111713
}

coderd/wspubsub/wspubsub.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"encoding/json"
66
"fmt"
77

8-
"cdr.dev/slog"
9-
108
"github.com/google/uuid"
119
"golang.org/x/xerrors"
1210
)
@@ -17,18 +15,22 @@ func WorkspaceEventChannel(ownerID uuid.UUID) string {
1715
return fmt.Sprintf("workspace_owner:%s", ownerID)
1816
}
1917

20-
func HandleWorkspaceEvent(logger slog.Logger, cb func(ctx context.Context, payload WorkspaceEvent)) func(ctx context.Context, message []byte) {
21-
return func(ctx context.Context, message []byte) {
18+
func HandleWorkspaceEvent(cb func(ctx context.Context, payload WorkspaceEvent, err error)) func(ctx context.Context, message []byte, err error) {
19+
return func(ctx context.Context, message []byte, err error) {
20+
if err != nil {
21+
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("workspace event pubsub: %w", err))
22+
return
23+
}
2224
var payload WorkspaceEvent
2325
if err := json.Unmarshal(message, &payload); err != nil {
24-
logger.Warn(ctx, "failed to unmarshal workspace event", slog.Error(err))
26+
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("unmarshal workspace event"))
2527
return
2628
}
2729
if err := payload.Validate(); err != nil {
28-
logger.Warn(ctx, "invalid workspace event", slog.Error(err))
30+
cb(ctx, payload, xerrors.Errorf("validate workspace event"))
2931
return
3032
}
31-
cb(ctx, payload)
33+
cb(ctx, payload, err)
3234
}
3335
}
3436

0 commit comments

Comments
 (0)