Skip to content

feat: use Agent v2 API for Service Banner #11806

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/coder/coder/v2/agent/agentproc"
"github.com/coder/coder/v2/agent/agentscripts"
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/agent/reconnectingpty"
"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/cli/gitauth"
Expand Down Expand Up @@ -95,7 +96,6 @@ type Client interface {
PostStartup(ctx context.Context, req agentsdk.PostStartupRequest) error
PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
PatchLogs(ctx context.Context, req agentsdk.PatchLogs) error
GetServiceBanner(ctx context.Context) (codersdk.ServiceBannerConfig, error)
}

type Agent interface {
Expand Down Expand Up @@ -269,7 +269,6 @@ func (a *agent) init(ctx context.Context) {
func (a *agent) runLoop(ctx context.Context) {
go a.reportLifecycleLoop(ctx)
go a.reportMetadataLoop(ctx)
go a.fetchServiceBannerLoop(ctx)
go a.manageProcessPriorityLoop(ctx)

for retrier := retry.New(100*time.Millisecond, 10*time.Second); retrier.Wait(ctx); {
Expand Down Expand Up @@ -662,22 +661,23 @@ func (a *agent) setLifecycle(ctx context.Context, state codersdk.WorkspaceAgentL
// fetchServiceBannerLoop fetches the service banner on an interval. It will
// not be fetched immediately; the expectation is that it is primed elsewhere
// (and must be done before the session actually starts).
func (a *agent) fetchServiceBannerLoop(ctx context.Context) {
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient) error {
ticker := time.NewTicker(a.serviceBannerRefreshInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
return ctx.Err()
case <-ticker.C:
serviceBanner, err := a.client.GetServiceBanner(ctx)
sbp, err := aAPI.GetServiceBanner(ctx, &proto.GetServiceBannerRequest{})
if err != nil {
if ctx.Err() != nil {
return
return ctx.Err()
}
a.logger.Error(ctx, "failed to update service banner", slog.Error(err))
continue
return err
}
serviceBanner := proto.SDKServiceBannerFromProto(sbp)
a.serviceBanner.Store(&serviceBanner)
}
}
Expand All @@ -693,10 +693,24 @@ func (a *agent) run(ctx context.Context) error {
}
a.sessionToken.Store(&sessionToken)

serviceBanner, err := a.client.GetServiceBanner(ctx)
// Listen returns the dRPC connection we use for the Agent v2+ API
conn, err := a.client.Listen(ctx)
if err != nil {
return err
}
defer func() {
cErr := conn.Close()
if cErr != nil {
a.logger.Debug(ctx, "error closing drpc connection", slog.Error(err))
}
}()

aAPI := proto.NewDRPCAgentClient(conn)
sbp, err := aAPI.GetServiceBanner(ctx, &proto.GetServiceBannerRequest{})
if err != nil {
return xerrors.Errorf("fetch service banner: %w", err)
}
serviceBanner := proto.SDKServiceBannerFromProto(sbp)
a.serviceBanner.Store(&serviceBanner)

manifest, err := a.client.Manifest(ctx)
Expand Down Expand Up @@ -821,18 +835,6 @@ func (a *agent) run(ctx context.Context) error {
network.SetBlockEndpoints(manifest.DisableDirectConnections)
}

// Listen returns the dRPC connection we use for both Coordinator and DERPMap updates
conn, err := a.client.Listen(ctx)
if err != nil {
return err
}
defer func() {
cErr := conn.Close()
if cErr != nil {
a.logger.Debug(ctx, "error closing drpc connection", slog.Error(err))
}
}()

eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
a.logger.Debug(egCtx, "running tailnet connection coordinator")
Expand All @@ -852,6 +854,15 @@ func (a *agent) run(ctx context.Context) error {
return nil
})

eg.Go(func() error {
a.logger.Debug(egCtx, "running fetch server banner loop")
err := a.fetchServiceBannerLoop(egCtx, aAPI)
if err != nil {
return xerrors.Errorf("fetch server banner loop: %w", err)
}
return nil
})

return eg.Wait()
}

Expand Down
6 changes: 5 additions & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/valyala/fasthttp/fasthttputil"
"go.uber.org/goleak"
"go.uber.org/mock/gomock"
"golang.org/x/crypto/ssh"
Expand Down Expand Up @@ -2026,7 +2027,10 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
afero.Fs,
agent.Agent,
) {
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
logger := slogtest.Make(t, &slogtest.Options{
// we get this error when closing the Agent API
IgnoredErrorIs: append(slogtest.DefaultIgnoredErrorIs, fasthttputil.ErrInmemoryListenerClosed),
}).Leveled(slog.LevelDebug)
if metadata.DERPMap == nil {
metadata.DERPMap, _ = tailnettest.RunDERPAndSTUN(t)
}
Expand Down
112 changes: 87 additions & 25 deletions agent/agenttest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"tailscale.com/tailcfg"

"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
drpcsdk "github.com/coder/coder/v2/codersdk/drpc"
Expand Down Expand Up @@ -48,6 +49,9 @@ func NewClient(t testing.TB,
}
err := proto.DRPCRegisterTailnet(mux, drpcService)
require.NoError(t, err)
fakeAAPI := NewFakeAgentAPI(t, logger)
err = agentproto.DRPCRegisterAgent(mux, fakeAAPI)
require.NoError(t, err)
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
Log: func(err error) {
if xerrors.Is(err, io.EOF) {
Expand All @@ -64,22 +68,23 @@ func NewClient(t testing.TB,
statsChan: statsChan,
coordinator: coordinator,
server: server,
fakeAgentAPI: fakeAAPI,
derpMapUpdates: derpMapUpdates,
}
}

type Client struct {
t testing.TB
logger slog.Logger
agentID uuid.UUID
manifest agentsdk.Manifest
metadata map[string]agentsdk.Metadata
statsChan chan *agentsdk.Stats
coordinator tailnet.Coordinator
server *drpcserver.Server
LastWorkspaceAgent func()
PatchWorkspaceLogs func() error
GetServiceBannerFunc func() (codersdk.ServiceBannerConfig, error)
t testing.TB
logger slog.Logger
agentID uuid.UUID
manifest agentsdk.Manifest
metadata map[string]agentsdk.Metadata
statsChan chan *agentsdk.Stats
coordinator tailnet.Coordinator
server *drpcserver.Server
fakeAgentAPI *FakeAgentAPI
LastWorkspaceAgent func()
PatchWorkspaceLogs func() error

mu sync.Mutex // Protects following.
lifecycleStates []codersdk.WorkspaceAgentLifecycle
Expand Down Expand Up @@ -221,20 +226,7 @@ func (c *Client) PatchLogs(ctx context.Context, logs agentsdk.PatchLogs) error {
}

func (c *Client) SetServiceBannerFunc(f func() (codersdk.ServiceBannerConfig, error)) {
c.mu.Lock()
defer c.mu.Unlock()

c.GetServiceBannerFunc = f
}

func (c *Client) GetServiceBanner(ctx context.Context) (codersdk.ServiceBannerConfig, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.logger.Debug(ctx, "get service banner")
if c.GetServiceBannerFunc != nil {
return c.GetServiceBannerFunc()
}
return codersdk.ServiceBannerConfig{}, nil
c.fakeAgentAPI.SetServiceBannerFunc(f)
}

func (c *Client) PushDERPMapUpdate(update *tailcfg.DERPMap) error {
Expand All @@ -254,3 +246,73 @@ type closeFunc func() error
func (c closeFunc) Close() error {
return c()
}

type FakeAgentAPI struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you generate this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could generate it as a Mock, but that won't fit as easily into our existing tests.

sync.Mutex
t testing.TB
logger slog.Logger

getServiceBannerFunc func() (codersdk.ServiceBannerConfig, error)
}

func (*FakeAgentAPI) GetManifest(context.Context, *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {
// TODO implement me
panic("implement me")
}

func (f *FakeAgentAPI) SetServiceBannerFunc(fn func() (codersdk.ServiceBannerConfig, error)) {
f.Lock()
defer f.Unlock()
f.getServiceBannerFunc = fn
f.logger.Info(context.Background(), "updated ServiceBannerFunc")
}

func (f *FakeAgentAPI) GetServiceBanner(context.Context, *agentproto.GetServiceBannerRequest) (*agentproto.ServiceBanner, error) {
f.Lock()
defer f.Unlock()
if f.getServiceBannerFunc == nil {
return &agentproto.ServiceBanner{}, nil
}
sb, err := f.getServiceBannerFunc()
if err != nil {
return nil, err
}
return agentproto.ServiceBannerFromSDK(sb), nil
}

func (*FakeAgentAPI) UpdateStats(context.Context, *agentproto.UpdateStatsRequest) (*agentproto.UpdateStatsResponse, error) {
// TODO implement me
panic("implement me")
}

func (*FakeAgentAPI) UpdateLifecycle(context.Context, *agentproto.UpdateLifecycleRequest) (*agentproto.Lifecycle, error) {
// TODO implement me
panic("implement me")
}

func (*FakeAgentAPI) BatchUpdateAppHealths(context.Context, *agentproto.BatchUpdateAppHealthRequest) (*agentproto.BatchUpdateAppHealthResponse, error) {
// TODO implement me
panic("implement me")
}

func (*FakeAgentAPI) UpdateStartup(context.Context, *agentproto.UpdateStartupRequest) (*agentproto.Startup, error) {
// TODO implement me
panic("implement me")
}

func (*FakeAgentAPI) BatchUpdateMetadata(context.Context, *agentproto.BatchUpdateMetadataRequest) (*agentproto.BatchUpdateMetadataResponse, error) {
// TODO implement me
panic("implement me")
}

func (*FakeAgentAPI) BatchCreateLogs(context.Context, *agentproto.BatchCreateLogsRequest) (*agentproto.BatchCreateLogsResponse, error) {
// TODO implement me
panic("implement me")
}

func NewFakeAgentAPI(t testing.TB, logger slog.Logger) *FakeAgentAPI {
return &FakeAgentAPI{
t: t,
logger: logger.Named("FakeAgentAPI"),
}
}
40 changes: 36 additions & 4 deletions cli/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func TestWorkspaceAgent(t *testing.T) {

ctx := inv.Context()
clitest.Start(t, inv)
coderdtest.AwaitWorkspaceAgents(t, client, r.Workspace.ID)
coderdtest.NewWorkspaceAgentWaiter(t, client, r.Workspace.ID).
MatchResources(matchAgentWithVersion).Wait()
workspace, err := client.Workspace(ctx, r.Workspace.ID)
require.NoError(t, err)
resources := workspace.LatestBuild.Resources
Expand Down Expand Up @@ -120,7 +121,9 @@ func TestWorkspaceAgent(t *testing.T) {

clitest.Start(t, inv)
ctx := inv.Context()
coderdtest.AwaitWorkspaceAgents(t, client, r.Workspace.ID)
coderdtest.NewWorkspaceAgentWaiter(t, client, r.Workspace.ID).
MatchResources(matchAgentWithVersion).
Wait()
workspace, err := client.Workspace(ctx, r.Workspace.ID)
require.NoError(t, err)
resources := workspace.LatestBuild.Resources
Expand Down Expand Up @@ -161,7 +164,9 @@ func TestWorkspaceAgent(t *testing.T) {
)

ctx := inv.Context()
coderdtest.AwaitWorkspaceAgents(t, client, r.Workspace.ID)
coderdtest.NewWorkspaceAgentWaiter(t, client, r.Workspace.ID).
MatchResources(matchAgentWithVersion).
Wait()
workspace, err := client.Workspace(ctx, r.Workspace.ID)
require.NoError(t, err)
resources := workspace.LatestBuild.Resources
Expand Down Expand Up @@ -212,7 +217,8 @@ func TestWorkspaceAgent(t *testing.T) {

clitest.Start(t, inv)

resources := coderdtest.AwaitWorkspaceAgents(t, client, r.Workspace.ID)
resources := coderdtest.NewWorkspaceAgentWaiter(t, client, r.Workspace.ID).
MatchResources(matchAgentWithSubsystems).Wait()
require.Len(t, resources, 1)
require.Len(t, resources[0].Agents, 1)
require.Len(t, resources[0].Agents[0].Subsystems, 2)
Expand All @@ -221,3 +227,29 @@ func TestWorkspaceAgent(t *testing.T) {
require.Equal(t, codersdk.AgentSubsystemExectrace, resources[0].Agents[0].Subsystems[1])
})
}

func matchAgentWithVersion(rs []codersdk.WorkspaceResource) bool {
if len(rs) < 1 {
return false
}
if len(rs[0].Agents) < 1 {
return false
}
if rs[0].Agents[0].Version == "" {
return false
}
return true
}

func matchAgentWithSubsystems(rs []codersdk.WorkspaceResource) bool {
if len(rs) < 1 {
return false
}
if len(rs[0].Agents) < 1 {
return false
}
if len(rs[0].Agents[0].Subsystems) < 1 {
return false
}
return true
}
2 changes: 1 addition & 1 deletion coderd/agentapi/servicebanner_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/appearance"
"github.com/coder/coder/v2/codersdk"
"github.com/stretchr/testify/require"
)

func TestGetServiceBanner(t *testing.T) {
Expand Down
Loading