Skip to content

Daily Active User Metrics #3735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
502aa52
agent: add ConnStats
ammario Aug 29, 2022
3893045
agent: add StatsReporter
ammario Aug 29, 2022
c03ad90
Frontend tests pass
ammario Sep 1, 2022
1bd9cec
Split DAUChart into its own file
ammario Sep 1, 2022
e46329b
Get FE tests passing with real data!
ammario Sep 1, 2022
d472b13
Test frontend
ammario Sep 1, 2022
e0295e0
Fix compilation error
ammario Sep 1, 2022
2f1a423
Rename ConnStats to StatsConn
ammario Sep 1, 2022
0a50cc9
continues instead of returns
ammario Sep 1, 2022
7feab5e
Fix some test failures
ammario Sep 1, 2022
a4d2cf7
Redo tests
ammario Sep 1, 2022
52c9d10
Address review comments
ammario Sep 1, 2022
3f9901e
REVAMP — backend tests work
ammario Sep 1, 2022
7840509
Black triangle
ammario Sep 1, 2022
39170cf
Consolidate template state machines
ammario Sep 1, 2022
eb373d6
Move workspaceagent endpoint
ammario Sep 1, 2022
a3d87b8
Address most review comments
ammario Sep 1, 2022
31ba0c6
Improve contrast in chart
ammario Sep 1, 2022
5b906c1
Add more agent tests
ammario Sep 1, 2022
49d9386
Fix JS ci
ammario Sep 1, 2022
b14a077
A bunch of minor touch ups
ammario Sep 1, 2022
8da24c4
Stabilize protoc
ammario Sep 1, 2022
00ec953
Merge remote-tracking branch 'origin/main' into metrics
ammario Sep 1, 2022
4940319
Update lockfile
ammario Sep 1, 2022
22b2028
Address comments + attempt to fix protoc
ammario Sep 1, 2022
0157365
fixup! Address comments + attempt to fix protoc
ammario Sep 1, 2022
b166cdd
Try to fix protoc...
ammario Sep 1, 2022
4201998
PROTO!
ammario Sep 1, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
agent: add StatsReporter
  • Loading branch information
ammario committed Sep 1, 2022
commit 38930457e318b8993f44a14968e3a8d46dba00fa
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ site/out/
.vscode/*.log
**/*.swp
.coderv2/*
**/__debug_bin
28 changes: 27 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ bin: $(shell find . -not -path './vendor/*' -type f -name '*.go') go.mod go.sum
darwin:amd64,arm64
.PHONY: bin

build: site/out/index.html $(shell find . -not -path './vendor/*' -type f -name '*.go') go.mod go.sum $(shell find ./examples/templates)
GO_FILES=$(shell find . -not -path './vendor/*' -type f -name '*.go') go.mod go.sum $(shell find ./examples/templates)

