Skip to content

Commit 7a9dc0c

Browse files
committed
agent: add StatsReporter
1 parent 811afd4 commit 7a9dc0c

File tree

4 files changed

+99
-26
lines changed

4 files changed

+99
-26
lines changed

agent/agent.go

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type Options struct {
5454
EnableWireguard bool
5555
UploadWireguardKeys UploadWireguardKeys
5656
ListenWireguardPeers ListenWireguardPeers
57+
StatsReporter StatsReporter
5758
ReconnectingPTYTimeout time.Duration
5859
EnvironmentVariables map[string]string
5960
Logger slog.Logger
@@ -93,6 +94,10 @@ func New(dialer Dialer, options *Options) io.Closer {
9394
enableWireguard: options.EnableWireguard,
9495
postKeys: options.UploadWireguardKeys,
9596
listenWireguardPeers: options.ListenWireguardPeers,
97+
stats: &Stats{
98+
ActiveConns: make(map[int64]*ConnStats),
99+
},
100+
statsReporter: options.StatsReporter,
96101
}
97102
server.init(ctx)
98103
return server
@@ -120,6 +125,9 @@ type agent struct {
120125
network *peerwg.Network
121126
postKeys UploadWireguardKeys
122127
listenWireguardPeers ListenWireguardPeers
128+
129+
stats *Stats
130+
statsReporter StatsReporter
123131
}
124132

125133
func (a *agent) run(ctx context.Context) {
@@ -220,17 +228,17 @@ func (a *agent) runStartupScript(ctx context.Context, script string) error {
220228
return nil
221229
}
222230

223-
func (a *agent) handlePeerConn(ctx context.Context, conn *peer.Conn) {
231+
func (a *agent) handlePeerConn(ctx context.Context, peerConn *peer.Conn) {
224232
go func() {
225233
select {
226234
case <-a.closed:
227-
case <-conn.Closed():
235+
case <-peerConn.Closed():
228236
}
229-
_ = conn.Close()
237+
_ = peerConn.Close()
230238
a.connCloseWait.Done()
231239
}()
232240
for {
233-
channel, err := conn.Accept(ctx)
241+
channel, err := peerConn.Accept(ctx)
234242
if err != nil {
235243
if errors.Is(err, peer.ErrClosed) || a.isClosed() {
236244
return
@@ -239,19 +247,21 @@ func (a *agent) handlePeerConn(ctx context.Context, conn *peer.Conn) {
239247
return
240248
}
241249

242-
switch channel.Protocol() {
243-
case ProtocolSSH:
244-
go a.sshServer.HandleConn(channel.NetConn())
245-
case ProtocolReconnectingPTY:
246-
go a.handleReconnectingPTY(ctx, channel.Label(), channel.NetConn())
247-
case ProtocolDial:
248-
go a.handleDial(ctx, channel.Label(), channel.NetConn())
249-
default:
250-
a.logger.Warn(ctx, "unhandled protocol from channel",
251-
slog.F("protocol", channel.Protocol()),
252-
slog.F("label", channel.Label()),
253-
)
254-
}
250+
a.stats.goConn(channel.NetConn(), channel.Protocol(), func(conn net.Conn) {
251+
switch channel.Protocol() {
252+
case ProtocolSSH:
253+
a.sshServer.HandleConn(conn)
254+
case ProtocolReconnectingPTY:
255+
a.handleReconnectingPTY(ctx, channel.Label(), conn)
256+
case ProtocolDial:
257+
a.handleDial(ctx, channel.Label(), conn)
258+
default:
259+
a.logger.Warn(ctx, "unhandled protocol from channel",
260+
slog.F("protocol", channel.Protocol()),
261+
slog.F("label", channel.Label()),
262+
)
263+
}
264+
})
255265
}
256266
}
257267

@@ -339,6 +349,25 @@ func (a *agent) init(ctx context.Context) {
339349
}
340350

341351
go a.run(ctx)
352+
if a.statsReporter != nil {
353+
// If each report is approximately 100 bytes, and send a report every
354+
// 60 seconds, we send 60*24*100 or 144kB a day per agent. If there
355+
// are 100 agents with a retention policy of 30 days, we have 432MB
356+
// of logs, which we consider acceptable.
357+
go func() {
358+
timer := time.NewTimer(time.Minute)
359+
defer timer.Stop()
360+
361+
select {
362+
case <-timer.C:
363+
a.stats.RLock()
364+
a.statsReporter(a.stats)
365+
a.stats.RUnlock()
366+
case <-ctx.Done():
367+
return
368+
}
369+
}()
370+
}
342371
}
343372

344373
// createCommand processes raw command input with OpenSSH-like behavior.

agent/stats.go

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,21 @@ package agent
22

33
import (
44
"net"
5+
"sync"
6+
"sync/atomic"
57
"time"
68
)
79

810
// ConnStats wraps a net.Conn with statistics.
911
type ConnStats struct {
1012
CreatedAt time.Time `json:"created_at,omitempty"`
1113
Protocol string `json:"protocol,omitempty"`
12-
RxBytes uint64 `json:"rx_bytes,omitempty"`
13-
TxBytes uint64 `json:"tx_bytes,omitempty"`
14+
15+
// RxBytes must be read with atomic.
16+
RxBytes uint64 `json:"rx_bytes,omitempty"`
17+
18+
// TxBytes must be read with atomic.
19+
TxBytes uint64 `json:"tx_bytes,omitempty"`
1420

1521
net.Conn `json:"-"`
1622
}
@@ -19,13 +25,13 @@ var _ net.Conn = new(ConnStats)
1925

2026
func (c *ConnStats) Read(b []byte) (n int, err error) {
2127
n, err = c.Conn.Read(b)
22-
c.RxBytes += uint64(n)
28+
atomic.AddUint64(&c.RxBytes, uint64(n))
2329
return n, err
2430
}
2531

2632
func (c *ConnStats) Write(b []byte) (n int, err error) {
2733
n, err = c.Conn.Write(b)
28-
c.TxBytes += uint64(n)
34+
atomic.AddUint64(&c.TxBytes, uint64(n))
2935
return n, err
3036
}
3137

@@ -34,5 +40,36 @@ var _ net.Conn = new(ConnStats)
3440
// Stats records the Agent's network connection statistics for use in
3541
// user-facing metrics and debugging.
3642
type Stats struct {
37-
Conns []ConnStats `json:"conns,omitempty"`
43+
sync.RWMutex `json:"-"`
44+
// ActiveConns are identified by their start time in nanoseconds.
45+
ActiveConns map[int64]*ConnStats `json:"active_conns,omitempty"`
3846
}
47+
48+
// goConn launches a new connection-processing goroutine, account for
49+
// s.Conns in a thread-safe manner.
50+
func (s *Stats) goConn(conn net.Conn, protocol string, fn func(conn net.Conn)) {
51+
sc := &ConnStats{
52+
CreatedAt: time.Now(),
53+
Protocol: protocol,
54+
Conn: conn,
55+
}
56+
57+
key := sc.CreatedAt.UnixNano()
58+
59+
s.Lock()
60+
s.ActiveConns[key] = sc
61+
s.Unlock()
62+
63+
go func() {
64+
defer func() {
65+
s.Lock()
66+
delete(s.ActiveConns, key)
67+
s.Unlock()
68+
}()
69+
70+
fn(sc)
71+
}()
72+
}
73+
74+
// StatsReporter periodically accept and records agent stats.
75+
type StatsReporter func(s *Stats)

coderd/coderd.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ func New(options *Options) *API {
131131
})
132132
},
133133
httpmw.Prometheus(options.PrometheusRegistry),
134+
// Build-Version is helpful for debugging.
135+
func(h http.Handler) http.Handler {
136+
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
137+
w.Header().Add("Build-Version", buildinfo.Version())
138+
})
139+
},
134140
)
135141

