Skip to content

Commit ae38bbe

Browse files
authored
chore: refactor agent stats streaming (coder#5112)
1 parent 13a4cfa commit ae38bbe

20 files changed

+524
-300
lines changed

agent/agent.go

+30-15
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ import (
3232
"golang.org/x/xerrors"
3333
"tailscale.com/net/speedtest"
3434
"tailscale.com/tailcfg"
35+
"tailscale.com/types/netlogtype"
3536

3637
"cdr.dev/slog"
3738
"github.com/coder/coder/agent/usershell"
@@ -98,7 +99,6 @@ func New(options Options) io.Closer {
9899
exchangeToken: options.ExchangeToken,
99100
filesystem: options.Filesystem,
100101
tempDir: options.TempDir,
101-
stats: &Stats{},
102102
}
103103
server.init(ctx)
104104
return server
@@ -126,7 +126,6 @@ type agent struct {
126126
sshServer *ssh.Server
127127

128128
network *tailnet.Conn
129-
stats *Stats
130129
}
131130

132131
// runLoop attempts to start the agent in a retry loop.
@@ -238,22 +237,16 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
238237
return nil, xerrors.New("closed")
239238
}
240239
network, err := tailnet.NewConn(&tailnet.Options{
241-
Addresses: []netip.Prefix{netip.PrefixFrom(codersdk.TailnetIP, 128)},
242-
DERPMap: derpMap,
243-
Logger: a.logger.Named("tailnet"),
240+
Addresses: []netip.Prefix{netip.PrefixFrom(codersdk.TailnetIP, 128)},
241+
DERPMap: derpMap,
242+
Logger: a.logger.Named("tailnet"),
243+
EnableTrafficStats: true,
244244
})
245245
if err != nil {
246246
a.closeMutex.Unlock()
247247
return nil, xerrors.Errorf("create tailnet: %w", err)
248248
}
249249
a.network = network
250-
network.SetForwardTCPCallback(func(conn net.Conn, listenerExists bool) net.Conn {
251-
if listenerExists {
252-
// If a listener already exists, we would double-wrap the conn.
253-
return conn
254-
}
255-
return a.stats.wrapConn(conn)
256-
})
257250
a.connCloseWait.Add(4)
258251
a.closeMutex.Unlock()
259252

@@ -268,7 +261,7 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
268261
if err != nil {
269262
return
270263
}
271-
go a.sshServer.HandleConn(a.stats.wrapConn(conn))
264+
go a.sshServer.HandleConn(conn)
272265
}
273266
}()
274267

