Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Commit b2ecc19

Browse files
committed
Use rate/limit, make Writer a method
1 parent d259dcc commit b2ecc19

File tree

4 files changed

+23
-39
lines changed

4 files changed

+23
-39
lines changed

cmd/coder/shell.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,8 @@ func runCommand(ctx context.Context, envName string, command string, args []stri
147147
stdin := process.Stdin()
148148
defer stdin.Close()
149149

150-
ap := &activity.Pusher{Source: sshActivityName, EnvID: env.ID, Client: entClient}
151-
defer ap.Start()()
152-
153-
wr := activity.Writer(ap, stdin)
150+
ap := activity.NewPusher(entClient, env.ID, sshActivityName)
151+
wr := ap.Writer(stdin)
154152
_, err := io.Copy(wr, os.Stdin)
155153
if err != nil {
156154
cancel()

internal/activity/pusher.go

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,41 @@
11
package activity
22

33
import (
4-
"context"
5-
"sync/atomic"
64
"time"
75

86
"cdr.dev/coder-cli/internal/entclient"
97
"go.coder.com/flog"
8+
"golang.org/x/time/rate"
109
)
1110

1211
const pushInterval = time.Minute
1312

1413
// Pusher pushes activity metrics no more than once per pushInterval. Pushes
1514
// within the same interval are a no-op.
1615
type Pusher struct {
17-
Source string
18-
EnvID string
19-
Client *entclient.Client
16+
envID string
17+
source string
2018

21-
state int64
19+
client *entclient.Client
20+
rate *rate.Limiter
2221
}
2322

24-
func (p *Pusher) Push() {
25-
if atomic.CompareAndSwapInt64(&p.state, 0, 1) {
26-
err := p.Client.PushActivity(p.Source, p.EnvID)
27-
if err != nil {
28-
flog.Error("failed to push activity: %s", err.Error())
29-
}
23+
func NewPusher(c *entclient.Client, envID, source string) *Pusher {
24+
return &Pusher{
25+
envID: envID,
26+
source: source,
27+
client: c,
28+
rate: rate.NewLimiter(rate.Every(pushInterval), 1),
3029
}
3130
}
3231

33-
// Start starts the reset routine. It resets the state every pushInterval,
34-
// allowing Push to push a new activity.
35-
func (p *Pusher) Start() (end func()) {
36-
ctx, cancel := context.WithCancel(context.Background())
37-
go func() {
38-
tick := time.NewTicker(pushInterval)
39-
defer tick.Stop()
40-
41-
for {
42-
select {
43-
case <-ctx.Done():
44-
return
45-
case <-tick.C:
46-
atomic.StoreInt64(&p.state, 0)
47-
}
48-
49-
}
50-
}()
51-
52-
return func() {
53-
cancel()
32+
func (p *Pusher) Push() {
33+
if !p.rate.Allow() {
34+
return
35+
}
36+
37+
err := p.client.PushActivity(p.source, p.envID)
38+
if err != nil {
39+
flog.Error("push activity: %s", err.Error())
5440
}
5541
}

internal/activity/writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ func (w *activityWriter) Write(p []byte) (n int, err error) {
1212
return w.wr.Write(p)
1313
}
1414

15-
func Writer(p *Pusher, wr io.Writer) io.Writer {
15+
func (p *Pusher) Writer(wr io.Writer) io.Writer {
1616
return &activityWriter{p: p, wr: wr}
1717
}

internal/sync/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func (s Sync) Run() error {
260260
}
261261
defer notify.Stop(events)
262262

263-
ap := activity.Pusher{Source: activityName, EnvID: s.Env.ID, Client: s.Client}
263+
rp := activity.Pusher{Source: activityName, EnvID: s.Env.ID, Client: s.Client}
264264
defer ap.Start()()
265265
ap.Push()
266266

0 commit comments

Comments
 (0)