136142
apps := func(r chi.Router) {

coderd/httpmw/workspaceparam.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ import (
88
"net/http"
99
"strings"
1010

11+
"github.com/go-chi/chi/v5"
12+
"github.com/google/uuid"
13+
1114
"github.com/coder/coder/coderd/database"
1215
"github.com/coder/coder/coderd/httpapi"
1316
"github.com/coder/coder/codersdk"
14-
"github.com/go-chi/chi/v5"
15-
"github.com/google/uuid"
1617
)
1718

1819
type workspaceParamContextKey struct{}
@@ -57,8 +58,8 @@ func ExtractWorkspaceParam(db database.Store) func(http.Handler) http.Handler {
5758
// "workspace_and_agent" URL parameter. `ExtractUserParam` must be called
5859
// before this.
5960
// This can be in the form of:
60-
// - "<workspace-name>.[workspace-agent]" : If multiple agents exist
61-
// - "<workspace-name>" : If one agent exists
61+
// - "<workspace-name>.[workspace-agent]" : If multiple agents exist
62+
// - "<workspace-name>" : If one agent exists
6263
func ExtractWorkspaceAndAgentParam(db database.Store) func(http.Handler) http.Handler {
6364
return func(next http.Handler) http.Handler {
6465
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {

0 commit comments

Comments
 (0)