@@ -284,7 +277,6 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
284277
a.logger.Debug(ctx, "accept pty failed", slog.Error(err))
285278
return
286279
}
287-
conn = a.stats.wrapConn(conn)
288280
// This cannot use a JSON decoder, since that can
289281
// buffer additional data that is required for the PTY.
290282
rawLen := make([]byte, 2)
@@ -523,7 +515,13 @@ func (a *agent) init(ctx context.Context) {
523515

524516
go a.runLoop(ctx)
525517
cl, err := a.client.AgentReportStats(ctx, a.logger, func() *codersdk.AgentStats {
526-
return a.stats.Copy()
518+
stats := map[netlogtype.Connection]netlogtype.Counts{}
519+
a.closeMutex.Lock()
520+
if a.network != nil {
521+
stats = a.network.ExtractTrafficStats()
522+
}
523+
a.closeMutex.Unlock()
524+
return convertAgentStats(stats)
527525
})
528526
if err != nil {
529527
a.logger.Error(ctx, "report stats", slog.Error(err))
@@ -537,6 +535,23 @@ func (a *agent) init(ctx context.Context) {
537535
}()
538536
}
539537

538+
// convertAgentStats collapses a per-connection traffic-count map into a single
// *codersdk.AgentStats report.
//
// NumConns is set to the number of entries in counts. ConnsByProto tallies how
// many of those connections share each protocol, keyed by conn.Proto.String().
// RxPackets, RxBytes, TxPackets and TxBytes are the sums of the matching fields
// across every entry. An empty or nil counts map yields a report with zero
// totals and an empty (non-nil) ConnsByProto map.
func convertAgentStats(counts map[netlogtype.Connection]netlogtype.Counts) *codersdk.AgentStats {
539+
stats := &codersdk.AgentStats{
540+
// Allocated up front: the loop below increments entries in place, and
// writing to a nil map would panic.
ConnsByProto: map[string]int64{},
541+
NumConns: int64(len(counts)),
542+
}
543+
544+
for conn, count := range counts {
545+
stats.ConnsByProto[conn.Proto.String()]++
546+
// NOTE(review): the int64 conversions below presumably narrow unsigned
// counters from netlogtype.Counts — confirm the source field types.
stats.RxPackets += int64(count.RxPackets)
547+
stats.RxBytes += int64(count.RxBytes)
548+
stats.TxPackets += int64(count.TxPackets)
549+
stats.TxBytes += int64(count.TxBytes)
550+
}
551+
552+
return stats
553+
}
554+
540555
// createCommand processes raw command input with OpenSSH-like behavior.
541556
// If the rawCommand provided is empty, it will default to the users shell.
542557
// This injects environment variables specified by the user at launch too.

agent/agent_test.go

+16-9
Original file line number | Diff line number | Diff line change
@@ -69,10 +69,16 @@ func TestAgent(t *testing.T) {
6969
session, err := sshClient.NewSession()
7070
require.NoError(t, err)
7171
defer session.Close()
72+
require.NoError(t, session.Run("echo test"))
7273

73-
assert.EqualValues(t, 1, (<-stats).NumConns)
74-
assert.Greater(t, (<-stats).RxBytes, int64(0))
75-
assert.Greater(t, (<-stats).TxBytes, int64(0))
74+
var s *codersdk.AgentStats
75+
require.Eventuallyf(t, func() bool {
76+
var ok bool
77+
s, ok = <-stats
78+
return ok && s.NumConns > 0 && s.RxBytes > 0 && s.TxBytes > 0
79+
}, testutil.WaitLong, testutil.IntervalFast,
80+
"never saw stats: %+v", s,
81+
)
7682
})
7783

