Skip to content

Commit bf3224e

Browse files
authored
fix: Refactor agent to consume API client (#4715)
* fix: Refactor agent to consume API client

  This simplifies a lot of code by creating an interface for the codersdk client into the agent. It also moves agent authentication code so instance identity will work between restarts.

  Fixes #3485 and #4082.

* Fix client reconnections
1 parent c9bf2a9 commit bf3224e

19 files changed

+376
-361
lines changed

agent/agent.go

+150-160
Large diffs are not rendered by default.

agent/agent_test.go

+116-48
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"strconv"
1717
"strings"
1818
"sync"
19+
"sync/atomic"
1920
"testing"
2021
"time"
2122

@@ -503,6 +504,45 @@ func TestAgent(t *testing.T) {
503504
require.NoError(t, err)
504505
t.Logf("%.2f MBits/s", res[len(res)-1].MBitsPerSecond())
505506
})
507+
508+
t.Run("Reconnect", func(t *testing.T) {
509+
t.Parallel()
510+
// After the agent is disconnected from a coordinator, it's supposed
511+
// to reconnect!
512+
coordinator := tailnet.NewCoordinator()
513+
agentID := uuid.New()
514+
statsCh := make(chan *codersdk.AgentStats)
515+
derpMap := tailnettest.RunDERPAndSTUN(t)
516+
client := &client{
517+
t: t,
518+
agentID: agentID,
519+
metadata: codersdk.WorkspaceAgentMetadata{
520+
DERPMap: derpMap,
521+
},
522+
statsChan: statsCh,
523+
coordinator: coordinator,
524+
}
525+
initialized := atomic.Int32{}
526+
closer := agent.New(agent.Options{
527+
ExchangeToken: func(ctx context.Context) error {
528+
initialized.Add(1)
529+
return nil
530+
},
531+
Client: client,
532+
Logger: slogtest.Make(t, nil).Leveled(slog.LevelInfo),
533+
})
534+
t.Cleanup(func() {
535+
_ = closer.Close()
536+
})
537+
538+
require.Eventually(t, func() bool {
539+
return coordinator.Node(agentID) != nil
540+
}, testutil.WaitShort, testutil.IntervalFast)
541+
client.lastWorkspaceAgent()
542+
require.Eventually(t, func() bool {
543+
return initialized.Load() == 2
544+
}, testutil.WaitShort, testutil.IntervalFast)
545+
})
506546
}
507547

