Skip to content

chore: send workspace pubsub events by owner id #14964

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feedback
  • Loading branch information
ethanndickson committed Nov 1, 2024
commit 5748a10c87b4cb69b7ea1800ffe7ab29756f4f3a
47 changes: 6 additions & 41 deletions coderd/agentapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ type API struct {
*ScriptsAPI
*tailnet.DRPCService

mu sync.Mutex
cachedWorkspaceID uuid.UUID
mu sync.Mutex
}

var _ agentproto.DRPCAgentServer = &API{}
Expand Down Expand Up @@ -83,9 +82,8 @@ type Options struct {

func New(opts Options) *API {
api := &API{
opts: opts,
mu: sync.Mutex{},
cachedWorkspaceID: opts.WorkspaceID,
opts: opts,
mu: sync.Mutex{},
}

api.ManifestAPI = &ManifestAPI{
Expand Down Expand Up @@ -199,44 +197,11 @@ func (a *API) agent(ctx context.Context) (database.WorkspaceAgent, error) {
return agent, nil
}

func (a *API) workspaceID(ctx context.Context, agent *database.WorkspaceAgent) (uuid.UUID, error) {
a.mu.Lock()
if a.cachedWorkspaceID != uuid.Nil {
id := a.cachedWorkspaceID
a.mu.Unlock()
return id, nil
}

if agent == nil {
agnt, err := a.agent(ctx)
if err != nil {
return uuid.Nil, err
}
agent = &agnt
}

getWorkspaceAgentByIDRow, err := a.opts.Database.GetWorkspaceByAgentID(ctx, agent.ID)
if err != nil {
return uuid.Nil, xerrors.Errorf("get workspace by agent id %q: %w", agent.ID, err)
}

a.mu.Lock()
a.cachedWorkspaceID = getWorkspaceAgentByIDRow.ID
a.mu.Unlock()
return getWorkspaceAgentByIDRow.ID, nil
}

func (a *API) publishWorkspaceUpdate(ctx context.Context, agent *database.WorkspaceAgent) error {
workspaceID, err := a.workspaceID(ctx, agent)
if err != nil {
return err
}

func (a *API) publishWorkspaceUpdate(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
a.opts.PublishWorkspaceUpdateFn(ctx, a.opts.OwnerID, wspubsub.WorkspaceEvent{
Kind: wspubsub.WorkspaceEventKindAgentUpdate,
WorkspaceID: workspaceID,
Kind: kind,
WorkspaceID: a.opts.WorkspaceID,
AgentID: &agent.ID,
AgentName: &agent.Name,
})
return nil
}
5 changes: 3 additions & 2 deletions coderd/agentapi/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/wspubsub"
)

type AppsAPI struct {
AgentFn func(context.Context) (database.WorkspaceAgent, error)
Database database.Store
Log slog.Logger
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent) error
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent, wspubsub.WorkspaceEventKind) error
}

func (a *AppsAPI) BatchUpdateAppHealths(ctx context.Context, req *agentproto.BatchUpdateAppHealthRequest) (*agentproto.BatchUpdateAppHealthResponse, error) {
Expand Down Expand Up @@ -96,7 +97,7 @@ func (a *AppsAPI) BatchUpdateAppHealths(ctx context.Context, req *agentproto.Bat
}

if a.PublishWorkspaceUpdateFn != nil && len(newApps) > 0 {
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent)
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent, wspubsub.WorkspaceEventKindAppHealthUpdate)
if err != nil {
return nil, xerrors.Errorf("publish workspace update: %w", err)
}
Expand Down
7 changes: 4 additions & 3 deletions coderd/agentapi/apps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/coder/coder/v2/coderd/agentapi"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbmock"
"github.com/coder/coder/v2/coderd/wspubsub"
)

func TestBatchUpdateAppHealths(t *testing.T) {
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestBatchUpdateAppHealths(t *testing.T) {
},
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishCalled = true
return nil
},
Expand Down Expand Up @@ -100,7 +101,7 @@ func TestBatchUpdateAppHealths(t *testing.T) {
},
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishCalled = true
return nil
},
Expand Down Expand Up @@ -139,7 +140,7 @@ func TestBatchUpdateAppHealths(t *testing.T) {
},
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishCalled = true
return nil
},
Expand Down
5 changes: 3 additions & 2 deletions coderd/agentapi/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/wspubsub"
)