7884
t.Run("ReconnectingPTY", func(t *testing.T) {
@@ -97,7 +103,7 @@ func TestAgent(t *testing.T) {
97103
var s *codersdk.AgentStats
98104
require.Eventuallyf(t, func() bool {
99105
var ok bool
100-
s, ok = (<-stats)
106+
s, ok = <-stats
101107
return ok && s.NumConns > 0 && s.RxBytes > 0 && s.TxBytes > 0
102108
}, testutil.WaitLong, testutil.IntervalFast,
103109
"never saw stats: %+v", s,
@@ -675,7 +681,7 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
675681
}
676682
coordinator := tailnet.NewCoordinator()
677683
agentID := uuid.New()
678-
statsCh := make(chan *codersdk.AgentStats)
684+
statsCh := make(chan *codersdk.AgentStats, 50)
679685
fs := afero.NewMemMapFs()
680686
closer := agent.New(agent.Options{
681687
Client: &client{
@@ -693,9 +699,10 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
693699
_ = closer.Close()
694700
})
695701
conn, err := tailnet.NewConn(&tailnet.Options{
696-
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
697-
DERPMap: metadata.DERPMap,
698-
Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
702+
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
703+
DERPMap: metadata.DERPMap,
704+
Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
705+
EnableTrafficStats: true,
699706
})
700707
require.NoError(t, err)
701708
clientConn, serverConn := net.Pipe()
@@ -781,7 +788,7 @@ func (c *client) AgentReportStats(ctx context.Context, _ slog.Logger, stats func
781788
go func() {
782789
defer close(doneCh)
783790

784-
t := time.NewTicker(time.Millisecond * 100)
791+
t := time.NewTicker(500 * time.Millisecond)
785792
defer t.Stop()
786793
for {
787794
select {

agent/stats.go

-58
This file was deleted.

coderd/activitybump.go

+10-11
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"time"
88

9+
"github.com/google/uuid"
910
"golang.org/x/xerrors"
1011

1112
"cdr.dev/slog"
@@ -14,14 +15,14 @@ import (
1415

1516
// activityBumpWorkspace automatically bumps the workspace's auto-off timer
1617
// if it is set to expire soon.
17-
func activityBumpWorkspace(log slog.Logger, db database.Store, workspace database.Workspace) {
18+
func activityBumpWorkspace(log slog.Logger, db database.Store, workspaceID uuid.UUID) {
1819
// We set a short timeout so if the app is under load, these
1920
// low priority operations fail first.
2021
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
2122
defer cancel()
2223

2324
err := db.InTx(func(s database.Store) error {
24-
build, err := s.GetLatestWorkspaceBuildByWorkspaceID(ctx, workspace.ID)
25+
build, err := s.GetLatestWorkspaceBuildByWorkspaceID(ctx, workspaceID)
2526
if errors.Is(err, sql.ErrNoRows) {
2627
return nil
2728
} else if err != nil {
@@ -65,15 +66,13 @@ func activityBumpWorkspace(log slog.Logger, db database.Store, workspace databas
6566
return nil
6667
}, nil)
6768
if err != nil {
68-
log.Error(
69-
ctx, "bump failed",
70-
slog.Error(err),
71-
slog.F("workspace_id", workspace.ID),
72-
)
73-
} else {
74-
log.Debug(
75-
ctx, "bumped deadline from activity",
76-
slog.F("workspace_id", workspace.ID),
69+
log.Error(ctx, "bump failed", slog.Error(err),
70+
slog.F("workspace_id", workspaceID),
7771
)
72+
return
7873
}
74+
75+
log.Debug(ctx, "bumped deadline from activity",
76+
slog.F("workspace_id", workspaceID),
77+
)
7978
}

coderd/activitybump_test.go

-1
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/stretchr/testify/require"
99

1010
"cdr.dev/slog/sloggers/slogtest"
11-
1211
"github.com/coder/coder/coderd/coderdtest"
1312
"github.com/coder/coder/coderd/database"
1413
"github.com/coder/coder/codersdk"

coderd/coderd.go

+5-2
Original file line number | Diff line number | Diff line change
@@ -132,7 +132,7 @@ func New(options *Options) *API {
132132
options.APIRateLimit = 512
133133
}
134134
if options.AgentStatsRefreshInterval == 0 {
135-
options.AgentStatsRefreshInterval = 10 * time.Minute
135+
options.AgentStatsRefreshInterval = 5 * time.Minute
136136
}
137137
if options.MetricsCacheRefreshInterval == 0 {
138138
options.MetricsCacheRefreshInterval = time.Hour
@@ -493,7 +493,10 @@ func New(options *Options) *API {
493493
r.Get("/gitauth", api.workspaceAgentsGitAuth)
494494
r.Get("/gitsshkey", api.agentGitSSHKey)
495495
r.Get("/coordinate", api.workspaceAgentCoordinate)
496-
r.Get("/report-stats", api.workspaceAgentReportStats)
496+
r.Post("/report-stats", api.workspaceAgentReportStats)
497+
// DEPRECATED in favor of the POST endpoint above.
498+
// TODO: remove in January 2023
499+
r.Get("/report-stats", api.workspaceAgentReportStatsWebsocket)
497500
})
498501
r.Route("/{workspaceagent}", func(r chi.Router) {
499502
r.Use(

coderd/coderdtest/authorize.go

+1
Original file line number | Diff line number | Diff line change
@@ -64,6 +64,7 @@ func AGPLRoutes(a *AuthTester) (map[string]string, map[string]RouteCheck) {
6464
"POST:/api/v2/workspaceagents/me/version": {NoAuthorize: true},
6565
"POST:/api/v2/workspaceagents/me/app-health": {NoAuthorize: true},
6666
"GET:/api/v2/workspaceagents/me/report-stats": {NoAuthorize: true},
67+
"POST:/api/v2/workspaceagents/me/report-stats": {NoAuthorize: true},
6768

6869
// These endpoints have more assertions. This is good, add more endpoints to assert if you can!
6970
"GET:/api/v2/organizations/{organization}": {AssertObject: rbac.ResourceOrganization.InOrg(a.Admin.OrganizationID)},

0 commit comments

Comments
 (0)