Skip to content

chore: refactor agent stats streaming #5112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"golang.org/x/xerrors"
"tailscale.com/net/speedtest"
"tailscale.com/tailcfg"
"tailscale.com/types/netlogtype"

"cdr.dev/slog"
"github.com/coder/coder/agent/usershell"
Expand Down Expand Up @@ -98,7 +99,6 @@ func New(options Options) io.Closer {
exchangeToken: options.ExchangeToken,
filesystem: options.Filesystem,
tempDir: options.TempDir,
stats: &Stats{},
}
server.init(ctx)
return server
Expand Down Expand Up @@ -126,7 +126,6 @@ type agent struct {
sshServer *ssh.Server

network *tailnet.Conn
stats *Stats
}

// runLoop attempts to start the agent in a retry loop.
Expand Down Expand Up @@ -238,22 +237,16 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
return nil, xerrors.New("closed")
}
network, err := tailnet.NewConn(&tailnet.Options{
Addresses: []netip.Prefix{netip.PrefixFrom(codersdk.TailnetIP, 128)},
DERPMap: derpMap,
Logger: a.logger.Named("tailnet"),
Addresses: []netip.Prefix{netip.PrefixFrom(codersdk.TailnetIP, 128)},
DERPMap: derpMap,
Logger: a.logger.Named("tailnet"),
EnableTrafficStats: true,
})
if err != nil {
a.closeMutex.Unlock()
return nil, xerrors.Errorf("create tailnet: %w", err)
}
a.network = network
network.SetForwardTCPCallback(func(conn net.Conn, listenerExists bool) net.Conn {
if listenerExists {
// If a listener already exists, we would double-wrap the conn.
return conn
}
return a.stats.wrapConn(conn)
})
a.connCloseWait.Add(4)
a.closeMutex.Unlock()

Expand All @@ -268,7 +261,7 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
if err != nil {
return
}
go a.sshServer.HandleConn(a.stats.wrapConn(conn))
go a.sshServer.HandleConn(conn)
}
}()

Expand All @@ -284,7 +277,6 @@ func (a *agent) createTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) (*t
a.logger.Debug(ctx, "accept pty failed", slog.Error(err))
return
}
conn = a.stats.wrapConn(conn)
// This cannot use a JSON decoder, since that can
// buffer additional data that is required for the PTY.
rawLen := make([]byte, 2)
Expand Down Expand Up @@ -523,7 +515,13 @@ func (a *agent) init(ctx context.Context) {

go a.runLoop(ctx)
cl, err := a.client.AgentReportStats(ctx, a.logger, func() *codersdk.AgentStats {
return a.stats.Copy()
stats := map[netlogtype.Connection]netlogtype.Counts{}
a.closeMutex.Lock()
if a.network != nil {
stats = a.network.ExtractTrafficStats()
}
a.closeMutex.Unlock()
return convertAgentStats(stats)
})
if err != nil {
a.logger.Error(ctx, "report stats", slog.Error(err))
Expand All @@ -537,6 +535,23 @@ func (a *agent) init(ctx context.Context) {
}()
}

func convertAgentStats(counts map[netlogtype.Connection]netlogtype.Counts) *codersdk.AgentStats {
stats := &codersdk.AgentStats{
ConnsByProto: map[string]int64{},
NumConns: int64(len(counts)),
}

for conn, count := range counts {
stats.ConnsByProto[conn.Proto.String()]++
stats.RxPackets += int64(count.RxPackets)
stats.RxBytes += int64(count.RxBytes)
stats.TxPackets += int64(count.TxPackets)
stats.TxBytes += int64(count.TxBytes)
}

return stats
}

// createCommand processes raw command input with OpenSSH-like behavior.
// If the rawCommand provided is empty, it will default to the users shell.
// This injects environment variables specified by the user at launch too.
Expand Down
25 changes: 16 additions & 9 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,16 @@ func TestAgent(t *testing.T) {
session, err := sshClient.NewSession()
require.NoError(t, err)
defer session.Close()
require.NoError(t, session.Run("echo test"))

assert.EqualValues(t, 1, (<-stats).NumConns)
assert.Greater(t, (<-stats).RxBytes, int64(0))
assert.Greater(t, (<-stats).TxBytes, int64(0))
var s *codersdk.AgentStats
require.Eventuallyf(t, func() bool {
var ok bool
s, ok = <-stats
return ok && s.NumConns > 0 && s.RxBytes > 0 && s.TxBytes > 0
}, testutil.WaitLong, testutil.IntervalFast,
"never saw stats: %+v", s,
)
})

