Skip to content

Commit f670895

Browse files
committed
Add API
1 parent bdde550 commit f670895

File tree

7 files changed

+168
-21
lines changed

7 files changed

+168
-21
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,4 @@ site/out/
4242
.vscode/*.log
4343
**/*.swp
4444
.coderv2/*
45+
**/__debug_bin

agent/agent.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -350,23 +350,13 @@ func (a *agent) init(ctx context.Context) {
350350

351351
go a.run(ctx)
352352
if a.statsReporter != nil {
353-
// If each report is approximately 100 bytes, and send a report every
354-
// 60 seconds, we send 60*24*100 or 144kB a day per agent. If there
355-
// are 100 agents with a retention policy of 30 days, we have 432MB
356-
// of logs, which we consider acceptable.
357-
go func() {
358-
timer := time.NewTimer(time.Minute)
359-
defer timer.Stop()
360-
361-
select {
362-
case <-timer.C:
363-
a.stats.RLock()
364-
a.statsReporter(a.stats)
365-
a.stats.RUnlock()
366-
case <-ctx.Done():
367-
return
368-
}
369-
}()
353+
err := a.statsReporter(ctx, a.logger, func() *Stats {
354+
return a.stats.Copy()
355+
})
356+
if err != nil {
357+
a.logger.Error(ctx, "report stats", slog.Error(err))
358+
return
359+
}
370360
}
371361
}
372362

agent/stats.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package agent
22

33
import (
4+
"context"
45
"net"
56
"sync"
67
"sync/atomic"
78
"time"
9+
10+
"golang.org/x/exp/maps"
11+
12+
"cdr.dev/slog"
813
)
914

1015
// ConnStats wraps a net.Conn with statistics.
@@ -44,6 +49,15 @@ type Stats struct {
4449
ActiveConns map[int64]*ConnStats `json:"active_conns,omitempty"`
4550
}
4651

52+
func (s *Stats) Copy() *Stats {
53+
s.RLock()
54+
ss := &Stats{
55+
ActiveConns: maps.Clone(s.ActiveConns),
56+
}
57+
s.RUnlock()
58+
return ss
59+
}
60+
4761
// goConn launches a new connection-processing goroutine, account for
4862
// s.Conns in a thread-safe manner.
4963
func (s *Stats) goConn(conn net.Conn, protocol string, fn func(conn net.Conn)) {
@@ -71,4 +85,4 @@ func (s *Stats) goConn(conn net.Conn, protocol string, fn func(conn net.Conn)) {
7185
}
7286

7387
// StatsReporter periodically accept and records agent stats.
74-
type StatsReporter func(s *Stats)
88+
type StatsReporter func(ctx context.Context, log slog.Logger, stats func() *Stats) error

coderd/coderd.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,10 @@ func New(options *Options) *API {
132132
},
133133
httpmw.Prometheus(options.PrometheusRegistry),
134134
// Build-Version is helpful for debugging.
135-
func(h http.Handler) http.Handler {
136-
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
135+
func(next http.Handler) http.Handler {
136+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
137137
w.Header().Add("Build-Version", buildinfo.Version())
138+
next.ServeHTTP(w, r)
138139
})
139140
},
140141
)
@@ -342,6 +343,8 @@ func New(options *Options) *API {
342343
r.Use(httpmw.ExtractWorkspaceAgent(options.Database))
343344
r.Get("/metadata", api.workspaceAgentMetadata)
344345
r.Get("/listen", api.workspaceAgentListen)
346+
r.Get("/report-stats", api.workspaceAgentReportStats)
347+
345348
r.Get("/gitsshkey", api.agentGitSSHKey)
346349
r.Get("/turn", api.workspaceAgentTurn)
347350
r.Get("/iceservers", api.workspaceAgentICEServers)

coderd/workspaceagents.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques
226226
return
227227
}
228228
}
229-
230229
}
231230

232231
func (api *API) workspaceAgentListen(rw http.ResponseWriter, r *http.Request) {

coderd/workspaceagents_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,76 @@ func TestWorkspaceAgent(t *testing.T) {
7171
})
7272
}
7373

