Skip to content

Commit bb424c2

Browse files
committed
workspace app health reporter
1 parent 2de6d48 commit bb424c2

File tree

10 files changed

+171
-155
lines changed

10 files changed

+171
-155
lines changed

agent/agent.go

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,13 @@ const (
5151
)
5252

5353
type Options struct {
54-
CoordinatorDialer CoordinatorDialer
55-
FetchMetadata FetchMetadata
56-
FetchWorkspaceApps FetchWorkspaceApps
57-
PostWorkspaceAppHealth PostWorkspaceAppHealth
58-
StatsReporter StatsReporter
59-
ReconnectingPTYTimeout time.Duration
60-
EnvironmentVariables map[string]string
61-
Logger slog.Logger
54+
CoordinatorDialer CoordinatorDialer
55+
FetchMetadata FetchMetadata
56+
StatsReporter StatsReporter
57+
WorkspaceAppHealthReporter WorkspaceAppHealthReporter
58+
ReconnectingPTYTimeout time.Duration
59+
EnvironmentVariables map[string]string
60+
Logger slog.Logger
6261
}
6362

6463
// CoordinatorDialer is a function that constructs a new broker.
@@ -69,25 +68,24 @@ type CoordinatorDialer func(context.Context) (net.Conn, error)
6968
type FetchMetadata func(context.Context) (codersdk.WorkspaceAgentMetadata, error)
7069

7170
type FetchWorkspaceApps func(context.Context) ([]codersdk.WorkspaceApp, error)
72-
type PostWorkspaceAppHealth func(context.Context, map[string]codersdk.WorkspaceAppHealth) error
71+
type PostWorkspaceAppHealth func(context.Context, codersdk.PostWorkspaceAppHealthsRequest) error
7372