508548
func setupSSHCommand(t *testing.T, beforeArgs []string, afterArgs []string) *exec.Cmd {
@@ -572,57 +612,15 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
572612
agentID := uuid.New()
573613
statsCh := make(chan *codersdk.AgentStats)
574614
closer := agent.New(agent.Options{
575-
FetchMetadata: func(ctx context.Context) (codersdk.WorkspaceAgentMetadata, error) {
576-
return metadata, nil
577-
},
578-
CoordinatorDialer: func(ctx context.Context) (net.Conn, error) {
579-
clientConn, serverConn := net.Pipe()
580-
closed := make(chan struct{})
581-
t.Cleanup(func() {
582-
_ = serverConn.Close()
583-
_ = clientConn.Close()
584-
<-closed
585-
})
586-
go func() {
587-
_ = coordinator.ServeAgent(serverConn, agentID)
588-
close(closed)
589-
}()
590-
return clientConn, nil
615+
Client: &client{
616+
t: t,
617+
agentID: agentID,
618+
metadata: metadata,
619+
statsChan: statsCh,
620+
coordinator: coordinator,
591621
},
592622
Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
593623
ReconnectingPTYTimeout: ptyTimeout,
594-
StatsReporter: func(ctx context.Context, log slog.Logger, statsFn func() *codersdk.AgentStats) (io.Closer, error) {
595-
doneCh := make(chan struct{})
596-
ctx, cancel := context.WithCancel(ctx)
597-
598-
go func() {
599-
defer close(doneCh)
600-
601-
t := time.NewTicker(time.Millisecond * 100)
602-
defer t.Stop()
603-
for {
604-
select {
605-
case <-ctx.Done():
606-
return
607-
case <-t.C:
608-
}
609-
select {
610-
case statsCh <- statsFn():
611-
case <-ctx.Done():
612-
return
613-
default:
614-
// We don't want to send old stats.
615-
continue
616-
}
617-
}
618-
}()
619-
return closeFunc(func() error {
620-
cancel()
621-
<-doneCh
622-
close(statsCh)
623-
return nil
624-
}), nil
625-
},
626624
})
627625
t.Cleanup(func() {
628626
_ = closer.Close()
@@ -679,3 +677,73 @@ func assertWritePayload(t *testing.T, w io.Writer, payload []byte) {
679677
assert.NoError(t, err, "write payload")
680678
assert.Equal(t, len(payload), n, "payload length does not match")
681679
}
680+
681+
type client struct {
682+
t *testing.T
683+
agentID uuid.UUID
684+
metadata codersdk.WorkspaceAgentMetadata
685+
statsChan chan *codersdk.AgentStats
686+
coordinator tailnet.Coordinator
687+
lastWorkspaceAgent func()
688+
}
689+
690+
func (c *client) WorkspaceAgentMetadata(_ context.Context) (codersdk.WorkspaceAgentMetadata, error) {
691+
return c.metadata, nil
692+
}
693+
694+
func (c *client) ListenWorkspaceAgent(_ context.Context) (net.Conn, error) {
695+
clientConn, serverConn := net.Pipe()
696+
closed := make(chan struct{})
697+
c.lastWorkspaceAgent = func() {
698+
_ = serverConn.Close()
699+
_ = clientConn.Close()
700+
<-closed
701+
}
702+
c.t.Cleanup(c.lastWorkspaceAgent)
703+
go func() {
704+
_ = c.coordinator.ServeAgent(serverConn, c.agentID)
705+
close(closed)
706+
}()
707+
return clientConn, nil
708+
}
709+
710+
func (c *client) AgentReportStats(ctx context.Context, _ slog.Logger, stats func() *codersdk.AgentStats) (io.Closer, error) {
711+
doneCh := make(chan struct{})
712+
ctx, cancel := context.WithCancel(ctx)
713+
714+
go func() {
715+
defer close(doneCh)
716+
717+
t := time.NewTicker(time.Millisecond * 100)
718+
defer t.Stop()
719+
for {
720+
select {
721+
case <-ctx.Done():
722+
return
723+
case <-t.C:
724+
}
725+
select {
726+
case c.statsChan <- stats():
727+
case <-ctx.Done():
728+
return
729+
default:
730+
// We don't want to send old stats.
731+
continue
732+
}
733+
}
734+
}()
735+
return closeFunc(func() error {
736+
cancel()
737+
<-doneCh
738+
close(c.statsChan)
739+
return nil
740+
}), nil
741+
}
742+
743+
func (*client) PostWorkspaceAgentAppHealth(_ context.Context, _ codersdk.PostWorkspaceAppHealthsRequest) error {
744+
return nil
745+
}
746+
747+
func (*client) PostWorkspaceAgentVersion(_ context.Context, _ string) error {
748+
return nil
749+
}

agent/apphealth.go

+1-9
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,8 @@ type PostWorkspaceAgentAppHealth func(context.Context, codersdk.PostWorkspaceApp
2323
type WorkspaceAppHealthReporter func(ctx context.Context)
2424

2525
// NewWorkspaceAppHealthReporter creates a WorkspaceAppHealthReporter that reports app health to coderd.
26-
func NewWorkspaceAppHealthReporter(logger slog.Logger, workspaceAgentApps WorkspaceAgentApps, postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth) WorkspaceAppHealthReporter {
26+
func NewWorkspaceAppHealthReporter(logger slog.Logger, apps []codersdk.WorkspaceApp, postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth) WorkspaceAppHealthReporter {
2727
runHealthcheckLoop := func(ctx context.Context) error {
28-
apps, err := workspaceAgentApps(ctx)
29-
if err != nil {
30-
if xerrors.Is(err, context.Canceled) {
31-
return nil
32-
}
33-
return xerrors.Errorf("getting workspace apps: %w", err)
34-
}
35-
3628
// no need to run this loop if no apps for this workspace.
3729
if len(apps) == 0 {
3830
return nil

agent/apphealth_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func setupAppReporter(ctx context.Context, t *testing.T, apps []codersdk.Workspa
199199
return nil
200200
}
201201

202-
go agent.NewWorkspaceAppHealthReporter(slogtest.Make(t, nil).Leveled(slog.LevelDebug), workspaceAgentApps, postWorkspaceAgentAppHealth)(ctx)
202+
go agent.NewWorkspaceAppHealthReporter(slogtest.Make(t, nil).Leveled(slog.LevelDebug), apps, postWorkspaceAgentAppHealth)(ctx)
203203

204204
return workspaceAgentApps, func() {
205205
for _, closeFn := range closers {

agent/stats.go

-10
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package agent
22

33
import (
4-
"context"
5-
"io"
64
"net"
75
"sync/atomic"
86

9-
"cdr.dev/slog"
107
"github.com/coder/coder/codersdk"
118
)
129

@@ -59,10 +56,3 @@ func (s *Stats) wrapConn(conn net.Conn) net.Conn {
5956

6057
return cs
6158
}
62-
63-
// StatsReporter periodically accepts and records agent stats.
64-
type StatsReporter func(
65-
ctx context.Context,
66-
log slog.Logger,
67-
stats func() *codersdk.AgentStats,
68-
) (io.Closer, error)

cli/agent.go

+15-44
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/coder/coder/buildinfo"
2424
"github.com/coder/coder/cli/cliflag"
2525
"github.com/coder/coder/codersdk"
26-
"github.com/coder/retry"
2726
)
2827

2928
func workspaceAgent() *cobra.Command {
@@ -80,6 +79,8 @@ func workspaceAgent() *cobra.Command {
8079
slog.F("version", version),
8180
)
8281
client := codersdk.New(coderURL)
82+
// Set a reasonable timeout so requests can't hang forever!
83+
client.HTTPClient.Timeout = 10 * time.Second
8384

8485
if pprofEnabled {
8586
srvClose := serveHandler(cmd.Context(), logger, nil, pprofAddress, "pprof")
@@ -143,43 +144,6 @@ func workspaceAgent() *cobra.Command {
143144
}
144145
}
145146

146-
if exchangeToken != nil {
147-
logger.Info(cmd.Context(), "exchanging identity token")
148-
// Agents can start before resources are returned from the provisioner
149-
// daemon. If there are many resources being provisioned, this time
150-
// could be significant. This is arbitrarily set at an hour to prevent
151-
// tons of idle agents from pinging coderd.
152-
ctx, cancelFunc := context.WithTimeout(cmd.Context(), time.Hour)
153-
defer cancelFunc()
154-
for retry.New(100*time.Millisecond, 5*time.Second).Wait(ctx) {
155-
var response codersdk.WorkspaceAgentAuthenticateResponse
156-
157-
response, err = exchangeToken(ctx)
158-
if err != nil {
159-
logger.Warn(ctx, "authenticate workspace", slog.F("method", auth), slog.Error(err))
160-
continue
161-
}
162-
client.SessionToken = response.SessionToken
163-
logger.Info(ctx, "authenticated", slog.F("method", auth))
164-
break
165-
}
166-
if err != nil {
167-
return xerrors.Errorf("agent failed to authenticate in time: %w", err)
168-
}
169-
}
170-
171-
retryCtx, cancelRetry := context.WithTimeout(cmd.Context(), time.Hour)
172-
defer cancelRetry()
173-
for retrier := retry.New(100*time.Millisecond, 5*time.Second); retrier.Wait(retryCtx); {
174-
err := client.PostWorkspaceAgentVersion(retryCtx, version)
175-
if err != nil {
176-
logger.Warn(retryCtx, "post agent version: %w", slog.Error(err), slog.F("version", version))
177-
continue
178-
}
179-
logger.Info(retryCtx, "updated agent version", slog.F("version", version))
180-
break
181-
}
182-
183147
executablePath, err := os.Executable()
184148
if err != nil {
185149
return xerrors.Errorf("getting os executable: %w", err)
@@ -190,17 +154,24 @@ func workspaceAgent() *cobra.Command {
190154
}
191155

192156
closer := agent.New(agent.Options{
193-
FetchMetadata: client.WorkspaceAgentMetadata,
194-
Logger: logger,
157+
Client: client,
158+
Logger: logger,
159+
ExchangeToken: func(ctx context.Context) error {
160+
if exchangeToken == nil {
161+
return nil
162+
}
163+
resp, err := exchangeToken(ctx)
164+
if err != nil {
165+
return err
166+
}
167+
client.SessionToken = resp.SessionToken
168+
return nil
169+
},
195170
EnvironmentVariables: map[string]string{
196171
// Override the "CODER_AGENT_TOKEN" variable in all
197172
// shells so "gitssh" works!
198173
"CODER_AGENT_TOKEN": client.SessionToken,
199174
},
200-
CoordinatorDialer: client.ListenWorkspaceAgentTailnet,
201-
StatsReporter: client.AgentReportStats,
202-
WorkspaceAgentApps: client.WorkspaceAgentApps,
203-
PostWorkspaceAgentAppHealth: client.PostWorkspaceAgentAppHealth,
204175
})
205176
<-cmd.Context().Done()
206177
return closer.Close()

cli/configssh_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,8 @@ func TestConfigSSH(t *testing.T) {
106106
agentClient := codersdk.New(client.URL)
107107
agentClient.SessionToken = authToken
108108
agentCloser := agent.New(agent.Options{
109-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
110-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
111-
Logger: slogtest.Make(t, nil).Named("agent"),
109+
Client: agentClient,
110+
Logger: slogtest.Make(t, nil).Named("agent"),
112111
})
113112
defer func() {
114113
_ = agentCloser.Close()

cli/speedtest_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@ func TestSpeedtest(t *testing.T) {
2424
agentClient := codersdk.New(client.URL)
2525
agentClient.SessionToken = agentToken
2626
agentCloser := agent.New(agent.Options{
27-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
28-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
29-
Logger: slogtest.Make(t, nil).Named("agent"),
27+
Client: agentClient,
28+
Logger: slogtest.Make(t, nil).Named("agent"),
3029
})
3130
defer agentCloser.Close()
3231
coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID)

cli/ssh_test.go

+6-9
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,8 @@ func TestSSH(t *testing.T) {
8989
agentClient := codersdk.New(client.URL)
9090
agentClient.SessionToken = agentToken
9191
agentCloser := agent.New(agent.Options{
92-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
93-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
94-
Logger: slogtest.Make(t, nil).Named("agent"),
92+
Client: agentClient,
93+
Logger: slogtest.Make(t, nil).Named("agent"),
9594
})
9695
defer func() {
9796
_ = agentCloser.Close()
@@ -110,9 +109,8 @@ func TestSSH(t *testing.T) {
110109
agentClient := codersdk.New(client.URL)
111110
agentClient.SessionToken = agentToken
112111
agentCloser := agent.New(agent.Options{
113-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
114-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
115-
Logger: slogtest.Make(t, nil).Named("agent"),
112+
Client: agentClient,
113+
Logger: slogtest.Make(t, nil).Named("agent"),
116114
})
117115
<-ctx.Done()
118116
_ = agentCloser.Close()
@@ -178,9 +176,8 @@ func TestSSH(t *testing.T) {
178176
agentClient := codersdk.New(client.URL)
179177
agentClient.SessionToken = agentToken
180178
agentCloser := agent.New(agent.Options{
181-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
182-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
183-
Logger: slogtest.Make(t, nil).Named("agent"),
179+
Client: agentClient,
180+
Logger: slogtest.Make(t, nil).Named("agent"),
184181
})
185182
defer agentCloser.Close()
186183

0 commit comments

Comments
 (0)