74+
func TestWorkspaceReportStats(t *testing.T) {
75+
t.Parallel()
76+
client := coderdtest.New(t, &coderdtest.Options{
77+
IncludeProvisionerD: true,
78+
})
79+
80+
user := coderdtest.CreateFirstUser(t, client)
81+
authToken := uuid.NewString()
82+
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
83+
Parse: echo.ParseComplete,
84+
ProvisionDryRun: echo.ProvisionComplete,
85+
Provision: []*proto.Provision_Response{{
86+
Type: &proto.Provision_Response_Complete{
87+
Complete: &proto.Provision_Complete{
88+
Resources: []*proto.Resource{{
89+
Name: "example",
90+
Type: "aws_instance",
91+
Agents: []*proto.Agent{{
92+
Id: uuid.NewString(),
93+
Auth: &proto.Agent_Token{
94+
Token: authToken,
95+
},
96+
}},
97+
}},
98+
},
99+
},
100+
}},
101+
})
102+
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
103+
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
104+
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
105+
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
106+
107+
agentClient := codersdk.New(client.URL)
108+
agentClient.SessionToken = authToken
109+
agentCloser := agent.New(agentClient.ListenWorkspaceAgent, &agent.Options{
110+
Logger: slogtest.Make(t, nil),
111+
StatsReporter: agentClient.AgentReportStats,
112+
})
113+
defer func() {
114+
_ = agentCloser.Close()
115+
}()
116+
resources := coderdtest.AwaitWorkspaceAgents(t, client, workspace.LatestBuild.ID)
117+
118+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
119+
defer cancel()
120+
121+
opts := &peer.ConnOptions{
122+
Logger: slogtest.Make(t, nil).Named("client"),
123+
}
124+
125+
conn, err := client.DialWorkspaceAgent(ctx, resources[0].Agents[0].ID, opts)
126+
require.NoError(t, err)
127+
defer func() {
128+
_ = conn.Close()
129+
}()
130+
131+
sshConn, err := conn.SSHClient()
132+
require.NoError(t, err)
133+
134+
session, err := sshConn.NewSession()
135+
require.NoError(t, err)
136+
137+
_, err = session.Output("echo hello")
138+
require.NoError(t, err)
139+
140+
time.Sleep(time.Second * 10)
141+
require.NoError(t, err)
142+
}
143+
74144
func TestWorkspaceAgentListen(t *testing.T) {
75145
t.Parallel()
76146

codersdk/workspaceagents.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net"
99
"net/http"
1010
"net/http/cookiejar"
11+
"time"
1112

1213
"cloud.google.com/go/compute/metadata"
1314
"github.com/google/uuid"
@@ -16,6 +17,7 @@ import (
1617
"golang.org/x/net/proxy"
1718
"golang.org/x/xerrors"
1819
"nhooyr.io/websocket"
20+
"nhooyr.io/websocket/wsjson"
1921

2022
"cdr.dev/slog"
2123

@@ -26,6 +28,7 @@ import (
2628
"github.com/coder/coder/peerbroker"
2729
"github.com/coder/coder/peerbroker/proto"
2830
"github.com/coder/coder/provisionersdk"
31+
"github.com/coder/retry"
2932
)
3033

3134
type GoogleInstanceIdentityToken struct {
@@ -481,6 +484,73 @@ func (c *Client) turnProxyDialer(ctx context.Context, httpClient *http.Client, p
481484
})
482485
}
483486

487+
// AgentReportStats begins a stat streaming connection with the Coder server.
488+
// It is resilient to network failures and intermittent coderd issues.
489+
func (c *Client) AgentReportStats(ctx context.Context, log slog.Logger, stats func() *agent.Stats) error {
490+
serverURL, err := c.URL.Parse("/api/v2/workspaceagents/me/report-stats")
491+
if err != nil {
492+
return xerrors.Errorf("parse url: %w", err)
493+
}
494+
495+
jar, err := cookiejar.New(nil)
496+
if err != nil {
497+
return xerrors.Errorf("create cookie jar: %w", err)
498+
}
499+
500+
jar.SetCookies(serverURL, []*http.Cookie{{
501+
Name: SessionTokenKey,
502+
Value: c.SessionToken,
503+
}})
504+
httpClient := &http.Client{
505+
Jar: jar,
506+
}
507+
508+
go func() {
509+
for r := retry.New(time.Second, time.Hour); r.Wait(ctx); {
510+
err = func() error {
511+
conn, res, err := websocket.Dial(ctx, serverURL.String(), &websocket.DialOptions{
512+
HTTPClient: httpClient,
513+
// Need to disable compression to avoid a data-race.
514+
CompressionMode: websocket.CompressionDisabled,
515+
})
516+
if err != nil {
517+
if res == nil {
518+
return err
519+
}
520+
return readBodyAsError(res)
521+
}
522+
523+
for {
524+
var req AgentStatsReportRequest
525+
err := wsjson.Read(ctx, conn, &req)
526+
if err != nil {
527+
return err
528+
}
529+
530+
s := stats()
531+
resp := AgentStatsReportResponse{
532+
Conns: make([]agent.ConnStats, 0, len(s.ActiveConns)),
533+
}
534+
535+
for _, cs := range s.ActiveConns {
536+
resp.Conns = append(resp.Conns, *cs)
537+
}
538+
539+
err = wsjson.Write(ctx, conn, resp)
540+
if err != nil {
541+
return err
542+
}
543+
}
544+
}()
545+
if err != nil {
546+
log.Error(ctx, "report stats", slog.Error(err))
547+
}
548+
}
549+
}()
550+
551+
return nil
552+
}
553+
484554
// AgentStatsReportRequest is a WebSocket request by coderd
485555
// to the agent for stats.
486556
type AgentStatsReportRequest struct {

0 commit comments

Comments
 (0)