Skip to content

Commit 44eee71

Browse files
committed
fix: Refactor agent to consume API client
This simplifies a lot of code by defining an interface through which the agent consumes the codersdk client. It also moves the agent authentication code into the agent itself, so that instance identity keeps working across restarts. Fixes #3485 and #4082.
1 parent f75a54c commit 44eee71

18 files changed

+315
-351
lines changed

agent/agent.go

+136-149
Large diffs are not rendered by default.

agent/agent_test.go

+74-48
Original file line numberDiff line numberDiff line change
@@ -572,57 +572,15 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
572572
agentID := uuid.New()
573573
statsCh := make(chan *codersdk.AgentStats)
574574
closer := agent.New(agent.Options{
575-
FetchMetadata: func(ctx context.Context) (codersdk.WorkspaceAgentMetadata, error) {
576-
return metadata, nil
577-
},
578-
CoordinatorDialer: func(ctx context.Context) (net.Conn, error) {
579-
clientConn, serverConn := net.Pipe()
580-
closed := make(chan struct{})
581-
t.Cleanup(func() {
582-
_ = serverConn.Close()
583-
_ = clientConn.Close()
584-
<-closed
585-
})
586-
go func() {
587-
_ = coordinator.ServeAgent(serverConn, agentID)
588-
close(closed)
589-
}()
590-
return clientConn, nil
575+
Client: &client{
576+
t: t,
577+
agentID: agentID,
578+
metadata: metadata,
579+
statsChan: statsCh,
580+
coordinator: coordinator,
591581
},
592582
Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
593583
ReconnectingPTYTimeout: ptyTimeout,
594-
StatsReporter: func(ctx context.Context, log slog.Logger, statsFn func() *codersdk.AgentStats) (io.Closer, error) {
595-
doneCh := make(chan struct{})
596-
ctx, cancel := context.WithCancel(ctx)
597-
598-
go func() {
599-
defer close(doneCh)
600-
601-
t := time.NewTicker(time.Millisecond * 100)
602-
defer t.Stop()
603-
for {
604-
select {
605-
case <-ctx.Done():
606-
return
607-
case <-t.C:
608-
}
609-
select {
610-
case statsCh <- statsFn():
611-
case <-ctx.Done():
612-
return
613-
default:
614-
// We don't want to send old stats.
615-
continue
616-
}
617-
}
618-
}()
619-
return closeFunc(func() error {
620-
cancel()
621-
<-doneCh
622-
close(statsCh)
623-
return nil
624-
}), nil
625-
},
626584
})
627585
t.Cleanup(func() {
628586
_ = closer.Close()
@@ -679,3 +637,71 @@ func assertWritePayload(t *testing.T, w io.Writer, payload []byte) {
679637
assert.NoError(t, err, "write payload")
680638
assert.Equal(t, len(payload), n, "payload length does not match")
681639
}
640+
641+
type client struct {
642+
t *testing.T
643+
agentID uuid.UUID
644+
metadata codersdk.WorkspaceAgentMetadata
645+
statsChan chan *codersdk.AgentStats
646+
coordinator tailnet.Coordinator
647+
}
648+
649+
func (c *client) WorkspaceAgentMetadata(_ context.Context) (codersdk.WorkspaceAgentMetadata, error) {
650+
return c.metadata, nil
651+
}
652+
653+
func (c *client) ListenWorkspaceAgent(_ context.Context) (net.Conn, error) {
654+
clientConn, serverConn := net.Pipe()
655+
closed := make(chan struct{})
656+
c.t.Cleanup(func() {
657+
_ = serverConn.Close()
658+
_ = clientConn.Close()
659+
<-closed
660+
})
661+
go func() {
662+
_ = c.coordinator.ServeAgent(serverConn, c.agentID)
663+
close(closed)
664+
}()
665+
return clientConn, nil
666+
}
667+
668+
func (c *client) AgentReportStats(ctx context.Context, _ slog.Logger, stats func() *codersdk.AgentStats) (io.Closer, error) {
669+
doneCh := make(chan struct{})
670+
ctx, cancel := context.WithCancel(ctx)
671+
672+
go func() {
673+
defer close(doneCh)
674+
675+
t := time.NewTicker(time.Millisecond * 100)
676+
defer t.Stop()
677+
for {
678+
select {
679+
case <-ctx.Done():
680+
return
681+
case <-t.C:
682+
}
683+
select {
684+
case c.statsChan <- stats():
685+
case <-ctx.Done():
686+
return
687+
default:
688+
// We don't want to send old stats.
689+
continue
690+
}
691+
}
692+
}()
693+
return closeFunc(func() error {
694+
cancel()
695+
<-doneCh
696+
close(c.statsChan)
697+
return nil
698+
}), nil
699+
}
700+
701+
func (*client) PostWorkspaceAgentAppHealth(_ context.Context, _ codersdk.PostWorkspaceAppHealthsRequest) error {
702+
return nil
703+
}
704+
705+
func (*client) PostWorkspaceAgentVersion(_ context.Context, _ string) error {
706+
return nil
707+
}

agent/apphealth.go

+1-9
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,8 @@ type PostWorkspaceAgentAppHealth func(context.Context, codersdk.PostWorkspaceApp
2323
type WorkspaceAppHealthReporter func(ctx context.Context)
2424

2525
// NewWorkspaceAppHealthReporter creates a WorkspaceAppHealthReporter that reports app health to coderd.
26-
func NewWorkspaceAppHealthReporter(logger slog.Logger, workspaceAgentApps WorkspaceAgentApps, postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth) WorkspaceAppHealthReporter {
26+
func NewWorkspaceAppHealthReporter(logger slog.Logger, apps []codersdk.WorkspaceApp, postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth) WorkspaceAppHealthReporter {
2727
runHealthcheckLoop := func(ctx context.Context) error {
28-
apps, err := workspaceAgentApps(ctx)
29-
if err != nil {
30-
if xerrors.Is(err, context.Canceled) {
31-
return nil
32-
}
33-
return xerrors.Errorf("getting workspace apps: %w", err)
34-
}
35-
3628
// no need to run this loop if no apps for this workspace.
3729
if len(apps) == 0 {
3830
return nil

agent/apphealth_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func setupAppReporter(ctx context.Context, t *testing.T, apps []codersdk.Workspa
199199
return nil
200200
}
201201

202-
go agent.NewWorkspaceAppHealthReporter(slogtest.Make(t, nil).Leveled(slog.LevelDebug), workspaceAgentApps, postWorkspaceAgentAppHealth)(ctx)
202+
go agent.NewWorkspaceAppHealthReporter(slogtest.Make(t, nil).Leveled(slog.LevelDebug), apps, postWorkspaceAgentAppHealth)(ctx)
203203

204204
return workspaceAgentApps, func() {
205205
for _, closeFn := range closers {

agent/stats.go

-10
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package agent
22

33
import (
4-
"context"
5-
"io"
64
"net"
75
"sync/atomic"
86

9-
"cdr.dev/slog"
107
"github.com/coder/coder/codersdk"
118
)
129

@@ -59,10 +56,3 @@ func (s *Stats) wrapConn(conn net.Conn) net.Conn {
5956

6057
return cs
6158
}
62-
63-
// StatsReporter periodically accepts and records agent stats.
64-
type StatsReporter func(
65-
ctx context.Context,
66-
log slog.Logger,
67-
stats func() *codersdk.AgentStats,
68-
) (io.Closer, error)

cli/agent.go

+13-45
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"os"
1010
"path/filepath"
1111
"runtime"
12-
"time"
1312

1413
"cloud.google.com/go/compute/metadata"
1514
"github.com/spf13/cobra"
@@ -23,7 +22,6 @@ import (
2322
"github.com/coder/coder/buildinfo"
2423
"github.com/coder/coder/cli/cliflag"
2524
"github.com/coder/coder/codersdk"
26-
"github.com/coder/retry"
2725
)
2826

2927
func workspaceAgent() *cobra.Command {
@@ -143,43 +141,6 @@ func workspaceAgent() *cobra.Command {
143141
}
144142
}
145143

146-
if exchangeToken != nil {
147-
logger.Info(cmd.Context(), "exchanging identity token")
148-
// Agents can start before resources are returned from the provisioner
149-
// daemon. If there are many resources being provisioned, this time
150-
// could be significant. This is arbitrarily set at an hour to prevent
151-
// tons of idle agents from pinging coderd.
152-
ctx, cancelFunc := context.WithTimeout(cmd.Context(), time.Hour)
153-
defer cancelFunc()
154-
for retry.New(100*time.Millisecond, 5*time.Second).Wait(ctx) {
155-
var response codersdk.WorkspaceAgentAuthenticateResponse
156-
157-
response, err = exchangeToken(ctx)
158-
if err != nil {
159-
logger.Warn(ctx, "authenticate workspace", slog.F("method", auth), slog.Error(err))
160-
continue
161-
}
162-
client.SessionToken = response.SessionToken
163-
logger.Info(ctx, "authenticated", slog.F("method", auth))
164-
break
165-
}
166-
if err != nil {
167-
return xerrors.Errorf("agent failed to authenticate in time: %w", err)
168-
}
169-
}
170-
171-
retryCtx, cancelRetry := context.WithTimeout(cmd.Context(), time.Hour)
172-
defer cancelRetry()
173-
for retrier := retry.New(100*time.Millisecond, 5*time.Second); retrier.Wait(retryCtx); {
174-
err := client.PostWorkspaceAgentVersion(retryCtx, version)
175-
if err != nil {
176-
logger.Warn(retryCtx, "post agent version: %w", slog.Error(err), slog.F("version", version))
177-
continue
178-
}
179-
logger.Info(retryCtx, "updated agent version", slog.F("version", version))
180-
break
181-
}
182-
183144
executablePath, err := os.Executable()
184145
if err != nil {
185146
return xerrors.Errorf("getting os executable: %w", err)
@@ -190,17 +151,24 @@ func workspaceAgent() *cobra.Command {
190151
}
191152

192153
closer := agent.New(agent.Options{
193-
FetchMetadata: client.WorkspaceAgentMetadata,
194-
Logger: logger,
154+
Client: client,
155+
Logger: logger,
156+
ExchangeToken: func(ctx context.Context) error {
157+
if exchangeToken == nil {
158+
return nil
159+
}
160+
resp, err := exchangeToken(ctx)
161+
if err != nil {
162+
return err
163+
}
164+
client.SessionToken = resp.SessionToken
165+
return nil
166+
},
195167
EnvironmentVariables: map[string]string{
196168
// Override the "CODER_AGENT_TOKEN" variable in all
197169
// shells so "gitssh" works!
198170
"CODER_AGENT_TOKEN": client.SessionToken,
199171
},
200-
CoordinatorDialer: client.ListenWorkspaceAgentTailnet,
201-
StatsReporter: client.AgentReportStats,
202-
WorkspaceAgentApps: client.WorkspaceAgentApps,
203-
PostWorkspaceAgentAppHealth: client.PostWorkspaceAgentAppHealth,
204172
})
205173
<-cmd.Context().Done()
206174
return closer.Close()

cli/configssh_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,8 @@ func TestConfigSSH(t *testing.T) {
106106
agentClient := codersdk.New(client.URL)
107107
agentClient.SessionToken = authToken
108108
agentCloser := agent.New(agent.Options{
109-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
110-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
111-
Logger: slogtest.Make(t, nil).Named("agent"),
109+
Client: agentClient,
110+
Logger: slogtest.Make(t, nil).Named("agent"),
112111
})
113112
defer func() {
114113
_ = agentCloser.Close()

cli/speedtest_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@ func TestSpeedtest(t *testing.T) {
2424
agentClient := codersdk.New(client.URL)
2525
agentClient.SessionToken = agentToken
2626
agentCloser := agent.New(agent.Options{
27-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
28-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
29-
Logger: slogtest.Make(t, nil).Named("agent"),
27+
Client: agentClient,
28+
Logger: slogtest.Make(t, nil).Named("agent"),
3029
})
3130
defer agentCloser.Close()
3231
coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID)

cli/ssh_test.go

+6-9
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,8 @@ func TestSSH(t *testing.T) {
8989
agentClient := codersdk.New(client.URL)
9090
agentClient.SessionToken = agentToken
9191
agentCloser := agent.New(agent.Options{
92-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
93-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
94-
Logger: slogtest.Make(t, nil).Named("agent"),
92+
Client: agentClient,
93+
Logger: slogtest.Make(t, nil).Named("agent"),
9594
})
9695
defer func() {
9796
_ = agentCloser.Close()
@@ -110,9 +109,8 @@ func TestSSH(t *testing.T) {
110109
agentClient := codersdk.New(client.URL)
111110
agentClient.SessionToken = agentToken
112111
agentCloser := agent.New(agent.Options{
113-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
114-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
115-
Logger: slogtest.Make(t, nil).Named("agent"),
112+
Client: agentClient,
113+
Logger: slogtest.Make(t, nil).Named("agent"),
116114
})
117115
<-ctx.Done()
118116
_ = agentCloser.Close()
@@ -178,9 +176,8 @@ func TestSSH(t *testing.T) {
178176
agentClient := codersdk.New(client.URL)
179177
agentClient.SessionToken = agentToken
180178
agentCloser := agent.New(agent.Options{
181-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
182-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
183-
Logger: slogtest.Make(t, nil).Named("agent"),
179+
Client: agentClient,
180+
Logger: slogtest.Make(t, nil).Named("agent"),
184181
})
185182
defer agentCloser.Close()
186183

coderd/coderd.go

-1
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,6 @@ func New(options *Options) *API {
471471
r.Post("/google-instance-identity", api.postWorkspaceAuthGoogleInstanceIdentity)
472472
r.Route("/me", func(r chi.Router) {
473473
r.Use(httpmw.ExtractWorkspaceAgent(options.Database))
474-
r.Get("/apps", api.workspaceAgentApps)
475474
r.Get("/metadata", api.workspaceAgentMetadata)
476475
r.Post("/version", api.postWorkspaceAgentVersion)
477476
r.Post("/app-health", api.postWorkspaceAppHealth)

coderd/coderdtest/authorize.go

-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ func AGPLRoutes(a *AuthTester) (map[string]string, map[string]RouteCheck) {
5757
"POST:/api/v2/workspaceagents/aws-instance-identity": {NoAuthorize: true},
5858
"POST:/api/v2/workspaceagents/azure-instance-identity": {NoAuthorize: true},
5959
"POST:/api/v2/workspaceagents/google-instance-identity": {NoAuthorize: true},
60-
"GET:/api/v2/workspaceagents/me/apps": {NoAuthorize: true},
6160
"GET:/api/v2/workspaceagents/me/gitsshkey": {NoAuthorize: true},
6261
"GET:/api/v2/workspaceagents/me/metadata": {NoAuthorize: true},
6362
"GET:/api/v2/workspaceagents/me/coordinate": {NoAuthorize: true},

coderd/templates_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -603,10 +603,8 @@ func TestTemplateMetrics(t *testing.T) {
603603
agentClient := codersdk.New(client.URL)
604604
agentClient.SessionToken = authToken
605605
agentCloser := agent.New(agent.Options{
606-
Logger: slogtest.Make(t, nil),
607-
StatsReporter: agentClient.AgentReportStats,
608-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
609-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
606+
Logger: slogtest.Make(t, nil),
607+
Client: agentClient,
610608
})
611609
defer func() {
612610
_ = agentCloser.Close()

0 commit comments

Comments
 (0)