build: site/out/index.html $(GO_FILES)
rm -rf ./dist
mkdir -p ./dist
rm -f ./site/out/bin/coder*
Expand All @@ -55,6 +57,30 @@ build: site/out/index.html $(shell find . -not -path './vendor/*' -type f -name
darwin:amd64,arm64
.PHONY: build

# Builds a test binary (the full, non-slim package) for just Linux amd64.
# Slim artifacts are still built for every platform.
build-linux-test: site/out/index.html $(GO_FILES)
rm -rf ./dist
mkdir -p ./dist
rm -f ./site/out/bin/coder*

# build slim artifacts and copy them to the site output directory
./scripts/build_go_slim.sh \
--version "$(VERSION)" \
--compress 6 \
--output ./dist/ \
linux:amd64,armv7,arm64 \
windows:amd64,arm64 \
darwin:amd64,arm64

# build not-so-slim artifacts with the default name format
./scripts/build_go_matrix.sh \
--version "$(VERSION)" \
--output ./dist/ \
--archive \
--package-linux \
linux:amd64
.PHONY: build-linux-test

# Runs migrations to output a dump of the database.
coderd/database/dump.sql: coderd/database/gen/dump/main.go $(wildcard coderd/database/migrations/*.sql)
go run coderd/database/gen/dump/main.go
Expand Down
54 changes: 42 additions & 12 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Options struct {
WebRTCDialer WebRTCDialer
FetchMetadata FetchMetadata

StatsReporter StatsReporter
ReconnectingPTYTimeout time.Duration
EnvironmentVariables map[string]string
Logger slog.Logger
Expand Down Expand Up @@ -100,6 +101,10 @@ func New(options Options) io.Closer {
envVars: options.EnvironmentVariables,
coordinatorDialer: options.CoordinatorDialer,
fetchMetadata: options.FetchMetadata,
stats: &Stats{
ProtocolStats: make(map[string]*ProtocolStats),
},
statsReporter: options.StatsReporter,
}
server.init(ctx)
return server
Expand All @@ -125,6 +130,8 @@ type agent struct {

network *tailnet.Conn
coordinatorDialer CoordinatorDialer
stats *Stats
statsReporter StatsReporter
}

func (a *agent) run(ctx context.Context) {
Expand Down Expand Up @@ -194,6 +201,12 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
a.logger.Critical(ctx, "create tailnet", slog.Error(err))
return
}
a.network.SetForwardTCPCallback(func(conn net.Conn, listenerExists bool) net.Conn {
if listenerExists {
return conn
}
return &ConnStats{ProtocolStats: &ProtocolStats{}, Conn: conn}
})
go a.runCoordinator(ctx)

sshListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(tailnetSSHPort))
Expand All @@ -207,7 +220,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
if err != nil {
return
}
go a.sshServer.HandleConn(conn)
a.sshServer.HandleConn(a.stats.wrapConn(conn, ProtocolSSH))
}
}()
reconnectingPTYListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(tailnetReconnectingPTYPort))
Expand Down Expand Up @@ -364,17 +377,17 @@ func (a *agent) runStartupScript(ctx context.Context, script string) error {
return nil
}

func (a *agent) handlePeerConn(ctx context.Context, conn *peer.Conn) {
func (a *agent) handlePeerConn(ctx context.Context, peerConn *peer.Conn) {
go func() {
select {
case <-a.closed:
case <-conn.Closed():
case <-peerConn.Closed():
}
_ = conn.Close()
_ = peerConn.Close()
a.connCloseWait.Done()
}()
for {
channel, err := conn.Accept(ctx)
channel, err := peerConn.Accept(ctx)
if err != nil {
if errors.Is(err, peer.ErrClosed) || a.isClosed() {
return
Expand All @@ -383,44 +396,46 @@ func (a *agent) handlePeerConn(ctx context.Context, conn *peer.Conn) {
return
}

conn := channel.NetConn()

switch channel.Protocol() {
case ProtocolSSH:
go a.sshServer.HandleConn(channel.NetConn())
go a.sshServer.HandleConn(a.stats.wrapConn(conn, channel.Protocol()))
case ProtocolReconnectingPTY:
rawID := channel.Label()
// The ID format is referenced in conn.go.
// <uuid>:<height>:<width>:<command>
idParts := strings.SplitN(rawID, ":", 4)
if len(idParts) != 4 {
a.logger.Warn(ctx, "client sent invalid id format", slog.F("raw-id", rawID))
continue
return
}
id := idParts[0]
// Enforce a consistent format for IDs.
_, err := uuid.Parse(id)
if err != nil {
a.logger.Warn(ctx, "client sent reconnection token that isn't a uuid", slog.F("id", id), slog.Error(err))
continue
return
}
// Parse the initial terminal dimensions.
height, err := strconv.Atoi(idParts[1])
if err != nil {
a.logger.Warn(ctx, "client sent invalid height", slog.F("id", id), slog.F("height", idParts[1]))
continue
return
}
width, err := strconv.Atoi(idParts[2])
if err != nil {
a.logger.Warn(ctx, "client sent invalid width", slog.F("id", id), slog.F("width", idParts[2]))
continue
return
}
go a.handleReconnectingPTY(ctx, reconnectingPTYInit{
ID: id,
Height: uint16(height),
Width: uint16(width),
Command: idParts[3],
}, channel.NetConn())
}, a.stats.wrapConn(conn, channel.Protocol()))
case ProtocolDial:
go a.handleDial(ctx, channel.Label(), channel.NetConn())
go a.handleDial(ctx, channel.Label(), a.stats.wrapConn(conn, channel.Protocol()))
default:
a.logger.Warn(ctx, "unhandled protocol from channel",
slog.F("protocol", channel.Protocol()),
Expand Down Expand Up @@ -514,6 +529,21 @@ func (a *agent) init(ctx context.Context) {
}

go a.run(ctx)
if a.statsReporter != nil {
cl, err := a.statsReporter(ctx, a.logger, func() *Stats {
return a.stats.Copy()
})
if err != nil {
a.logger.Error(ctx, "report stats", slog.Error(err))
return
}
a.connCloseWait.Add(1)
go func() {
defer a.connCloseWait.Done()
<-a.closed
cl.Close()
}()
}
}

// createCommand processes raw command input with OpenSSH-like behavior.
Expand Down
70 changes: 61 additions & 9 deletions agent/stats.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,90 @@
package agent

import (
"context"
"io"
"net"
"time"
"sync"
"sync/atomic"

"cdr.dev/slog"
)

// ConnStats wraps a net.Conn with statistics.
type ConnStats struct {
CreatedAt time.Time `json:"created_at,omitempty"`
Protocol string `json:"protocol,omitempty"`
RxBytes uint64 `json:"rx_bytes,omitempty"`
TxBytes uint64 `json:"tx_bytes,omitempty"`

*ProtocolStats
net.Conn `json:"-"`
}

var _ net.Conn = new(ConnStats)

func (c *ConnStats) Read(b []byte) (n int, err error) {
n, err = c.Conn.Read(b)
c.RxBytes += uint64(n)
atomic.AddInt64(&c.RxBytes, int64(n))
return n, err
}

func (c *ConnStats) Write(b []byte) (n int, err error) {
n, err = c.Conn.Write(b)
c.TxBytes += uint64(n)
atomic.AddInt64(&c.TxBytes, int64(n))
return n, err
}

// ProtocolStats holds the connection and byte counters for one protocol.
// A single value is shared by every ConnStats wrapping a connection of that
// protocol, so all fields are updated with the sync/atomic functions.
type ProtocolStats struct {
// NumConns is incremented atomically each time a connection is wrapped
// for this protocol; it must be read with atomic.
// NOTE(review): the JSON key "num_comms" looks like a typo for
// "num_conns" — confirm with consumers of this payload before renaming.
NumConns int64 `json:"num_comms"`

// RxBytes must be read with atomic.
RxBytes int64 `json:"rx_bytes"`

// TxBytes must be read with atomic.
TxBytes int64 `json:"tx_bytes"`
}

var _ net.Conn = new(ConnStats)

// Stats records the Agent's network connection statistics for use in
// user-facing metrics and debugging.
type Stats struct {
Conns []ConnStats `json:"conns,omitempty"`
sync.RWMutex `json:"-"`
ProtocolStats map[string]*ProtocolStats `json:"conn_stats,omitempty"`
}

// Copy returns a point-in-time snapshot of s.
//
// The protocol map is walked while holding the read lock and every counter is
// loaded atomically into a freshly allocated ProtocolStats, so the returned
// Stats shares no counter pointers with s.
func (s *Stats) Copy() *Stats {
	s.RLock()
	defer s.RUnlock()

	snapshot := &Stats{
		ProtocolStats: make(map[string]*ProtocolStats, len(s.ProtocolStats)),
	}
	for protocol, live := range s.ProtocolStats {
		frozen := new(ProtocolStats)
		frozen.NumConns = atomic.LoadInt64(&live.NumConns)
		frozen.RxBytes = atomic.LoadInt64(&live.RxBytes)
		frozen.TxBytes = atomic.LoadInt64(&live.TxBytes)
		snapshot.ProtocolStats[protocol] = frozen
	}
	return snapshot
}

// wrapConn returns conn wrapped in a ConnStats that records its traffic
// against the shared counters for protocol.
//
// The ProtocolStats entry for protocol is created on first use, and its
// NumConns counter is incremented once per call. The protocol map itself is
// allocated lazily, so a zero-value Stats (or one decoded with the map
// omitted) can be used without panicking on a nil-map write.
func (s *Stats) wrapConn(conn net.Conn, protocol string) net.Conn {
	s.Lock()
	if s.ProtocolStats == nil {
		s.ProtocolStats = make(map[string]*ProtocolStats)
	}
	ps, ok := s.ProtocolStats[protocol]
	if !ok {
		ps = &ProtocolStats{}
		s.ProtocolStats[protocol] = ps
	}
	s.Unlock()

	// The counters are only touched atomically, so the lock needs to cover
	// just the map access above.
	atomic.AddInt64(&ps.NumConns, 1)

	return &ConnStats{
		ProtocolStats: ps,
		Conn:          conn,
	}
}

// StatsReporter periodically accepts and records agent stats.
//
// The agent invokes it once during initialization, passing a stats callback
// that returns a fresh copy of the agent's current Stats on every call. If it
// returns an error the agent logs it and does not report stats; otherwise the
// returned io.Closer is closed when the agent is closed.
type StatsReporter func(
ctx context.Context,
log slog.Logger,
stats func() *Stats,
) (io.Closer, error)
4 changes: 2 additions & 2 deletions agent/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestConnStats(t *testing.T) {
c1, c2 := net.Pipe()

payload := []byte("dogs & cats")
statsConn := &agent.ConnStats{Conn: c1}
statsConn := &agent.ConnStats{Conn: c1, ProtocolStats: &agent.ProtocolStats{}}

got := make(chan []byte)
go func() {
Expand All @@ -44,7 +44,7 @@ func TestConnStats(t *testing.T) {
c1, c2 := net.Pipe()

payload := []byte("cats & dogs")
statsConn := &agent.ConnStats{Conn: c1}
statsConn := &agent.ConnStats{Conn: c1, ProtocolStats: &agent.ProtocolStats{}}

go func() {
c2.Write(payload)
Expand Down
1 change: 1 addition & 0 deletions cli/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func workspaceAgent() *cobra.Command {
"CODER_AGENT_TOKEN": client.SessionToken,
},
CoordinatorDialer: client.ListenWorkspaceAgentTailnet,
StatsReporter: client.AgentReportStats,
})
<-cmd.Context().Done()
return closer.Close()
Expand Down
41 changes: 25 additions & 16 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func Server(newAPI func(*coderd.Options) *coderd.API) *cobra.Command {
autoImportTemplates []string
spooky bool
verbose bool
metricsCacheRefreshInterval time.Duration
agentStatReportInterval time.Duration
)

root := &cobra.Command{
Expand Down Expand Up @@ -345,21 +347,23 @@ func Server(newAPI func(*coderd.Options) *coderd.API) *cobra.Command {
}

options := &coderd.Options{
AccessURL: accessURLParsed,
ICEServers: iceServers,
Logger: logger.Named("coderd"),
Database: databasefake.New(),
DERPMap: derpMap,
Pubsub: database.NewPubsubInMemory(),
CacheDir: cacheDir,
GoogleTokenValidator: googleTokenValidator,
SecureAuthCookie: secureAuthCookie,
SSHKeygenAlgorithm: sshKeygenAlgorithm,
TailscaleEnable: tailscaleEnable,
TURNServer: turnServer,
TracerProvider: tracerProvider,
Telemetry: telemetry.NewNoop(),
AutoImportTemplates: validatedAutoImportTemplates,
AccessURL: accessURLParsed,
ICEServers: iceServers,
Logger: logger.Named("coderd"),
Database: databasefake.New(),
DERPMap: derpMap,
Pubsub: database.NewPubsubInMemory(),
CacheDir: cacheDir,
GoogleTokenValidator: googleTokenValidator,
SecureAuthCookie: secureAuthCookie,
SSHKeygenAlgorithm: sshKeygenAlgorithm,
TailscaleEnable: tailscaleEnable,
TURNServer: turnServer,
TracerProvider: tracerProvider,
Telemetry: telemetry.NewNoop(),
AutoImportTemplates: validatedAutoImportTemplates,
MetricsCacheRefreshInterval: metricsCacheRefreshInterval,
AgentStatsReportInterval: agentStatReportInterval,
}

if oauth2GithubClientSecret != "" {
Expand Down Expand Up @@ -834,8 +838,13 @@ func Server(newAPI func(*coderd.Options) *coderd.API) *cobra.Command {
`Accepted values are "ed25519", "ecdsa", or "rsa4096"`)
cliflag.StringArrayVarP(root.Flags(), &autoImportTemplates, "auto-import-template", "", "CODER_TEMPLATE_AUTOIMPORT", []string{}, "Which templates to auto-import. Available auto-importable templates are: kubernetes")
cliflag.BoolVarP(root.Flags(), &spooky, "spooky", "", "", false, "Specifies spookiness level")
cliflag.BoolVarP(root.Flags(), &verbose, "verbose", "v", "CODER_VERBOSE", false, "Enables verbose logging.")
_ = root.Flags().MarkHidden("spooky")
cliflag.BoolVarP(root.Flags(), &verbose, "verbose", "v", "CODER_VERBOSE", false, "Enables verbose logging.")

cliflag.DurationVarP(root.Flags(), &metricsCacheRefreshInterval, "metrics-cache-refresh-interval", "", "CODER_METRICS_CACHE_REFRESH_INTERVAL", time.Hour, "How frequently metrics are refreshed")
_ = root.Flags().MarkHidden("metrics-cache-refresh-interval")
cliflag.DurationVarP(root.Flags(), &agentStatReportInterval, "agent-stats-report-interval", "", "CODER_AGENT_STATS_REPORT_INTERVAL", time.Minute*10, "How frequently agent stats are recorded")
_ = root.Flags().MarkHidden("agent-stats-report-interval")

return root
}
Expand Down
Loading