Skip to content

fix: Refactor agent to consume API client #4715

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: Refactor agent to consume API client
This simplifies a lot of code by creating an interface for
the codersdk client into the agent. It also moves agent
authentication code so instance identity will work between
restarts.

Fixes #3485 and #4082.
  • Loading branch information
kylecarbs committed Oct 24, 2022
commit 44eee7158357f1c09095dd6ecebf3b415c33f34c
285 changes: 136 additions & 149 deletions agent/agent.go

Large diffs are not rendered by default.

122 changes: 74 additions & 48 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,57 +572,15 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
agentID := uuid.New()
statsCh := make(chan *codersdk.AgentStats)
closer := agent.New(agent.Options{
FetchMetadata: func(ctx context.Context) (codersdk.WorkspaceAgentMetadata, error) {
return metadata, nil
},
CoordinatorDialer: func(ctx context.Context) (net.Conn, error) {
clientConn, serverConn := net.Pipe()
closed := make(chan struct{})
t.Cleanup(func() {
_ = serverConn.Close()
_ = clientConn.Close()
<-closed
})
go func() {
_ = coordinator.ServeAgent(serverConn, agentID)
close(closed)
}()
return clientConn, nil
Client: &client{
t: t,
agentID: agentID,
metadata: metadata,
statsChan: statsCh,
coordinator: coordinator,
},
Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
ReconnectingPTYTimeout: ptyTimeout,
StatsReporter: func(ctx context.Context, log slog.Logger, statsFn func() *codersdk.AgentStats) (io.Closer, error) {
doneCh := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)

go func() {
defer close(doneCh)

t := time.NewTicker(time.Millisecond * 100)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
select {
case statsCh <- statsFn():
case <-ctx.Done():
return
default:
// We don't want to send old stats.
continue
}
}
}()
return closeFunc(func() error {
cancel()
<-doneCh
close(statsCh)
return nil
}), nil
},
})
t.Cleanup(func() {
_ = closer.Close()
Expand Down Expand Up @@ -679,3 +637,71 @@ func assertWritePayload(t *testing.T, w io.Writer, payload []byte) {
assert.NoError(t, err, "write payload")
assert.Equal(t, len(payload), n, "payload length does not match")
}

type client struct {
t *testing.T
agentID uuid.UUID
metadata codersdk.WorkspaceAgentMetadata
statsChan chan *codersdk.AgentStats
coordinator tailnet.Coordinator
}

func (c *client) WorkspaceAgentMetadata(_ context.Context) (codersdk.WorkspaceAgentMetadata, error) {
return c.metadata, nil
}

func (c *client) ListenWorkspaceAgent(_ context.Context) (net.Conn, error) {
clientConn, serverConn := net.Pipe()
closed := make(chan struct{})
c.t.Cleanup(func() {
_ = serverConn.Close()
_ = clientConn.Close()
<-closed
})
go func() {
_ = c.coordinator.ServeAgent(serverConn, c.agentID)
close(closed)
}()
return clientConn, nil
}

func (c *client) AgentReportStats(ctx context.Context, _ slog.Logger, stats func() *codersdk.AgentStats) (io.Closer, error) {
doneCh := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)

go func() {
defer close(doneCh)

t := time.NewTicker(time.Millisecond * 100)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
select {
case c.statsChan <- stats():
case <-ctx.Done():
return
default:
// We don't want to send old stats.
continue
}
}
}()
return closeFunc(func() error {
cancel()
<-doneCh
close(c.statsChan)
return nil
}), nil
}

func (*client) PostWorkspaceAgentAppHealth(_ context.Context, _ codersdk.PostWorkspaceAppHealthsRequest) error {
return nil
}