t.Run("ReconnectingPTY", func(t *testing.T) {
Expand All @@ -97,7 +103,7 @@ func TestAgent(t *testing.T) {
var s *codersdk.AgentStats
require.Eventuallyf(t, func() bool {
var ok bool
s, ok = (<-stats)
s, ok = <-stats
return ok && s.NumConns > 0 && s.RxBytes > 0 && s.TxBytes > 0
}, testutil.WaitLong, testutil.IntervalFast,
"never saw stats: %+v", s,
Expand Down Expand Up @@ -675,7 +681,7 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
}
coordinator := tailnet.NewCoordinator()
agentID := uuid.New()
statsCh := make(chan *codersdk.AgentStats)
statsCh := make(chan *codersdk.AgentStats, 50)
fs := afero.NewMemMapFs()
closer := agent.New(agent.Options{
Client: &client{
Expand All @@ -693,9 +699,10 @@ func setupAgent(t *testing.T, metadata codersdk.WorkspaceAgentMetadata, ptyTimeo
_ = closer.Close()
})
conn, err := tailnet.NewConn(&tailnet.Options{
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
DERPMap: metadata.DERPMap,
Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
DERPMap: metadata.DERPMap,
Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
EnableTrafficStats: true,
})
require.NoError(t, err)
clientConn, serverConn := net.Pipe()
Expand Down Expand Up @@ -781,7 +788,7 @@ func (c *client) AgentReportStats(ctx context.Context, _ slog.Logger, stats func
go func() {
defer close(doneCh)

t := time.NewTicker(time.Millisecond * 100)
t := time.NewTicker(500 * time.Millisecond)
defer t.Stop()
for {
select {
Expand Down
58 changes: 0 additions & 58 deletions agent/stats.go

This file was deleted.

21 changes: 10 additions & 11 deletions coderd/activitybump.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"time"

"github.com/google/uuid"
"golang.org/x/xerrors"

"cdr.dev/slog"
Expand All @@ -14,14 +15,14 @@ import (

// activityBumpWorkspace automatically bumps the workspace's auto-off timer
// if it is set to expire soon.
func activityBumpWorkspace(log slog.Logger, db database.Store, workspace database.Workspace) {
func activityBumpWorkspace(log slog.Logger, db database.Store, workspaceID uuid.UUID) {
// We set a short timeout so if the app is under load, these
// low priority operations fail first.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()

err := db.InTx(func(s database.Store) error {
build, err := s.GetLatestWorkspaceBuildByWorkspaceID(ctx, workspace.ID)
build, err := s.GetLatestWorkspaceBuildByWorkspaceID(ctx, workspaceID)
if errors.Is(err, sql.ErrNoRows) {
return nil
} else if err != nil {
Expand Down Expand Up @@ -65,15 +66,13 @@ func activityBumpWorkspace(log slog.Logger, db database.Store, workspace databas
return nil
}, nil)
if err != nil {
log.Error(
ctx, "bump failed",
slog.Error(err),
slog.F("workspace_id", workspace.ID),
)
} else {
log.Debug(
ctx, "bumped deadline from activity",
slog.F("workspace_id", workspace.ID),
log.Error(ctx, "bump failed", slog.Error(err),
slog.F("workspace_id", workspaceID),
)
return
}

log.Debug(ctx, "bumped deadline from activity",
slog.F("workspace_id", workspaceID),
)
}
1 change: 0 additions & 1 deletion coderd/activitybump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/require"

"cdr.dev/slog/sloggers/slogtest"

"github.com/coder/coder/coderd/coderdtest"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/codersdk"
Expand Down
7 changes: 5 additions & 2 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func New(options *Options) *API {
options.APIRateLimit = 512
}
if options.AgentStatsRefreshInterval == 0 {
options.AgentStatsRefreshInterval = 10 * time.Minute
options.AgentStatsRefreshInterval = 5 * time.Minute
}
if options.MetricsCacheRefreshInterval == 0 {
options.MetricsCacheRefreshInterval = time.Hour
Expand Down Expand Up @@ -493,7 +493,10 @@ func New(options *Options) *API {
r.Get("/gitauth", api.workspaceAgentsGitAuth)
r.Get("/gitsshkey", api.agentGitSSHKey)
r.Get("/coordinate", api.workspaceAgentCoordinate)
r.Get("/report-stats", api.workspaceAgentReportStats)
r.Post("/report-stats", api.workspaceAgentReportStats)
// DEPRECATED in favor of the POST endpoint above.
// TODO: remove in January 2023
r.Get("/report-stats", api.workspaceAgentReportStatsWebsocket)
})
r.Route("/{workspaceagent}", func(r chi.Router) {
r.Use(
Expand Down
1 change: 1 addition & 0 deletions coderd/coderdtest/authorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func AGPLRoutes(a *AuthTester) (map[string]string, map[string]RouteCheck) {
"POST:/api/v2/workspaceagents/me/version": {NoAuthorize: true},
"POST:/api/v2/workspaceagents/me/app-health": {NoAuthorize: true},
"GET:/api/v2/workspaceagents/me/report-stats": {NoAuthorize: true},
"POST:/api/v2/workspaceagents/me/report-stats": {NoAuthorize: true},

// These endpoints have more assertions. This is good, add more endpoints to assert if you can!
"GET:/api/v2/organizations/{organization}": {AssertObject: rbac.ResourceOrganization.InOrg(a.Admin.OrganizationID)},
Expand Down
Loading