type contextKeyAPIVersion struct{}
Expand All @@ -28,7 +29,7 @@ type LifecycleAPI struct {
WorkspaceID uuid.UUID
Database database.Store
Log slog.Logger
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent) error
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent, wspubsub.WorkspaceEventKind) error

TimeNowFn func() time.Time // defaults to dbtime.Now()
}
Expand Down Expand Up @@ -118,7 +119,7 @@ func (a *LifecycleAPI) UpdateLifecycle(ctx context.Context, req *agentproto.Upda
}

if a.PublishWorkspaceUpdateFn != nil {
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent)
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent, wspubsub.WorkspaceEventKindAgentLifecycleUpdate)
if err != nil {
return nil, xerrors.Errorf("publish workspace update: %w", err)
}
Expand Down
9 changes: 5 additions & 4 deletions coderd/agentapi/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbmock"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/wspubsub"
)

func TestUpdateLifecycle(t *testing.T) {
Expand Down Expand Up @@ -72,7 +73,7 @@ func TestUpdateLifecycle(t *testing.T) {
WorkspaceID: workspaceID,
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishCalled = true
return nil
},
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestUpdateLifecycle(t *testing.T) {
WorkspaceID: workspaceID,
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishCalled = true
return nil
},
Expand Down Expand Up @@ -234,7 +235,7 @@ func TestUpdateLifecycle(t *testing.T) {
WorkspaceID: workspaceID,
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
atomic.AddInt64(&publishCalled, 1)
return nil
},
Expand Down Expand Up @@ -307,7 +308,7 @@ func TestUpdateLifecycle(t *testing.T) {
WorkspaceID: workspaceID,
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishCalled = true
return nil
},
Expand Down
7 changes: 4 additions & 3 deletions coderd/agentapi/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/wspubsub"
"github.com/coder/coder/v2/codersdk/agentsdk"
)

type LogsAPI struct {
AgentFn func(context.Context) (database.WorkspaceAgent, error)
Database database.Store
Log slog.Logger
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent) error
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent, wspubsub.WorkspaceEventKind) error
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)

TimeNowFn func() time.Time // defaults to dbtime.Now()
Expand Down Expand Up @@ -123,7 +124,7 @@ func (a *LogsAPI) BatchCreateLogs(ctx context.Context, req *agentproto.BatchCrea
}

if a.PublishWorkspaceUpdateFn != nil {
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent)
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent, wspubsub.WorkspaceEventKindAgentLogsOverflow)
if err != nil {
return nil, xerrors.Errorf("publish workspace update: %w", err)
}
Expand All @@ -143,7 +144,7 @@ func (a *LogsAPI) BatchCreateLogs(ctx context.Context, req *agentproto.BatchCrea
if workspaceAgent.LogsLength == 0 && a.PublishWorkspaceUpdateFn != nil {
// If these are the first logs being appended, we publish a UI update
// to notify the UI that logs are now available.
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent)
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent, wspubsub.WorkspaceEventKindAgentLogsUpdate)
if err != nil {
return nil, xerrors.Errorf("publish workspace update: %w", err)
}
Expand Down
13 changes: 7 additions & 6 deletions coderd/agentapi/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbmock"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/wspubsub"
"github.com/coder/coder/v2/codersdk/agentsdk"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ func TestBatchCreateLogs(t *testing.T) {
},
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishWorkspaceUpdateCalled = true
return nil
},
Expand Down Expand Up @@ -154,7 +155,7 @@ func TestBatchCreateLogs(t *testing.T) {
},
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishWorkspaceUpdateCalled = true
return nil
},
Expand Down Expand Up @@ -202,7 +203,7 @@ func TestBatchCreateLogs(t *testing.T) {
},
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishWorkspaceUpdateCalled = true
return nil
},
Expand Down Expand Up @@ -295,7 +296,7 @@ func TestBatchCreateLogs(t *testing.T) {
},
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishWorkspaceUpdateCalled = true
return nil
},
Expand Down Expand Up @@ -339,7 +340,7 @@ func TestBatchCreateLogs(t *testing.T) {
},
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishWorkspaceUpdateCalled = true
return nil
},
Expand Down Expand Up @@ -386,7 +387,7 @@ func TestBatchCreateLogs(t *testing.T) {
},
Database: dbM,
Log: slogtest.Make(t, nil),
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent) error {
PublishWorkspaceUpdateFn: func(ctx context.Context, wa *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
publishWorkspaceUpdateCalled = true
return nil
},
Expand Down
40 changes: 22 additions & 18 deletions coderd/agentapi/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"go.uber.org/mock/gomock"
"google.golang.org/protobuf/types/known/durationpb"

"cdr.dev/slog/sloggers/slogtest"

agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/agentapi"
"github.com/coder/coder/v2/coderd/database"
Expand Down Expand Up @@ -156,13 +158,15 @@ func TestUpdateStates(t *testing.T) {
// Ensure that pubsub notifications are sent.
notifyDescription := make(chan struct{})
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
wspubsub.HandleWorkspaceEvent(func(_ context.Context, e wspubsub.WorkspaceEvent) {
if e.Kind == wspubsub.WorkspaceEventKindUpdatedStats && e.WorkspaceID == workspace.ID {
go func() {
notifyDescription <- struct{}{}
}()
}
}))
wspubsub.HandleWorkspaceEvent(
slogtest.Make(t, nil),
func(_ context.Context, e wspubsub.WorkspaceEvent) {
if e.Kind == wspubsub.WorkspaceEventKindStatsUpdate && e.WorkspaceID == workspace.ID {
go func() {
notifyDescription <- struct{}{}
}()
}
}))

