Skip to content

chore: send workspace pubsub events by owner id #14964

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
use subscribewitherr
  • Loading branch information
ethanndickson committed Nov 1, 2024
commit ea1ef09246932d437693c6c9495c3c929e7a554d
18 changes: 10 additions & 8 deletions coderd/agentapi/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"go.uber.org/mock/gomock"
"google.golang.org/protobuf/types/known/durationpb"

"cdr.dev/slog/sloggers/slogtest"

agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/agentapi"
"github.com/coder/coder/v2/coderd/database"
Expand Down Expand Up @@ -157,10 +155,12 @@ func TestUpdateStates(t *testing.T) {

// Ensure that pubsub notifications are sent.
notifyDescription := make(chan struct{})
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
wspubsub.HandleWorkspaceEvent(
slogtest.Make(t, nil),
func(_ context.Context, e wspubsub.WorkspaceEvent) {
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
if err != nil {
return
}
if e.Kind == wspubsub.WorkspaceEventKindStatsUpdate && e.WorkspaceID == workspace.ID {
go func() {
notifyDescription <- struct{}{}
Expand Down Expand Up @@ -503,10 +503,12 @@ func TestUpdateStates(t *testing.T) {

// Ensure that pubsub notifications are sent.
notifyDescription := make(chan struct{})
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
wspubsub.HandleWorkspaceEvent(
slogtest.Make(t, nil),
func(_ context.Context, e wspubsub.WorkspaceEvent) {
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
if err != nil {
return
}
if e.Kind == wspubsub.WorkspaceEventKindStatsUpdate && e.WorkspaceID == workspace.ID {
go func() {
notifyDescription <- struct{}{}
Expand Down
36 changes: 22 additions & 14 deletions coderd/provisionerdserver/provisionerdserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,12 @@ func TestAcquireJob(t *testing.T) {

startPublished := make(chan struct{})
var closed bool
closeStartSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
closeStartSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
wspubsub.HandleWorkspaceEvent(
slogtest.Make(t, nil),
func(_ context.Context, e wspubsub.WorkspaceEvent) {
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
if err != nil {
return
}
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
if !closed {
close(startPublished)
Expand Down Expand Up @@ -404,10 +406,12 @@ func TestAcquireJob(t *testing.T) {
})

stopPublished := make(chan struct{})
closeStopSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
closeStopSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
wspubsub.HandleWorkspaceEvent(
slogtest.Make(t, nil),
func(_ context.Context, e wspubsub.WorkspaceEvent) {
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
if err != nil {
return
}
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
close(stopPublished)
}
Expand Down Expand Up @@ -885,7 +889,7 @@ func TestFailJob(t *testing.T) {
auditor: auditor,
})
org := dbgen.Organization(t, db, database.Organization{})
workspace := dbgen.Workspace(t, db, database.Workspace{
workspace := dbgen.Workspace(t, db, database.WorkspaceTable{
ID: uuid.New(),
AutomaticUpdates: database.AutomaticUpdatesNever,
OrganizationID: org.ID,
Expand Down Expand Up @@ -925,10 +929,12 @@ func TestFailJob(t *testing.T) {
require.NoError(t, err)

publishedWorkspace := make(chan struct{})
closeWorkspaceSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
closeWorkspaceSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
wspubsub.HandleWorkspaceEvent(
slogtest.Make(t, nil),
func(_ context.Context, e wspubsub.WorkspaceEvent) {
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
if err != nil {
return
}
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
close(publishedWorkspace)
}
Expand Down Expand Up @@ -1321,11 +1327,13 @@ func TestCompleteJob(t *testing.T) {
require.NoError(t, err)

publishedWorkspace := make(chan struct{})
closeWorkspaceSubscribe, err := ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
closeWorkspaceSubscribe, err := ps.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspaceTable.OwnerID),
wspubsub.HandleWorkspaceEvent(
slogtest.Make(t, nil),
func(_ context.Context, e wspubsub.WorkspaceEvent) {
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
if err != nil {
return
}
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspaceTable.ID {
close(publishedWorkspace)
}
}))
Expand Down
16 changes: 9 additions & 7 deletions coderd/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ func (api *API) patchWorkspaceAgentLogs(rw http.ResponseWriter, r *http.Request)
return
}

api.publishWorkspaceUpdate(ctx, workspace.Workspace.OwnerID, wspubsub.WorkspaceEvent{
api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
Kind: wspubsub.WorkspaceEventKindAgentLogsOverflow,
WorkspaceID: workspace.Workspace.ID,
WorkspaceID: workspace.ID,
AgentID: &workspaceAgent.ID,
})

Expand Down Expand Up @@ -284,9 +284,9 @@ func (api *API) patchWorkspaceAgentLogs(rw http.ResponseWriter, r *http.Request)
return
}

api.publishWorkspaceUpdate(ctx, workspace.Workspace.OwnerID, wspubsub.WorkspaceEvent{
api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
Kind: wspubsub.WorkspaceEventKindAgentFirstLogs,
WorkspaceID: workspace.Workspace.ID,
WorkspaceID: workspace.ID,
AgentID: &workspaceAgent.ID,
})
}
Expand Down Expand Up @@ -417,10 +417,12 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
notifyCh <- struct{}{}

// Subscribe to workspace to detect new builds.
closeSubscribeWorkspace, err := api.Pubsub.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
closeSubscribeWorkspace, err := api.Pubsub.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
wspubsub.HandleWorkspaceEvent(
logger,
func(_ context.Context, e wspubsub.WorkspaceEvent) {
func(_ context.Context, e wspubsub.WorkspaceEvent, err error) {
if err != nil {
return
}
if e.Kind == wspubsub.WorkspaceEventKindStateChange && e.WorkspaceID == workspace.ID {
select {
case workspaceNotifyCh <- struct{}{}:
Expand Down
8 changes: 5 additions & 3 deletions coderd/workspaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1676,10 +1676,12 @@ func (api *API) watchWorkspace(rw http.ResponseWriter, r *http.Request) {
})
}

cancelWorkspaceSubscribe, err := api.Pubsub.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
cancelWorkspaceSubscribe, err := api.Pubsub.SubscribeWithErr(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
wspubsub.HandleWorkspaceEvent(
api.Logger,
func(ctx context.Context, payload wspubsub.WorkspaceEvent) {
func(ctx context.Context, payload wspubsub.WorkspaceEvent, err error) {
if err != nil {
return
}
if payload.WorkspaceID != workspace.ID {
return
}
Expand Down
16 changes: 9 additions & 7 deletions coderd/wspubsub/wspubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"encoding/json"
"fmt"

"cdr.dev/slog"

"github.com/google/uuid"
"golang.org/x/xerrors"
)
Expand All @@ -17,18 +15,22 @@ func WorkspaceEventChannel(ownerID uuid.UUID) string {
return fmt.Sprintf("workspace_owner:%s", ownerID)
}

func HandleWorkspaceEvent(logger slog.Logger, cb func(ctx context.Context, payload WorkspaceEvent)) func(ctx context.Context, message []byte) {
return func(ctx context.Context, message []byte) {
func HandleWorkspaceEvent(cb func(ctx context.Context, payload WorkspaceEvent, err error)) func(ctx context.Context, message []byte, err error) {
return func(ctx context.Context, message []byte, err error) {
if err != nil {
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("workspace event pubsub: %w", err))
return
}
var payload WorkspaceEvent
if err := json.Unmarshal(message, &payload); err != nil {
logger.Warn(ctx, "failed to unmarshal workspace event", slog.Error(err))
cb(ctx, WorkspaceEvent{}, xerrors.Errorf("unmarshal workspace event"))
return
}
if err := payload.Validate(); err != nil {
logger.Warn(ctx, "invalid workspace event", slog.Error(err))
cb(ctx, payload, xerrors.Errorf("validate workspace event"))
return
}
cb(ctx, payload)
cb(ctx, payload, err)
}
}

Expand Down
Loading