Skip to content

fix: Refactor agent to consume API client #4715

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 150 additions & 160 deletions agent/agent.go

Large diffs are not rendered by default.

164 changes: 116 additions & 48 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -503,6 +504,45 @@ func TestAgent(t *testing.T) {
require.NoError(t, err)
t.Logf("%.2f MBits/s", res[len(res)-1].MBitsPerSecond())
})

t.Run("Reconnect", func(t *testing.T) {
t.Parallel()
// After the agent is disconnected from a coordinator, it's supposed
// to reconnect!
coordinator := tailnet.NewCoordinator()
agentID := uuid.New()
statsCh := make(chan *codersdk.AgentStats)
derpMap := tailnettest.RunDERPAndSTUN(t)
client := &client{
t: t,
agentID: agentID,
metadata: codersdk.WorkspaceAgentMetadata{
DERPMap: derpMap,
},
statsChan: statsCh,
coordinator: coordinator,
}
initialized := atomic.Int32{}
closer := agent.New(agent.Options{
ExchangeToken: func(ctx context.Context) error {
initialized.Add(1)
return nil
},
Client: client,
Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
})
t.Cleanup(func() {
_ = closer.Close()
})

require.Eventually(t, func() bool {
return coordinator.Node(agentID) != nil
}, testutil.WaitShort, testutil.IntervalFast)
client.lastWorkspaceAgent()
require.Eventually(t, func() bool {
return initialized.Load() == 2
}, testutil.WaitShort, testutil.IntervalFast)
})
}