resp, err := api.UpdateStats(context.Background(), req)
require.NoError(t, err)
Expand All @@ -187,8 +191,7 @@ func TestUpdateStates(t *testing.T) {
select {
case <-ctx.Done():
t.Error("timed out while waiting for pubsub notification")
case description := <-notifyDescription:
require.Equal(t, description, struct{}{})
case <-notifyDescription:
}
require.True(t, updateAgentMetricsFnCalled)
})
Expand Down Expand Up @@ -501,13 +504,15 @@ func TestUpdateStates(t *testing.T) {
// Ensure that pubsub notifications are sent.
notifyDescription := make(chan struct{})
ps.Subscribe(wspubsub.WorkspaceEventChannel(workspace.OwnerID),
wspubsub.HandleWorkspaceEvent(func(_ context.Context, e wspubsub.WorkspaceEvent) {
if e.Kind == wspubsub.WorkspaceEventKindUpdatedStats && e.WorkspaceID == workspace.ID {
go func() {
notifyDescription <- struct{}{}
}()
}
}))
wspubsub.HandleWorkspaceEvent(
slogtest.Make(t, nil),
func(_ context.Context, e wspubsub.WorkspaceEvent) {
if e.Kind == wspubsub.WorkspaceEventKindStatsUpdate && e.WorkspaceID == workspace.ID {
go func() {
notifyDescription <- struct{}{}
}()
}
}))

resp, err := api.UpdateStats(context.Background(), req)
require.NoError(t, err)
Expand All @@ -530,8 +535,7 @@ func TestUpdateStates(t *testing.T) {
select {
case <-ctx.Done():
t.Error("timed out while waiting for pubsub notification")
case description := <-notifyDescription:
require.Equal(t, description, struct{}{})
case <-notifyDescription:
}
require.True(t, updateAgentMetricsFnCalled)
})
Expand Down
10 changes: 3 additions & 7 deletions coderd/database/dbfake/dbfake.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/coder/coder/v2/coderd/provisionerdserver"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/telemetry"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/coderd/wspubsub"
"github.com/coder/coder/v2/provisionersdk"
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
Expand Down Expand Up @@ -227,14 +226,11 @@ func (b WorkspaceBuildBuilder) Do() WorkspaceResponse {

if b.ps != nil {
msg, err := json.Marshal(wspubsub.WorkspaceEvent{
Kind: wspubsub.WorkspaceEventKindStateChange,
WorkspaceID: resp.Workspace.ID,
WorkspaceName: ptr.Ref(resp.Workspace.Name),
Transition: ptr.Ref(resp.Build.Transition),
JobStatus: ptr.Ref(job.JobStatus),
Kind: wspubsub.WorkspaceEventKindStateChange,
WorkspaceID: resp.Workspace.ID,
})
require.NoError(b.t, err)
err = b.ps.Publish(wspubsub.WorkspaceEventChannel(resp.Build.InitiatorID), msg)
err = b.ps.Publish(wspubsub.WorkspaceEventChannel(resp.Workspace.OwnerID), msg)
require.NoError(b.t, err)
}

Expand Down
Loading