func (*client) PostWorkspaceAgentVersion(_ context.Context, _ string) error {
return nil
}
10 changes: 1 addition & 9 deletions agent/apphealth.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,8 @@ type PostWorkspaceAgentAppHealth func(context.Context, codersdk.PostWorkspaceApp
type WorkspaceAppHealthReporter func(ctx context.Context)

// NewWorkspaceAppHealthReporter creates a WorkspaceAppHealthReporter that reports app health to coderd.
func NewWorkspaceAppHealthReporter(logger slog.Logger, workspaceAgentApps WorkspaceAgentApps, postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth) WorkspaceAppHealthReporter {
func NewWorkspaceAppHealthReporter(logger slog.Logger, apps []codersdk.WorkspaceApp, postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth) WorkspaceAppHealthReporter {
runHealthcheckLoop := func(ctx context.Context) error {
apps, err := workspaceAgentApps(ctx)
if err != nil {
if xerrors.Is(err, context.Canceled) {
return nil
}
return xerrors.Errorf("getting workspace apps: %w", err)
}

// no need to run this loop if no apps for this workspace.
if len(apps) == 0 {
return nil
Expand Down
2 changes: 1 addition & 1 deletion agent/apphealth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func setupAppReporter(ctx context.Context, t *testing.T, apps []codersdk.Workspa
return nil
}

go agent.NewWorkspaceAppHealthReporter(slogtest.Make(t, nil).Leveled(slog.LevelDebug), workspaceAgentApps, postWorkspaceAgentAppHealth)(ctx)
go agent.NewWorkspaceAppHealthReporter(slogtest.Make(t, nil).Leveled(slog.LevelDebug), apps, postWorkspaceAgentAppHealth)(ctx)

return workspaceAgentApps, func() {
for _, closeFn := range closers {
Expand Down
10 changes: 0 additions & 10 deletions agent/stats.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package agent

import (
"context"
"io"
"net"
"sync/atomic"

"cdr.dev/slog"
"github.com/coder/coder/codersdk"
)

Expand Down Expand Up @@ -59,10 +56,3 @@ func (s *Stats) wrapConn(conn net.Conn) net.Conn {

return cs
}

// StatsReporter periodically accept and records agent stats.
type StatsReporter func(
ctx context.Context,
log slog.Logger,
stats func() *codersdk.AgentStats,
) (io.Closer, error)
58 changes: 13 additions & 45 deletions cli/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os"
"path/filepath"
"runtime"
"time"

"cloud.google.com/go/compute/metadata"
"github.com/spf13/cobra"
Expand All @@ -23,7 +22,6 @@ import (
"github.com/coder/coder/buildinfo"
"github.com/coder/coder/cli/cliflag"
"github.com/coder/coder/codersdk"
"github.com/coder/retry"
)

func workspaceAgent() *cobra.Command {
Expand Down Expand Up @@ -143,43 +141,6 @@ func workspaceAgent() *cobra.Command {
}
}

if exchangeToken != nil {
logger.Info(cmd.Context(), "exchanging identity token")
// Agent's can start before resources are returned from the provisioner
// daemon. If there are many resources being provisioned, this time
// could be significant. This is arbitrarily set at an hour to prevent
// tons of idle agents from pinging coderd.
ctx, cancelFunc := context.WithTimeout(cmd.Context(), time.Hour)
defer cancelFunc()
for retry.New(100*time.Millisecond, 5*time.Second).Wait(ctx) {
var response codersdk.WorkspaceAgentAuthenticateResponse

response, err = exchangeToken(ctx)
if err != nil {
logger.Warn(ctx, "authenticate workspace", slog.F("method", auth), slog.Error(err))
continue
}
client.SessionToken = response.SessionToken
logger.Info(ctx, "authenticated", slog.F("method", auth))
break
}
if err != nil {
return xerrors.Errorf("agent failed to authenticate in time: %w", err)
}
}

retryCtx, cancelRetry := context.WithTimeout(cmd.Context(), time.Hour)
defer cancelRetry()
for retrier := retry.New(100*time.Millisecond, 5*time.Second); retrier.Wait(retryCtx); {
err := client.PostWorkspaceAgentVersion(retryCtx, version)
if err != nil {
logger.Warn(retryCtx, "post agent version: %w", slog.Error(err), slog.F("version", version))
continue
}
logger.Info(retryCtx, "updated agent version", slog.F("version", version))
break
}

executablePath, err := os.Executable()
if err != nil {
return xerrors.Errorf("getting os executable: %w", err)
Expand All @@ -190,17 +151,24 @@ func workspaceAgent() *cobra.Command {
}

closer := agent.New(agent.Options{
FetchMetadata: client.WorkspaceAgentMetadata,
Logger: logger,
Client: client,
Logger: logger,
ExchangeToken: func(ctx context.Context) error {
if exchangeToken == nil {
return nil
}
resp, err := exchangeToken(ctx)
if err != nil {
return err
}
client.SessionToken = resp.SessionToken
return nil
},
EnvironmentVariables: map[string]string{
// Override the "CODER_AGENT_TOKEN" variable in all
// shells so "gitssh" works!
"CODER_AGENT_TOKEN": client.SessionToken,
},
CoordinatorDialer: client.ListenWorkspaceAgentTailnet,
StatsReporter: client.AgentReportStats,
WorkspaceAgentApps: client.WorkspaceAgentApps,
PostWorkspaceAgentAppHealth: client.PostWorkspaceAgentAppHealth,
})
<-cmd.Context().Done()
return closer.Close()
Expand Down
5 changes: 2 additions & 3 deletions cli/configssh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ func TestConfigSSH(t *testing.T) {
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = authToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent"),
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent"),
})
defer func() {
_ = agentCloser.Close()
Expand Down
5 changes: 2 additions & 3 deletions cli/speedtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ func TestSpeedtest(t *testing.T) {
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = agentToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent"),
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent"),
})
defer agentCloser.Close()
coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID)
Expand Down
15 changes: 6 additions & 9 deletions cli/ssh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ func TestSSH(t *testing.T) {
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = agentToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent"),
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent"),
})
defer func() {
_ = agentCloser.Close()
Expand All @@ -110,9 +109,8 @@ func TestSSH(t *testing.T) {
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = agentToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent"),
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent"),
})
<-ctx.Done()
_ = agentCloser.Close()
Expand Down Expand Up @@ -178,9 +176,8 @@ func TestSSH(t *testing.T) {
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = agentToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent"),
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent"),
})
defer agentCloser.Close()

Expand Down
1 change: 0 additions & 1 deletion coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,6 @@ func New(options *Options) *API {
r.Post("/google-instance-identity", api.postWorkspaceAuthGoogleInstanceIdentity)
r.Route("/me", func(r chi.Router) {
r.Use(httpmw.ExtractWorkspaceAgent(options.Database))
r.Get("/apps", api.workspaceAgentApps)
r.Get("/metadata", api.workspaceAgentMetadata)
r.Post("/version", api.postWorkspaceAgentVersion)
r.Post("/app-health", api.postWorkspaceAppHealth)
Expand Down
1 change: 0 additions & 1 deletion coderd/coderdtest/authorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func AGPLRoutes(a *AuthTester) (map[string]string, map[string]RouteCheck) {
"POST:/api/v2/workspaceagents/aws-instance-identity": {NoAuthorize: true},
"POST:/api/v2/workspaceagents/azure-instance-identity": {NoAuthorize: true},
"POST:/api/v2/workspaceagents/google-instance-identity": {NoAuthorize: true},
"GET:/api/v2/workspaceagents/me/apps": {NoAuthorize: true},
"GET:/api/v2/workspaceagents/me/gitsshkey": {NoAuthorize: true},
"GET:/api/v2/workspaceagents/me/metadata": {NoAuthorize: true},
"GET:/api/v2/workspaceagents/me/coordinate": {NoAuthorize: true},
Expand Down
6 changes: 2 additions & 4 deletions coderd/templates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,10 +603,8 @@ func TestTemplateMetrics(t *testing.T) {
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = authToken
agentCloser := agent.New(agent.Options{
Logger: slogtest.Make(t, nil),
StatsReporter: agentClient.AgentReportStats,
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil),
Client: agentClient,
})
defer func() {
_ = agentCloser.Close()
Expand Down
Loading