func setupSSHCommand(t *testing.T, beforeArgs []string, afterArgs []string) *exec.Cmd {
Expand Down Expand Up @@ -572,57 +612,15 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
agentID := uuid.New()
statsCh := make(chan *codersdk.AgentStats)
closer := agent.New(agent.Options{
FetchMetadata: func(ctx context.Context) (codersdk.WorkspaceAgentMetadata, error) {
return metadata, nil
},
CoordinatorDialer: func(ctx context.Context) (net.Conn, error) {
clientConn, serverConn := net.Pipe()
closed := make(chan struct{})
t.Cleanup(func() {
_ = serverConn.Close()
_ = clientConn.Close()
<-closed
})
go func() {
_ = coordinator.ServeAgent(serverConn, agentID)
close(closed)
}()
return clientConn, nil
Client: &client{
t: t,
agentID: agentID,
metadata: metadata,
statsChan: statsCh,
coordinator: coordinator,
},
Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
ReconnectingPTYTimeout: ptyTimeout,
StatsReporter: func(ctx context.Context, log slog.Logger, statsFn func() *codersdk.AgentStats) (io.Closer, error) {
doneCh := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)

go func() {
defer close(doneCh)

t := time.NewTicker(time.Millisecond * 100)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
select {
case statsCh <- statsFn():
case <-ctx.Done():
return
default:
// We don't want to send old stats.
continue
}
}
}()
return closeFunc(func() error {
cancel()
<-doneCh
close(statsCh)
return nil
}), nil
},
})
t.Cleanup(func() {
_ = closer.Close()
Expand Down Expand Up @@ -679,3 +677,73 @@ func assertWritePayload(t *testing.T, w io.Writer, payload []byte) {
assert.NoError(t, err, "write payload")
assert.Equal(t, len(payload), n, "payload length does not match")
}

// client is the in-memory API client these tests hand to agent.New through
// agent.Options.Client. It talks to a local tailnet coordinator instead of a
// real server.
type client struct {
	// t is the running test; ListenWorkspaceAgent registers cleanups on it.
	t *testing.T
	// agentID is the ID passed to the coordinator's ServeAgent.
	agentID uuid.UUID
	// metadata is returned as-is by WorkspaceAgentMetadata.
	metadata codersdk.WorkspaceAgentMetadata
	// statsChan receives the values produced by the stats callback given to
	// AgentReportStats.
	statsChan chan *codersdk.AgentStats
	// coordinator serves one end of every pipe ListenWorkspaceAgent creates.
	coordinator tailnet.Coordinator
	// lastWorkspaceAgent tears down the connection created by the most recent
	// ListenWorkspaceAgent call. It is nil until that method has been called.
	lastWorkspaceAgent func()
}

// WorkspaceAgentMetadata returns the metadata stored on the test client.
// The context is ignored and the returned error is always nil.
func (c *client) WorkspaceAgentMetadata(_ context.Context) (codersdk.WorkspaceAgentMetadata, error) {
	meta := c.metadata
	return meta, nil
}

// ListenWorkspaceAgent builds an in-memory pipe, serves one end through the
// coordinator's ServeAgent in a goroutine, and returns the other end.
//
// The teardown closure for this connection is stored in lastWorkspaceAgent and
// also registered with t.Cleanup. It closes both pipe ends and then blocks
// until ServeAgent has returned. The context is ignored and the returned error
// is always nil.
func (c *client) ListenWorkspaceAgent(_ context.Context) (net.Conn, error) {
	agentSide, coordinatorSide := net.Pipe()
	served := make(chan struct{})

	teardown := func() {
		_ = coordinatorSide.Close()
		_ = agentSide.Close()
		// Wait for the ServeAgent goroutine below to finish.
		<-served
	}
	c.lastWorkspaceAgent = teardown
	c.t.Cleanup(teardown)

	go func() {
		_ = c.coordinator.ServeAgent(coordinatorSide, c.agentID)
		close(served)
	}()

	return agentSide, nil
}

// AgentReportStats starts a goroutine that, every 100ms, calls stats and
// offers the result to statsChan without blocking. If nothing is ready to
// receive, the value is dropped. The logger argument is ignored.
//
// Closing the returned io.Closer cancels the goroutine, waits for it to exit,
// and then closes statsChan.
// NOTE(review): because the closer closes statsChan, invoking it more than
// once for the same channel would panic.
func (c *client) AgentReportStats(ctx context.Context, _ slog.Logger, stats func() *codersdk.AgentStats) (io.Closer, error) {
	ctx, stop := context.WithCancel(ctx)
	finished := make(chan struct{})

	report := func() {
		defer close(finished)

		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()

		for {
			// Block until the next tick, or exit once canceled.
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}

			// Non-blocking hand-off: we don't want to send old stats, so a
			// sample nobody is waiting for is simply discarded.
			select {
			case c.statsChan <- stats():
			case <-ctx.Done():
				return
			default:
			}
		}
	}
	go report()

	return closeFunc(func() error {
		stop()
		<-finished
		close(c.statsChan)
		return nil
	}), nil
}

// PostWorkspaceAgentAppHealth is a no-op for the test client: both arguments
// are ignored and nil is always returned.
func (*client) PostWorkspaceAgentAppHealth(_ context.Context, _ codersdk.PostWorkspaceAppHealthsRequest) error {
	return nil
}

// PostWorkspaceAgentVersion is a no-op for the test client: the context and
// the string argument are ignored and nil is always returned.
func (*client) PostWorkspaceAgentVersion(_ context.Context, _ string) error {
	return nil
}
10 changes: 1 addition & 9 deletions agent/apphealth.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,8 @@ type PostWorkspaceAgentAppHealth func(context.Context, codersdk.PostWorkspaceApp
type WorkspaceAppHealthReporter func(ctx context.Context)

// NewWorkspaceAppHealthReporter creates a WorkspaceAppHealthReporter that reports app health to coderd.
func NewWorkspaceAppHealthReporter(logger slog.Logger, workspaceAgentApps WorkspaceAgentApps, postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth) WorkspaceAppHealthReporter {
func NewWorkspaceAppHealthReporter(logger slog.Logger, apps []codersdk.WorkspaceApp, postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth) WorkspaceAppHealthReporter {
runHealthcheckLoop := func(ctx context.Context) error {
apps, err := workspaceAgentApps(ctx)
if err != nil {
if xerrors.Is(err, context.Canceled) {
return nil
}
return xerrors.Errorf("getting workspace apps: %w", err)
}

// no need to run this loop if no apps for this workspace.
if len(apps) == 0 {
return nil
Expand Down
2 changes: 1 addition & 1 deletion agent/apphealth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func setupAppReporter(ctx context.Context, t *testing.T, apps []codersdk.Workspa
return nil
}

go agent.NewWorkspaceAppHealthReporter(slogtest.Make(t, nil).Leveled(slog.LevelDebug), workspaceAgentApps, postWorkspaceAgentAppHealth)(ctx)
go agent.NewWorkspaceAppHealthReporter(slogtest.Make(t, nil).Leveled(slog.LevelDebug), apps, postWorkspaceAgentAppHealth)(ctx)

return workspaceAgentApps, func() {
for _, closeFn := range closers {
Expand Down
10 changes: 0 additions & 10 deletions agent/stats.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package agent

import (
"context"
"io"
"net"
"sync/atomic"

"cdr.dev/slog"
"github.com/coder/coder/codersdk"
)

Expand Down Expand Up @@ -59,10 +56,3 @@ func (s *Stats) wrapConn(conn net.Conn) net.Conn {

return cs
}

// StatsReporter periodically accepts and records agent stats.
//
// It is called with a context, a logger, and a stats callback that returns a
// *codersdk.AgentStats; it returns an io.Closer and an error.
type StatsReporter func(
ctx context.Context,
log slog.Logger,
stats func() *codersdk.AgentStats,
) (io.Closer, error)
59 changes: 15 additions & 44 deletions cli/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/coder/coder/buildinfo"
"github.com/coder/coder/cli/cliflag"
"github.com/coder/coder/codersdk"
"github.com/coder/retry"
)

func workspaceAgent() *cobra.Command {
Expand Down Expand Up @@ -80,6 +79,8 @@ func workspaceAgent() *cobra.Command {
slog.F("version", version),
)
client := codersdk.New(coderURL)
// Set a reasonable timeout so requests can't hang forever!
client.HTTPClient.Timeout = 10 * time.Second

if pprofEnabled {
srvClose := serveHandler(cmd.Context(), logger, nil, pprofAddress, "pprof")
Expand Down Expand Up @@ -143,43 +144,6 @@ func workspaceAgent() *cobra.Command {
}
}

if exchangeToken != nil {
logger.Info(cmd.Context(), "exchanging identity token")
// Agents can start before resources are returned from the provisioner
// daemon. If there are many resources being provisioned, this time
// could be significant. This is arbitrarily set at an hour to prevent
// tons of idle agents from pinging coderd.
ctx, cancelFunc := context.WithTimeout(cmd.Context(), time.Hour)
defer cancelFunc()
for retry.New(100*time.Millisecond, 5*time.Second).Wait(ctx) {
var response codersdk.WorkspaceAgentAuthenticateResponse

response, err = exchangeToken(ctx)
if err != nil {
logger.Warn(ctx, "authenticate workspace", slog.F("method", auth), slog.Error(err))
continue
}
client.SessionToken = response.SessionToken
logger.Info(ctx, "authenticated", slog.F("method", auth))
break
}
if err != nil {
return xerrors.Errorf("agent failed to authenticate in time: %w", err)
}
}

retryCtx, cancelRetry := context.WithTimeout(cmd.Context(), time.Hour)
defer cancelRetry()
for retrier := retry.New(100*time.Millisecond, 5*time.Second); retrier.Wait(retryCtx); {
err := client.PostWorkspaceAgentVersion(retryCtx, version)
if err != nil {
logger.Warn(retryCtx, "post agent version: %w", slog.Error(err), slog.F("version", version))
continue
}
logger.Info(retryCtx, "updated agent version", slog.F("version", version))
break
}

executablePath, err := os.Executable()
if err != nil {
return xerrors.Errorf("getting os executable: %w", err)
Expand All @@ -190,17 +154,24 @@ func workspaceAgent() *cobra.Command {
}

closer := agent.New(agent.Options{
FetchMetadata: client.WorkspaceAgentMetadata,
Logger: logger,
Client: client,
Logger: logger,
ExchangeToken: func(ctx context.Context) error {
if exchangeToken == nil {
return nil
}
resp, err := exchangeToken(ctx)
if err != nil {
return err
}
client.SessionToken = resp.SessionToken
return nil
},
EnvironmentVariables: map[string]string{
// Override the "CODER_AGENT_TOKEN" variable in all
// shells so "gitssh" works!
"CODER_AGENT_TOKEN": client.SessionToken,
},
CoordinatorDialer: client.ListenWorkspaceAgentTailnet,
StatsReporter: client.AgentReportStats,
WorkspaceAgentApps: client.WorkspaceAgentApps,
PostWorkspaceAgentAppHealth: client.PostWorkspaceAgentAppHealth,
})
<-cmd.Context().Done()
return closer.Close()
Expand Down
5 changes: 2 additions & 3 deletions cli/configssh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ func TestConfigSSH(t *testing.T) {
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = authToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent"),
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent"),
})
defer func() {
_ = agentCloser.Close()
Expand Down
5 changes: 2 additions & 3 deletions cli/speedtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ func TestSpeedtest(t *testing.T) {
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = agentToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent"),
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent"),
})
defer agentCloser.Close()
coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID)
Expand Down
15 changes: 6 additions & 9 deletions cli/ssh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ func TestSSH(t *testing.T) {
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = agentToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent"),
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent"),
})
defer func() {
_ = agentCloser.Close()
Expand All @@ -110,9 +109,8 @@ func TestSSH(t *testing.T) {
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = agentToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent"),
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent"),
})
<-ctx.Done()
_ = agentCloser.Close()
Expand Down Expand Up @@ -178,9 +176,8 @@ func TestSSH(t *testing.T) {
agentClient := codersdk.New(client.URL)
agentClient.SessionToken = agentToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent"),
Client: agentClient,
Logger: slogtest.Make(t, nil).Named("agent"),
})
defer agentCloser.Close()

Expand Down
Loading