Skip to content

Commit 314d7e8

Browse files
committed
chore: refactor agent stats streaming
Agents no longer stream stats over a websocket; instead, each agent uses the returned interval to decide when to send its next stats report. Network stats are now read directly from Tailscale, which removes the need to wrap network connections.
1 parent 69e8c9e commit 314d7e8

18 files changed

+464
-295
lines changed

agent/agent.go

+30-15
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"golang.org/x/xerrors"
3333
"tailscale.com/net/speedtest"
3434
"tailscale.com/tailcfg"
35+
"tailscale.com/types/netlogtype"
3536

3637
"cdr.dev/slog"
3738
"github.com/coder/coder/agent/usershell"
@@ -98,7 +99,6 @@ func New(options Options) io.Closer {
9899
exchangeToken: options.ExchangeToken,
99100
filesystem: options.Filesystem,
100101
tempDir: options.TempDir,
101-
stats: &Stats{},
102102
}
103103
server.init(ctx)
104104
return server
@@ -126,7 +126,6 @@ type agent struct {
126126
sshServer *ssh.Server
127127

128128
network *tailnet.Conn
129-
stats *Stats
130129
}
131130

132131
// runLoop attempts to start the agent in a retry loop.
@@ -238,22 +237,16 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
238237
return nil, xerrors.New("closed")
239238
}
240239
network, err := tailnet.NewConn(&tailnet.Options{
241-
Addresses: []netip.Prefix{netip.PrefixFrom(codersdk.TailnetIP, 128)},
242-
DERPMap: derpMap,
243-
Logger: a.logger.Named("tailnet"),
240+
Addresses: []netip.Prefix{netip.PrefixFrom(codersdk.TailnetIP, 128)},
241+
DERPMap: derpMap,
242+
Logger: a.logger.Named("tailnet"),
243+
EnableTrafficStats: true,
244244
})
245245
if err != nil {
246246
a.closeMutex.Unlock()
247247
return nil, xerrors.Errorf("create tailnet: %w", err)
248248
}
249249
a.network = network
250-
network.SetForwardTCPCallback(func(conn net.Conn, listenerExists bool) net.Conn {
251-
if listenerExists {
252-
// If a listener already exists, we would double-wrap the conn.
253-
return conn
254-
}
255-
return a.stats.wrapConn(conn)
256-
})
257250
a.connCloseWait.Add(4)
258251
a.closeMutex.Unlock()
259252

@@ -268,7 +261,7 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
268261
if err != nil {
269262
return
270263
}
271-
go a.sshServer.HandleConn(a.stats.wrapConn(conn))
264+
go a.sshServer.HandleConn(conn)
272265
}
273266
}()
274267

@@ -284,7 +277,6 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
284277
a.logger.Debug(ctx, "accept pty failed", slog.Error(err))
285278
return
286279
}
287-
conn = a.stats.wrapConn(conn)
288280
// This cannot use a JSON decoder, since that can
289281
// buffer additional data that is required for the PTY.
290282
rawLen := make([]byte, 2)
@@ -523,7 +515,13 @@ func (a *agent) init(ctx context.Context) {
523515

524516
go a.runLoop(ctx)
525517
cl, err := a.client.AgentReportStats(ctx, a.logger, func() *codersdk.AgentStats {
526-
return a.stats.Copy()
518+
stats := map[netlogtype.Connection]netlogtype.Counts{}
519+
a.closeMutex.Lock()
520+
if a.network != nil {
521+
stats = a.network.ExtractTrafficStats()
522+
}
523+
a.closeMutex.Unlock()
524+
return convertAgentStats(stats)
527525
})
528526
if err != nil {
529527
a.logger.Error(ctx, "report stats", slog.Error(err))
@@ -537,6 +535,23 @@ func (a *agent) init(ctx context.Context) {
537535
}()
538536
}
539537

538+
// convertAgentStats collapses a per-connection traffic map into a single
// codersdk.AgentStats value.
//
// NumConns is the number of entries in counts, ConnsByProto holds how many
// entries were seen for each protocol (keyed by conn.Proto.String()), and the
// Rx/Tx packet and byte fields are the sums across every entry.
//
// A nil or empty counts map yields zero totals and an empty, non-nil
// ConnsByProto map.
func convertAgentStats(counts map[netlogtype.Connection]netlogtype.Counts) *codersdk.AgentStats {
539+
stats := &codersdk.AgentStats{
540+
ConnsByProto: map[string]int64{},
541+
NumConns: int64(len(counts)),
542+
}
543+
544+
// Fold each connection's counters into the aggregate totals.
for conn, count := range counts {
545+
// Tally this connection under its protocol's string name.
stats.ConnsByProto[conn.Proto.String()]++
546+
// NOTE(review): the counters are converted to int64 here; presumably they
// are unsigned in netlogtype.Counts — confirm against that package.
stats.RxPackets += int64(count.RxPackets)
547+
stats.RxBytes += int64(count.RxBytes)
548+
stats.TxPackets += int64(count.TxPackets)
549+
stats.TxBytes += int64(count.TxBytes)
550+
}
551+
552+
return stats
553+
}
554+
540555
// createCommand processes raw command input with OpenSSH-like behavior.
541556
// If the rawCommand provided is empty, it will default to the users shell.
542557
// This injects environment variables specified by the user at launch too.

agent/agent_test.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,14 @@ func TestAgent(t *testing.T) {
7070
require.NoError(t, err)
7171
defer session.Close()
7272

73-
assert.EqualValues(t, 1, (<-stats).NumConns)
74-
assert.Greater(t, (<-stats).RxBytes, int64(0))
75-
assert.Greater(t, (<-stats).TxBytes, int64(0))
73+
var s *codersdk.AgentStats
74+
require.Eventuallyf(t, func() bool {
75+
var ok bool
76+
s, ok = (<-stats)
77+
return ok && s.NumConns > 0 && s.RxBytes > 0 && s.TxBytes > 0
78+
}, testutil.WaitLong, testutil.IntervalFast,
79+
"never saw stats: %+v", s,
80+
)
7681
})
7782

