Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Usage metric pushing #38

Merged
merged 1 commit into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/coder/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.coder.com/cli"
"go.coder.com/flog"

"cdr.dev/coder-cli/internal/activity"
"cdr.dev/wsep"
)

Expand Down Expand Up @@ -145,7 +146,10 @@ func runCommand(ctx context.Context, envName string, command string, args []stri
go func() {
stdin := process.Stdin()
defer stdin.Close()
_, err := io.Copy(stdin, os.Stdin)

ap := activity.NewPusher(entClient, env.ID, sshActivityName)
wr := ap.Writer(stdin)
_, err := io.Copy(wr, os.Stdin)
if err != nil {
cancel()
}
Expand All @@ -168,3 +172,5 @@ func runCommand(ctx context.Context, envName string, command string, args []stri
}
return err
}

const sshActivityName = "ssh"
10 changes: 5 additions & 5 deletions cmd/coder/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ func (cmd *syncCmd) Run(fl *pflag.FlagSet) {
}

s := sync.Sync{
Init: cmd.init,
Environment: env,
RemoteDir: remoteDir,
LocalDir: absLocal,
Client: entClient,
Init: cmd.init,
Env: env,
RemoteDir: remoteDir,
LocalDir: absLocal,
Client: entClient,
}
for err == nil || err == sync.ErrRestartSync {
err = s.Run()
Expand Down
3 changes: 1 addition & 2 deletions cmd/coder/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import (
"go.coder.com/flog"
)

type urlCmd struct {
}
type urlCmd struct{}

type DevURL struct {
Url string `json:"url"`
Expand Down
41 changes: 41 additions & 0 deletions internal/activity/pusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package activity

import (
"time"

"cdr.dev/coder-cli/internal/entclient"
"go.coder.com/flog"
"golang.org/x/time/rate"
)

const pushInterval = time.Minute

// Pusher pushes activity metrics no more than once per pushInterval. Pushes
// within the same interval are a no-op.
type Pusher struct {
envID string
source string

client *entclient.Client
rate *rate.Limiter
}

func NewPusher(c *entclient.Client, envID, source string) *Pusher {
return &Pusher{
envID: envID,
source: source,
client: c,
rate: rate.NewLimiter(rate.Every(pushInterval), 1),
}
}

func (p *Pusher) Push() {
if !p.rate.Allow() {
return
}

err := p.client.PushActivity(p.source, p.envID)
if err != nil {
flog.Error("push activity: %s", err.Error())
}
}
17 changes: 17 additions & 0 deletions internal/activity/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package activity

import "io"

type activityWriter struct {
p *Pusher
wr io.Writer
}

func (w *activityWriter) Write(p []byte) (n int, err error) {
w.p.Push()
return w.wr.Write(p)
}

func (p *Pusher) Writer(wr io.Writer) io.Writer {
return &activityWriter{p: p, wr: wr}
}
19 changes: 19 additions & 0 deletions internal/entclient/activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package entclient

import "net/http"

func (c Client) PushActivity(source string, envID string) error {
res, err := c.request("POST", "/api/metrics/usage/push", map[string]string{
"source": source,
"environment_id": envID,
})
if err != nil {
return err
}

if res.StatusCode != http.StatusOK {
return bodyError(res)
}

return nil
}
37 changes: 26 additions & 11 deletions internal/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"go.coder.com/flog"

"cdr.dev/coder-cli/internal/activity"
"cdr.dev/coder-cli/internal/entclient"
"cdr.dev/wsep"
)
Expand All @@ -33,8 +34,11 @@ type Sync struct {
LocalDir string
// RemoteDir is an absolute path.
RemoteDir string
entclient.Environment
*entclient.Client
// DisableMetrics disables activity metric pushing.
DisableMetrics bool

Env entclient.Environment
Client *entclient.Client
}

func (s Sync) syncPaths(delete bool, local, remote string) error {
Expand All @@ -43,7 +47,7 @@ func (s Sync) syncPaths(delete bool, local, remote string) error {
args := []string{"-zz",
"-a",
"--delete",
"-e", self + " sh", local, s.Environment.Name + ":" + remote,
"-e", self + " sh", local, s.Env.Name + ":" + remote,
}
if delete {
args = append([]string{"--delete"}, args...)
Expand All @@ -68,7 +72,7 @@ func (s Sync) syncPaths(delete bool, local, remote string) error {
}

func (s Sync) remoteRm(ctx context.Context, remote string) error {
conn, err := s.Client.DialWsep(ctx, s.Environment)
conn, err := s.Client.DialWsep(ctx, s.Env)
if err != nil {
return err
}
Expand Down Expand Up @@ -229,13 +233,16 @@ func (s Sync) workEventGroup(evs []timedEvent) {
}

const (
// maxinflightInotify sets the maximum number of inotifies before the sync just restarts.
// Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly
// with individual rsyncs.
// maxinflightInotify sets the maximum number of inotifies before the
// sync just restarts. Syncing a large amount of small files (e.g .git
// or node_modules) is impossible to do performantly with individual
// rsyncs.
maxInflightInotify = 8
maxEventDelay = time.Second * 7
// maxAcceptableDispatch is the maximum amount of time before an event should begin its journey to the server.
// This sets a lower bound for perceivable latency, but the higher it is, the better the optimization.
// maxAcceptableDispatch is the maximum amount of time before an event
// should begin its journey to the server. This sets a lower bound for
// perceivable latency, but the higher it is, the better the
// optimization.
maxAcceptableDispatch = time.Millisecond * 50
)

Expand All @@ -245,13 +252,17 @@ const (
func (s Sync) Run() error {
events := make(chan notify.EventInfo, maxInflightInotify)
// Set up a recursive watch.
// We do this before the initial sync so we can capture any changes that may have happened during sync.
// We do this before the initial sync so we can capture any changes
// that may have happened during sync.
err := notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All)
if err != nil {
return xerrors.Errorf("create watch: %w", err)
}
defer notify.Stop(events)

ap := activity.NewPusher(s.Client, s.Env.ID, activityName)
ap.Push()

setConsoleTitle("⏳ syncing project")
err = s.initSync()
if err != nil {
Expand All @@ -265,7 +276,8 @@ func (s Sync) Run() error {
flog.Info("watching %s for changes", s.LocalDir)

var droppedEvents uint64
// Timed events lets us track how long each individual file takes to update.
// Timed events lets us track how long each individual file takes to
// update.
timedEvents := make(chan timedEvent, cap(events))
go func() {
defer close(timedEvents)
Expand Down Expand Up @@ -309,6 +321,9 @@ func (s Sync) Run() error {
}
s.workEventGroup(eventGroup)
eventGroup = eventGroup[:0]
ap.Push()
}
}
}

const activityName = "sync"