Skip to content

Commit 1a8d639

Browse files
committed
agent: add StatsReporter
1 parent ea3c26f commit 1a8d639

38 files changed

+1466
-65
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,4 @@ site/out/
4242
.vscode/*.log
4343
**/*.swp
4444
.coderv2/*
45+
**/__debug_bin

Makefile

+27-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ bin: $(shell find . -not -path './vendor/*' -type f -name '*.go') go.mod go.sum
3030
darwin:amd64,arm64
3131
.PHONY: bin
3232

33-
build: site/out/index.html $(shell find . -not -path './vendor/*' -type f -name '*.go') go.mod go.sum $(shell find ./examples/templates)
33+
GO_FILES=$(shell find . -not -path './vendor/*' -type f -name '*.go') go.mod go.sum $(shell find ./examples/templates)
34+
35+
build: site/out/index.html $(GO_FILES)
3436
rm -rf ./dist
3537
mkdir -p ./dist
3638
rm -f ./site/out/bin/coder*
@@ -55,6 +57,30 @@ build: site/out/index.html $(shell find . -not -path './vendor/*' -type f -name
5557
darwin:amd64,arm64
5658
.PHONY: build
5759

60+
# Builds a test binary for just Linux
61+
build-linux-test: site/out/index.html $(GO_FILES)
62+
rm -rf ./dist
63+
mkdir -p ./dist
64+
rm -f ./site/out/bin/coder*
65+
66+
# build slim artifacts and copy them to the site output directory
67+
./scripts/build_go_slim.sh \
68+
--version "$(VERSION)" \
69+
--compress 6 \
70+
--output ./dist/ \
71+
linux:amd64,armv7,arm64 \
72+
windows:amd64,arm64 \
73+
darwin:amd64,arm64
74+
75+
# build not-so-slim artifacts with the default name format
76+
./scripts/build_go_matrix.sh \
77+
--version "$(VERSION)" \
78+
--output ./dist/ \
79+
--archive \
80+
--package-linux \
81+
linux:amd64
82+
.PHONY: build-linux-test
83+
5884
# Runs migrations to output a dump of the database.
5985
coderd/database/dump.sql: coderd/database/gen/dump/main.go $(wildcard coderd/database/migrations/*.sql)
6086
go run coderd/database/gen/dump/main.go

agent/agent.go

+42-12
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ type Options struct {
6565
WebRTCDialer WebRTCDialer
6666
FetchMetadata FetchMetadata
6767

68+
StatsReporter StatsReporter
6869
ReconnectingPTYTimeout time.Duration
6970
EnvironmentVariables map[string]string
7071
Logger slog.Logger
@@ -100,6 +101,10 @@ func New(options Options) io.Closer {
100101
envVars: options.EnvironmentVariables,
101102
coordinatorDialer: options.CoordinatorDialer,
102103
fetchMetadata: options.FetchMetadata,
104+
stats: &Stats{
105+
ProtocolStats: make(map[string]*ProtocolStats),
106+
},
107+
statsReporter: options.StatsReporter,
103108
}
104109
server.init(ctx)
105110
return server
@@ -125,6 +130,8 @@ type agent struct {
125130

126131
network *tailnet.Conn
127132
coordinatorDialer CoordinatorDialer
133+
stats *Stats
134+
statsReporter StatsReporter
128135
}
129136

130137
func (a *agent) run(ctx context.Context) {
@@ -194,6 +201,12 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
194201
a.logger.Critical(ctx, "create tailnet", slog.Error(err))
195202
return
196203
}
204+
a.network.SetForwardTCPCallback(func(conn net.Conn, listenerExists bool) net.Conn {
205+
if listenerExists {
206+
return conn
207+
}
208+
return &ConnStats{ProtocolStats: &ProtocolStats{}, Conn: conn}
209+
})
197210
go a.runCoordinator(ctx)
198211

199212
sshListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(tailnetSSHPort))
@@ -207,7 +220,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
207220
if err != nil {
208221
return
209222
}
210-
go a.sshServer.HandleConn(conn)
223+
a.sshServer.HandleConn(a.stats.wrapConn(conn, ProtocolSSH))
211224
}
212225
}()
213226
reconnectingPTYListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(tailnetReconnectingPTYPort))
@@ -364,17 +377,17 @@ func (a *agent) runStartupScript(ctx context.Context, script string) error {
364377
return nil
365378
}
366379

367-
func (a *agent) handlePeerConn(ctx context.Context, conn *peer.Conn) {
380+
func (a *agent) handlePeerConn(ctx context.Context, peerConn *peer.Conn) {
368381
go func() {
369382
select {
370383
case <-a.closed:
371-
case <-conn.Closed():
384+
case <-peerConn.Closed():
372385
}
373-
_ = conn.Close()
386+
_ = peerConn.Close()
374387
a.connCloseWait.Done()
375388
}()
376389
for {
377-
channel, err := conn.Accept(ctx)
390+
channel, err := peerConn.Accept(ctx)
378391
if err != nil {
379392
if errors.Is(err, peer.ErrClosed) || a.isClosed() {
380393
return
@@ -383,44 +396,46 @@ func (a *agent) handlePeerConn(ctx context.Context, conn *peer.Conn) {
383396
return
384397
}
385398

399+
conn := channel.NetConn()
400+
386401
switch channel.Protocol() {
387402
case ProtocolSSH:
388-
go a.sshServer.HandleConn(channel.NetConn())
403+
go a.sshServer.HandleConn(a.stats.wrapConn(conn, channel.Protocol()))
389404
case ProtocolReconnectingPTY:
390405
rawID := channel.Label()
391406
// The ID format is referenced in conn.go.
392407
// <uuid>:<height>:<width>
393408
idParts := strings.SplitN(rawID, ":", 4)
394409
if len(idParts) != 4 {
395410
a.logger.Warn(ctx, "client sent invalid id format", slog.F("raw-id", rawID))
396-
continue
411+
return
397412
}
398413
id := idParts[0]
399414
// Enforce a consistent format for IDs.
400415
_, err := uuid.Parse(id)
401416
if err != nil {
402417
a.logger.Warn(ctx, "client sent reconnection token that isn't a uuid", slog.F("id", id), slog.Error(err))
403-
continue
418+
return
404419
}
405420
// Parse the initial terminal dimensions.
406421
height, err := strconv.Atoi(idParts[1])
407422
if err != nil {
408423
a.logger.Warn(ctx, "client sent invalid height", slog.F("id", id), slog.F("height", idParts[1]))
409-
continue
424+
return
410425
}
411426
width, err := strconv.Atoi(idParts[2])
412427
if err != nil {
413428
a.logger.Warn(ctx, "client sent invalid width", slog.F("id", id), slog.F("width", idParts[2]))
414-
continue
429+
return
415430
}
416431
go a.handleReconnectingPTY(ctx, reconnectingPTYInit{
417432
ID: id,
418433
Height: uint16(height),
419434
Width: uint16(width),
420435
Command: idParts[3],
421-
}, channel.NetConn())
436+
}, a.stats.wrapConn(conn, channel.Protocol()))
422437
case ProtocolDial:
423-
go a.handleDial(ctx, channel.Label(), channel.NetConn())
438+
go a.handleDial(ctx, channel.Label(), a.stats.wrapConn(conn, channel.Protocol()))
424439
default:
425440
a.logger.Warn(ctx, "unhandled protocol from channel",
426441
slog.F("protocol", channel.Protocol()),
@@ -514,6 +529,21 @@ func (a *agent) init(ctx context.Context) {
514529
}
515530

516531
go a.run(ctx)
532+
if a.statsReporter != nil {
533+
cl, err := a.statsReporter(ctx, a.logger, func() *Stats {
534+
return a.stats.Copy()
535+
})
536+
if err != nil {
537+
a.logger.Error(ctx, "report stats", slog.Error(err))
538+
return
539+
}
540+
a.connCloseWait.Add(1)
541+
go func() {
542+
defer a.connCloseWait.Done()
543+
<-a.closed
544+
cl.Close()
545+
}()
546+
}
517547
}
518548

519549
// createCommand processes raw command input with OpenSSH-like behavior.

agent/stats.go

+61-9
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,90 @@
11
package agent
22

33
import (
4+
"context"
5+
"io"
46
"net"
5-
"time"
7+
"sync"
8+
"sync/atomic"
9+
10+
"cdr.dev/slog"
611
)
712

813
// ConnStats wraps a net.Conn with statistics.
914
type ConnStats struct {
10-
CreatedAt time.Time `json:"created_at,omitempty"`
11-
Protocol string `json:"protocol,omitempty"`
12-
RxBytes uint64 `json:"rx_bytes,omitempty"`
13-
TxBytes uint64 `json:"tx_bytes,omitempty"`
14-
15+
*ProtocolStats
1516
net.Conn `json:"-"`
1617
}
1718

1819
var _ net.Conn = new(ConnStats)
1920

2021
func (c *ConnStats) Read(b []byte) (n int, err error) {
2122
n, err = c.Conn.Read(b)
22-
c.RxBytes += uint64(n)
23+
atomic.AddInt64(&c.RxBytes, int64(n))
2324
return n, err
2425
}
2526

2627
func (c *ConnStats) Write(b []byte) (n int, err error) {
2728
n, err = c.Conn.Write(b)
28-
c.TxBytes += uint64(n)
29+
atomic.AddInt64(&c.TxBytes, int64(n))
2930
return n, err
3031
}
3132

33+
type ProtocolStats struct {
34+
NumConns int64 `json:"num_comms"`
35+
36+
// RxBytes must be read with atomic.
37+
RxBytes int64 `json:"rx_bytes"`
38+
39+
// TxBytes must be read with atomic.
40+
TxBytes int64 `json:"tx_bytes"`
41+
}
42+
3243
var _ net.Conn = new(ConnStats)
3344

3445
// Stats records the Agent's network connection statistics for use in
3546
// user-facing metrics and debugging.
3647
type Stats struct {
37-
Conns []ConnStats `json:"conns,omitempty"`
48+
sync.RWMutex `json:"-"`
49+
ProtocolStats map[string]*ProtocolStats `json:"conn_stats,omitempty"`
3850
}
51+
52+
func (s *Stats) Copy() *Stats {
53+
s.RLock()
54+
ss := Stats{ProtocolStats: make(map[string]*ProtocolStats, len(s.ProtocolStats))}
55+
for k, cs := range s.ProtocolStats {
56+
ss.ProtocolStats[k] = &ProtocolStats{
57+
NumConns: atomic.LoadInt64(&cs.NumConns),
58+
RxBytes: atomic.LoadInt64(&cs.RxBytes),
59+
TxBytes: atomic.LoadInt64(&cs.TxBytes),
60+
}
61+
}
62+
s.RUnlock()
63+
return &ss
64+
}
65+
66+
// wrapConn returns a new connection that records statistics.
67+
func (s *Stats) wrapConn(conn net.Conn, protocol string) net.Conn {
68+
s.Lock()
69+
ps, ok := s.ProtocolStats[protocol]
70+
if !ok {
71+
ps = &ProtocolStats{}
72+
s.ProtocolStats[protocol] = ps
73+
}
74+
s.Unlock()
75+
76+
atomic.AddInt64(&ps.NumConns, 1)
77+
cs := &ConnStats{
78+
ProtocolStats: ps,
79+
Conn: conn,
80+
}
81+
82+
return cs
83+
}
84+
85+
// StatsReporter periodically accept and records agent stats.
86+
type StatsReporter func(
87+
ctx context.Context,
88+
log slog.Logger,
89+
stats func() *Stats,
90+
) (io.Closer, error)

agent/stats_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func TestConnStats(t *testing.T) {
2020
c1, c2 := net.Pipe()
2121

2222
payload := []byte("dogs & cats")
23-
statsConn := &agent.ConnStats{Conn: c1}
23+
statsConn := &agent.ConnStats{Conn: c1, ProtocolStats: &agent.ProtocolStats{}}
2424

2525
got := make(chan []byte)
2626
go func() {
@@ -44,7 +44,7 @@ func TestConnStats(t *testing.T) {
4444
c1, c2 := net.Pipe()
4545

4646
payload := []byte("cats & dogs")
47-
statsConn := &agent.ConnStats{Conn: c1}
47+
statsConn := &agent.ConnStats{Conn: c1, ProtocolStats: &agent.ProtocolStats{}}
4848

4949
go func() {
5050
c2.Write(payload)

cli/agent.go

+1
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ func workspaceAgent() *cobra.Command {
192192
"CODER_AGENT_TOKEN": client.SessionToken,
193193
},
194194
CoordinatorDialer: client.ListenWorkspaceAgentTailnet,
195+
StatsReporter: client.AgentReportStats,
195196
})
196197
<-cmd.Context().Done()
197198
return closer.Close()

cli/server.go

+25-16
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ func Server(newAPI func(*coderd.Options) *coderd.API) *cobra.Command {
120120
autoImportTemplates []string
121121
spooky bool
122122
verbose bool
123+
metricsCacheRefreshInterval time.Duration
124+
agentStatReportInterval time.Duration
123125
)
124126

125127
root := &cobra.Command{
@@ -345,21 +347,23 @@ func Server(newAPI func(*coderd.Options) *coderd.API) *cobra.Command {
345347
}
346348

347349
options := &coderd.Options{
348-
AccessURL: accessURLParsed,
349-
ICEServers: iceServers,
350-
Logger: logger.Named("coderd"),
351-
Database: databasefake.New(),
352-
DERPMap: derpMap,
353-
Pubsub: database.NewPubsubInMemory(),
354-
CacheDir: cacheDir,
355-
GoogleTokenValidator: googleTokenValidator,
356-
SecureAuthCookie: secureAuthCookie,
357-
SSHKeygenAlgorithm: sshKeygenAlgorithm,
358-
TailscaleEnable: tailscaleEnable,
359-
TURNServer: turnServer,
360-
TracerProvider: tracerProvider,
361-
Telemetry: telemetry.NewNoop(),
362-
AutoImportTemplates: validatedAutoImportTemplates,
350+
AccessURL: accessURLParsed,
351+
ICEServers: iceServers,
352+
Logger: logger.Named("coderd"),
353+
Database: databasefake.New(),
354+
DERPMap: derpMap,
355+
Pubsub: database.NewPubsubInMemory(),
356+
CacheDir: cacheDir,
357+
GoogleTokenValidator: googleTokenValidator,
358+
SecureAuthCookie: secureAuthCookie,
359+
SSHKeygenAlgorithm: sshKeygenAlgorithm,
360+
TailscaleEnable: tailscaleEnable,
361+
TURNServer: turnServer,
362+
TracerProvider: tracerProvider,
363+
Telemetry: telemetry.NewNoop(),
364+
AutoImportTemplates: validatedAutoImportTemplates,
365+
MetricsCacheRefreshInterval: metricsCacheRefreshInterval,
366+
AgentStatsReportInterval: agentStatReportInterval,
363367
}
364368

365369
if oauth2GithubClientSecret != "" {
@@ -834,8 +838,13 @@ func Server(newAPI func(*coderd.Options) *coderd.API) *cobra.Command {
834838
`Accepted values are "ed25519", "ecdsa", or "rsa4096"`)
835839
cliflag.StringArrayVarP(root.Flags(), &autoImportTemplates, "auto-import-template", "", "CODER_TEMPLATE_AUTOIMPORT", []string{}, "Which templates to auto-import. Available auto-importable templates are: kubernetes")
836840
cliflag.BoolVarP(root.Flags(), &spooky, "spooky", "", "", false, "Specifies spookiness level")
837-
cliflag.BoolVarP(root.Flags(), &verbose, "verbose", "v", "CODER_VERBOSE", false, "Enables verbose logging.")
838841
_ = root.Flags().MarkHidden("spooky")
842+
cliflag.BoolVarP(root.Flags(), &verbose, "verbose", "v", "CODER_VERBOSE", false, "Enables verbose logging.")
843+
844+
cliflag.DurationVarP(root.Flags(), &metricsCacheRefreshInterval, "metrics-cache-refresh-interval", "", "CODER_METRICS_CACHE_REFRESH_INTERVAL", time.Hour, "How frequently metrics are refreshed")
845+
_ = root.Flags().MarkHidden("metrics-cache-refresh-interval")
846+
cliflag.DurationVarP(root.Flags(), &agentStatReportInterval, "agent-stats-report-interval", "", "CODER_AGENT_STATS_REPORT_INTERVAL", time.Minute*10, "How frequently agent stats are recorded")
847+
_ = root.Flags().MarkHidden("agent-stats-report-interval")
839848

840849
return root
841850
}

0 commit comments

Comments
 (0)