7473
func New(options Options) io.Closer {
7574
if options.ReconnectingPTYTimeout == 0 {
7675
options.ReconnectingPTYTimeout = 5 * time.Minute
7776
}
7877
ctx, cancelFunc := context.WithCancel(context.Background())
7978
server := &agent{
80-
reconnectingPTYTimeout: options.ReconnectingPTYTimeout,
81-
logger: options.Logger,
82-
closeCancel: cancelFunc,
83-
closed: make(chan struct{}),
84-
envVars: options.EnvironmentVariables,
85-
coordinatorDialer: options.CoordinatorDialer,
86-
fetchMetadata: options.FetchMetadata,
87-
stats: &Stats{},
88-
statsReporter: options.StatsReporter,
89-
fetchWorkspaceApps: options.FetchWorkspaceApps,
90-
postWorkspaceAppHealth: options.PostWorkspaceAppHealth,
79+
reconnectingPTYTimeout: options.ReconnectingPTYTimeout,
80+
logger: options.Logger,
81+
closeCancel: cancelFunc,
82+
closed: make(chan struct{}),
83+
envVars: options.EnvironmentVariables,
84+
coordinatorDialer: options.CoordinatorDialer,
85+
fetchMetadata: options.FetchMetadata,
86+
stats: &Stats{},
87+
statsReporter: options.StatsReporter,
88+
workspaceAppHealthReporter: options.WorkspaceAppHealthReporter,
9189
}
9290
server.init(ctx)
9391
return server
@@ -110,12 +108,11 @@ type agent struct {
110108
fetchMetadata FetchMetadata
111109
sshServer *ssh.Server
112110

113-
network *tailnet.Conn
114-
coordinatorDialer CoordinatorDialer
115-
stats *Stats
116-
statsReporter StatsReporter
117-
fetchWorkspaceApps FetchWorkspaceApps
118-
postWorkspaceAppHealth PostWorkspaceAppHealth
111+
network *tailnet.Conn
112+
coordinatorDialer CoordinatorDialer
113+
stats *Stats
114+
statsReporter StatsReporter
115+
workspaceAppHealthReporter WorkspaceAppHealthReporter
119116
}
120117

121118
func (a *agent) run(ctx context.Context) {
@@ -161,7 +158,7 @@ func (a *agent) run(ctx context.Context) {
161158
go a.runTailnet(ctx, metadata.DERPMap)
162159
}
163160

164-
go reportAppHealth(ctx, a.logger, a.fetchWorkspaceApps, a.postWorkspaceAppHealth)
161+
go a.workspaceAppHealthReporter(ctx)
165162
}
166163

167164
func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {

agent/apphealth.go

Lines changed: 99 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -13,118 +13,124 @@ import (
1313
"github.com/coder/retry"
1414
)
1515

16-
func reportAppHealth(ctx context.Context, logger slog.Logger, fetchApps FetchWorkspaceApps, reportHealth PostWorkspaceAppHealth) {
17-
r := retry.New(time.Second, 30*time.Second)
18-
for {
19-
err := func() error {
20-
apps, err := fetchApps(ctx)
21-
if err != nil {
22-
return xerrors.Errorf("getting workspace apps: %w", err)
23-
}
16+
type WorkspaceAppHealthReporter func(ctx context.Context)
2417

25-
if len(apps) == 0 {
26-
return nil
27-
}
18+
func NewWorkspaceAppHealthReporter(logger slog.Logger, client *codersdk.Client) WorkspaceAppHealthReporter {
19+
return func(ctx context.Context) {
20+
r := retry.New(time.Second, 30*time.Second)
21+
for {
22+
err := func() error {
23+
apps, err := client.WorkspaceAgentApps(ctx)
24+
if err != nil {
25+
return xerrors.Errorf("getting workspace apps: %w", err)
26+
}
2827

29-
health := make(map[string]codersdk.WorkspaceAppHealth, 0)
30-
for _, app := range apps {
31-
health[app.Name] = app.Health
32-
}
28+
if len(apps) == 0 {
29+
return nil
30+
}
3331

34-
tickers := make(chan string)
35-
for _, app := range apps {
36-
if shouldStartTicker(app) {
37-
t := time.NewTicker(time.Duration(app.HealthcheckInterval) * time.Second)
38-
go func() {
39-
for {
40-
select {
41-
case <-ctx.Done():
42-
return
43-
case <-t.C:
44-
tickers <- app.Name
45-
}
46-
}
47-
}()
32+
health := make(map[string]codersdk.WorkspaceAppHealth, 0)
33+
for _, app := range apps {
34+
health[app.Name] = app.Health
4835
}
49-
}
50-
var mu sync.RWMutex
51-
var failures map[string]int
52-
go func() {
53-
for {
54-
select {
55-
case <-ctx.Done():
56-
return
57-
case name := <-tickers:
58-
for _, app := range apps {
59-
if app.Name != name {
60-
continue
61-
}
6236

63-
client := &http.Client{
64-
Timeout: time.Duration(app.HealthcheckInterval),
65-
}
66-
err := func() error {
67-
req, err := http.NewRequestWithContext(ctx, http.MethodGet, app.HealthcheckURL, nil)
68-
if err != nil {
69-
return err
37+
tickers := make(chan string)
38+
for _, app := range apps {
39+
if shouldStartTicker(app) {
40+
t := time.NewTicker(time.Duration(app.HealthcheckInterval) * time.Second)
41+
go func() {
42+
for {
43+
select {
44+
case <-ctx.Done():
45+
return
46+
case <-t.C:
47+
tickers <- app.Name
7048
}
71-
res, err := client.Do(req)
72-
if err != nil {
73-
return err
49+
}
50+
}()
51+
}
52+
}
53+
var mu sync.RWMutex
54+
var failures map[string]int
55+
go func() {
56+
for {
57+
select {
58+
case <-ctx.Done():
59+
return
60+
case name := <-tickers:
61+
for _, app := range apps {
62+
if app.Name != name {
63+
continue
7464
}
75-
res.Body.Close()
76-
if res.StatusCode >= http.StatusInternalServerError {
77-
return xerrors.Errorf("error status code: %d", res.StatusCode)
65+
66+
client := &http.Client{
67+
Timeout: time.Duration(app.HealthcheckInterval),
7868
}
69+
err := func() error {
70+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, app.HealthcheckURL, nil)
71+
if err != nil {
72+
return err
73+
}
74+
res, err := client.Do(req)
75+
if err != nil {
76+
return err
77+
}
78+
res.Body.Close()
79+
if res.StatusCode >= http.StatusInternalServerError {
80+
return xerrors.Errorf("error status code: %d", res.StatusCode)
81+
}
7982

80-
return nil
81-
}()
82-
if err == nil {
83-
mu.Lock()
84-
failures[app.Name]++
85-
if failures[app.Name] > int(app.HealthcheckThreshold) {
86-
health[app.Name] = codersdk.WorkspaceAppHealthUnhealthy
83+
return nil
84+
}()
85+
if err == nil {
86+
mu.Lock()
87+
failures[app.Name]++
88+
if failures[app.Name] > int(app.HealthcheckThreshold) {
89+
health[app.Name] = codersdk.WorkspaceAppHealthUnhealthy
90+
}
91+
mu.Unlock()
92+
} else {
93+
mu.Lock()
94+
failures[app.Name] = 0
95+
health[app.Name] = codersdk.WorkspaceAppHealthHealthy
96+
mu.Unlock()
8797
}
88-
mu.Unlock()
89-
} else {
90-
mu.Lock()
91-
failures[app.Name] = 0
92-
health[app.Name] = codersdk.WorkspaceAppHealthHealthy
93-
mu.Unlock()
9498
}
9599
}
96100
}
97-
}
98-
}()
101+
}()
99102

100-
reportTicker := time.NewTicker(time.Second)
101-
lastHealth := make(map[string]codersdk.WorkspaceAppHealth, 0)
102-
for {
103-
select {
104-
case <-ctx.Done():
105-
return nil
106-
case <-reportTicker.C:
107-
mu.RLock()
108-
changed := healthChanged(lastHealth, health)
109-
mu.RUnlock()
110-
if changed {
111-
lastHealth = health
112-
err := reportHealth(ctx, health)
113-
if err != nil {
114-
logger.Error(ctx, "failed to report workspace app stat", slog.Error(err))
103+
reportTicker := time.NewTicker(time.Second)
104+
lastHealth := make(map[string]codersdk.WorkspaceAppHealth, 0)
105+
for {
106+
select {
107+
case <-ctx.Done():
108+
return nil
109+
case <-reportTicker.C:
110+
mu.RLock()
111+
changed := healthChanged(lastHealth, health)
112+
mu.RUnlock()
113+
if changed {
114+
lastHealth = health
115+
err := client.PostWorkspaceAgentAppHealth(ctx, codersdk.PostWorkspaceAppHealthsRequest{
116+
Healths: health,
117+
})
118+
if err != nil {
119+
logger.Error(ctx, "failed to report workspace app stat", slog.Error(err))
120+
}
115121
}
116122
}
117123
}
124+
}()
125+
if err != nil {
126+
logger.Error(ctx, "failed running workspace app reporter", slog.Error(err))
127+
// continue loop with backoff on non-nil errors
128+
r.Wait(ctx)
129+
continue
118130
}
119-
}()
120-
if err != nil {
121-
logger.Error(ctx, "failed running workspace app reporter", slog.Error(err))
122-
// continue loop with backoff on non-nil errors
123-
r.Wait(ctx)
124-
continue
125-
}
126131

127-
return
132+
return
133+
}
128134
}
129135
}
130136

cli/agent.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,10 @@ func workspaceAgent() *cobra.Command {
189189
// shells so "gitssh" works!
190190
"CODER_AGENT_TOKEN": client.SessionToken,
191191
},
192-
CoordinatorDialer: client.ListenWorkspaceAgentTailnet,
193-
StatsReporter: client.AgentReportStats,
192+
CoordinatorDialer: client.ListenWorkspaceAgentTailnet,
193+
StatsReporter: client.AgentReportStats,
194+
FetchWorkspaceApps: client.WorkspaceAgentApps,
195+
PostWorkspaceAppHealth: client.PostWorkspaceAgentAppHealth,
194196
})
195197
<-cmd.Context().Done()
196198
return closer.Close()

cli/configssh_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,10 @@ func TestConfigSSH(t *testing.T) {
107107
agentClient := codersdk.New(client.URL)
108108
agentClient.SessionToken = authToken
109109
agentCloser := agent.New(agent.Options{
110-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
111-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
112-
Logger: slogtest.Make(t, nil).Named("agent"),
110+
FetchMetadata: agentClient.WorkspaceAgentMetadata,
111+
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
112+
Logger: slogtest.Make(t, nil).Named("agent"),
113+
WorkspaceAppHealthReporter: func(context.Context) {},
113114
})
114115
defer func() {
115116
_ = agentCloser.Close()

cli/speedtest_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ func TestSpeedtest(t *testing.T) {
2424
agentClient := codersdk.New(client.URL)
2525
agentClient.SessionToken = agentToken
2626
agentCloser := agent.New(agent.Options{
27-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
28-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
29-
Logger: slogtest.Make(t, nil).Named("agent"),
27+
FetchMetadata: agentClient.WorkspaceAgentMetadata,
28+
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
29+
Logger: slogtest.Make(t, nil).Named("agent"),
30+
WorkspaceAppHealthReporter: func(context.Context) {},
3031
})
3132
defer agentCloser.Close()
3233
coderdtest.AwaitWorkspaceAgents(t, client, workspace.LatestBuild.ID)

cli/ssh_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,10 @@ func TestSSH(t *testing.T) {
8989
agentClient := codersdk.New(client.URL)
9090
agentClient.SessionToken = agentToken
9191
agentCloser := agent.New(agent.Options{
92-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
93-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
94-
Logger: slogtest.Make(t, nil).Named("agent"),
92+
FetchMetadata: agentClient.WorkspaceAgentMetadata,
93+
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
94+
Logger: slogtest.Make(t, nil).Named("agent"),
95+
WorkspaceAppHealthReporter: func(context.Context) {},
9596
})
9697
defer func() {
9798
_ = agentCloser.Close()
@@ -110,9 +111,10 @@ func TestSSH(t *testing.T) {
110111
agentClient := codersdk.New(client.URL)
111112
agentClient.SessionToken = agentToken
112113
agentCloser := agent.New(agent.Options{
113-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
114-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
115-
Logger: slogtest.Make(t, nil).Named("agent"),
114+
FetchMetadata: agentClient.WorkspaceAgentMetadata,
115+
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
116+
Logger: slogtest.Make(t, nil).Named("agent"),
117+
WorkspaceAppHealthReporter: func(context.Context) {},
116118
})
117119
<-ctx.Done()
118120
_ = agentCloser.Close()
@@ -178,9 +180,10 @@ func TestSSH(t *testing.T) {
178180
agentClient := codersdk.New(client.URL)
179181
agentClient.SessionToken = agentToken
180182
agentCloser := agent.New(agent.Options{
181-
FetchMetadata: agentClient.WorkspaceAgentMetadata,
182-
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
183-
Logger: slogtest.Make(t, nil).Named("agent"),
183+
FetchMetadata: agentClient.WorkspaceAgentMetadata,
184+
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
185+
Logger: slogtest.Make(t, nil).Named("agent"),
186+
WorkspaceAppHealthReporter: func(context.Context) {},
184187
})
185188
defer agentCloser.Close()
186189

0 commit comments

Comments
 (0)