7883
t.Run("ReconnectingPTY", func(t *testing.T) {
@@ -693,9 +698,10 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
693698
_ = closer.Close()
694699
})
695700
conn, err := tailnet.NewConn(&tailnet.Options{
696-
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
697-
DERPMap: metadata.DERPMap,
698-
Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
701+
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
702+
DERPMap: metadata.DERPMap,
703+
Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
704+
EnableTrafficStats: true,
699705
})
700706
require.NoError(t, err)
701707
clientConn, serverConn := net.Pipe()

agent/stats.go

-58
This file was deleted.

coderd/activitybump.go

+10-11
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"time"
88

9+
"github.com/google/uuid"
910
"golang.org/x/xerrors"
1011

1112
"cdr.dev/slog"
@@ -14,14 +15,14 @@ import (
1415

1516
// activityBumpWorkspace automatically bumps the workspace's auto-off timer
1617
// if it is set to expire soon.
17-
func activityBumpWorkspace(log slog.Logger, db database.Store, workspace database.Workspace) {
18+
func activityBumpWorkspace(log slog.Logger, db database.Store, workspaceID uuid.UUID) {
1819
// We set a short timeout so if the app is under load, these
1920
// low priority operations fail first.
2021
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
2122
defer cancel()
2223

2324
err := db.InTx(func(s database.Store) error {
24-
build, err := s.GetLatestWorkspaceBuildByWorkspaceID(ctx, workspace.ID)
25+
build, err := s.GetLatestWorkspaceBuildByWorkspaceID(ctx, workspaceID)
2526
if errors.Is(err, sql.ErrNoRows) {
2627
return nil
2728
} else if err != nil {
@@ -65,15 +66,13 @@ func activityBumpWorkspace(log slog.Logger, db database.Store, workspace databas
6566
return nil
6667
}, nil)
6768
if err != nil {
68-
log.Error(
69-
ctx, "bump failed",
70-
slog.Error(err),
71-
slog.F("workspace_id", workspace.ID),
72-
)
73-
} else {
74-
log.Debug(
75-
ctx, "bumped deadline from activity",
76-
slog.F("workspace_id", workspace.ID),
69+
log.Error(ctx, "bump failed", slog.Error(err),
70+
slog.F("workspace_id", workspaceID),
7771
)
72+
return
7873
}
74+
75+
log.Debug(ctx, "bumped deadline from activity",
76+
slog.F("workspace_id", workspaceID),
77+
)
7978
}

coderd/coderd.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func New(options *Options) *API {
132132
options.APIRateLimit = 512
133133
}
134134
if options.AgentStatsRefreshInterval == 0 {
135-
options.AgentStatsRefreshInterval = 10 * time.Minute
135+
options.AgentStatsRefreshInterval = 5 * time.Minute
136136
}
137137
if options.MetricsCacheRefreshInterval == 0 {
138138
options.MetricsCacheRefreshInterval = time.Hour
@@ -493,7 +493,9 @@ func New(options *Options) *API {
493493
r.Get("/gitauth", api.workspaceAgentsGitAuth)
494494
r.Get("/gitsshkey", api.agentGitSSHKey)
495495
r.Get("/coordinate", api.workspaceAgentCoordinate)
496-
r.Get("/report-stats", api.workspaceAgentReportStats)
496+
r.Post("/report-stats", api.workspaceAgentReportStats)
497+
// Deprecated: websocket-based stats reporting; superseded by the POST route above.
498+
r.Get("/report-stats", api.workspaceAgentReportStatsWebsocket)
497499
})
498500
r.Route("/{workspaceagent}", func(r chi.Router) {
499501
r.Use(

coderd/coderdtest/authorize.go

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func AGPLRoutes(a *AuthTester) (map[string]string, map[string]RouteCheck) {
6464
"POST:/api/v2/workspaceagents/me/version": {NoAuthorize: true},
6565
"POST:/api/v2/workspaceagents/me/app-health": {NoAuthorize: true},
6666
"GET:/api/v2/workspaceagents/me/report-stats": {NoAuthorize: true},
67+
"POST:/api/v2/workspaceagents/me/report-stats": {NoAuthorize: true},
6768

6869
// These endpoints have more assertions. This is good, add more endpoints to assert if you can!
6970
"GET:/api/v2/organizations/{organization}": {AssertObject: rbac.ResourceOrganization.InOrg(a.Admin.OrganizationID)},

0 commit comments

Comments
 (0)