Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Commit 209ec24

Browse files
authored
Merge pull request #8 from cdr/speedup
Speed up event processing
2 parents 87bbc0b + b0c4bc3 commit 209ec24

File tree

4 files changed

+230
-43
lines changed

4 files changed

+230
-43
lines changed

cmd/coder/sync.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"os"
5+
"path/filepath"
56
"strings"
67

78
"github.com/spf13/pflag"
@@ -57,11 +58,16 @@ func (cmd *syncCmd) Run(fl *pflag.FlagSet) {
5758

5859
env := findEnv(entClient, envName)
5960

61+
absLocal, err := filepath.Abs(local)
62+
if err != nil {
63+
flog.Fatal("make abs path out of %v: %v", local, absLocal)
64+
}
65+
6066
s := sync.Sync{
6167
Init: cmd.init,
6268
Environment: env,
6369
RemoteDir: remoteDir,
64-
LocalDir: local,
70+
LocalDir: absLocal,
6571
Client: entClient,
6672
}
6773
for err == nil || err == sync.ErrRestartSync {

internal/sync/eventcache.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package sync
2+
3+
import (
4+
"os"
5+
"time"
6+
7+
"github.com/rjeczalik/notify"
8+
"go.coder.com/flog"
9+
)
10+
11+
type timedEvent struct {
12+
CreatedAt time.Time
13+
notify.EventInfo
14+
}
15+
16+
type eventCache map[string]timedEvent
17+
18+
func (cache eventCache) Add(ev timedEvent) {
19+
log := flog.New()
20+
log.Prefix = ev.Path() + ": "
21+
lastEvent, ok := cache[ev.Path()]
22+
if ok {
23+
switch {
24+
// If the file was quickly created and then destroyed, pretend nothing ever happened.
25+
case lastEvent.Event() == notify.Create && ev.Event() == notify.Remove:
26+
delete(cache, ev.Path())
27+
log.Info("ignored Create then Remove")
28+
return
29+
}
30+
log.Info("replaced %s with %s", lastEvent.Event(), ev.Event())
31+
}
32+
// Only let the latest event for a path have action.
33+
cache[ev.Path()] = ev
34+
}
35+
36+
// SequentialEvents returns the list of events that pertain to directories.
37+
// The set of returned events is disjoint with ConcurrentEvents.
38+
func (cache eventCache) SequentialEvents() []timedEvent {
39+
var r []timedEvent
40+
for _, ev := range cache {
41+
info, err := os.Stat(ev.Path())
42+
if err == nil && !info.IsDir() {
43+
continue
44+
}
45+
// Include files that have deleted here.
46+
// It's unclear whether they're files or folders.
47+
r = append(r, ev)
48+
49+
}
50+
return r
51+
}
52+
53+
// ConcurrentEvents returns the list of events that are safe to process after SequentialEvents.
54+
// The set of returns events is disjoint with SequentialEvents.
55+
func (cache eventCache) ConcurrentEvents() []timedEvent {
56+
var r []timedEvent
57+
for _, ev := range cache {
58+
info, err := os.Stat(ev.Path())
59+
if err != nil {
60+
continue
61+
}
62+
if info.IsDir() {
63+
continue
64+
}
65+
r = append(r, ev)
66+
67+
}
68+
return r
69+
}

internal/sync/sync.go

Lines changed: 133 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ import (
99
"os/exec"
1010
"path"
1111
"path/filepath"
12+
"sync"
13+
"sync/atomic"
1214
"time"
1315

1416
"github.com/gorilla/websocket"
1517
"github.com/rjeczalik/notify"
1618
"go.coder.com/flog"
17-
"golang.org/x/crypto/ssh/terminal"
19+
"golang.org/x/sync/semaphore"
1820
"golang.org/x/xerrors"
1921

2022
"cdr.dev/coder-cli/internal/entclient"
@@ -24,8 +26,10 @@ import (
2426
// Sync runs a live sync daemon.
2527
type Sync struct {
2628
// Init sets whether the sync will do the initial init and then return fast.
27-
Init bool
28-
LocalDir string
29+
Init bool
30+
// LocalDir is an absolute path.
31+
LocalDir string
32+
// RemoteDir is an absolute path.
2933
RemoteDir string
3034
entclient.Environment
3135
*entclient.Client
@@ -35,13 +39,17 @@ func (s Sync) syncPaths(delete bool, local, remote string) error {
3539
self := os.Args[0]
3640

3741
args := []string{"-zz",
38-
"-a", "--progress",
42+
"-a",
3943
"--delete",
4044
"-e", self + " sh", local, s.Environment.Name + ":" + remote,
4145
}
4246
if delete {
4347
args = append([]string{"--delete"}, args...)
4448
}
49+
if os.Getenv("DEBUG_RSYNC") != "" {
50+
args = append([]string{"--progress"}, args...)
51+
}
52+
4553
// See https://unix.stackexchange.com/questions/188737/does-compression-option-z-with-rsync-speed-up-backup
4654
// on compression level.
4755
// (AB): compression sped up the initial sync of the enterprise repo by 30%, leading me to believe it's
@@ -82,9 +90,11 @@ func (s Sync) initSync() error {
8290

8391
start := time.Now()
8492
// Delete old files on initial sync (e.g git checkout).
85-
err := s.syncPaths(true, s.LocalDir, s.RemoteDir)
93+
// Add the "/." to the local directory so rsync doesn't try to place the directory
94+
// into the remote dir.
95+
err := s.syncPaths(true, s.LocalDir+"/.", s.RemoteDir)
8696
if err == nil {
87-
flog.Info("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond))
97+
flog.Success("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond))
8898
}
8999
return err
90100
}
@@ -104,6 +114,12 @@ func (s Sync) handleCreate(localPath string) error {
104114
target := s.convertPath(localPath)
105115
err := s.syncPaths(false, localPath, target)
106116
if err != nil {
117+
_, statErr := os.Stat(localPath)
118+
// File was quickly deleted.
119+
if os.IsNotExist(statErr) {
120+
return nil
121+
}
122+
107123
return err
108124
}
109125
return nil
@@ -133,11 +149,10 @@ func (s Sync) handleRename(localPath string) error {
133149
return s.handleCreate(localPath)
134150
}
135151

136-
func (s Sync) work(ev notify.EventInfo) {
152+
func (s Sync) work(ev timedEvent) {
137153
var (
138-
localPath = ev.Path()
139-
remotePath = s.convertPath(localPath)
140-
err error
154+
localPath = ev.Path()
155+
err error
141156
)
142157
switch ev.Event() {
143158
case notify.Write, notify.Create:
@@ -150,61 +165,137 @@ func (s Sync) work(ev notify.EventInfo) {
150165
flog.Info("unhandled event %v %+v", ev.Event(), ev.Path())
151166
}
152167

168+
log := fmt.Sprintf("%v %v (%v)",
169+
ev.Event(), filepath.Base(localPath), time.Since(ev.CreatedAt).Truncate(time.Millisecond*10),
170+
)
153171
if err != nil {
154-
flog.Error("%v: %v -> %v: %v", ev.Event(), localPath, remotePath, err)
172+
flog.Error(log+": %v", err)
155173
} else {
156-
flog.Success("%v: %v -> %v", ev.Event(), localPath, remotePath)
174+
flog.Success(log)
157175
}
158176
}
159177

160-
func setConsoleTitle(title string) {
161-
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
162-
return
178+
var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it")
179+
180+
// workEventGroup converges a group of events to prevent duplicate work.
181+
func (s Sync) workEventGroup(evs []timedEvent) {
182+
cache := make(eventCache)
183+
for _, ev := range evs {
184+
cache.Add(ev)
185+
}
186+
187+
// We want to process events concurrently but safely for speed.
188+
// Because the event cache prevents duplicate events for the same file, race conditions of that type
189+
// are impossible.
190+
// What is possible is a dependency on a previous Rename or Create. For example, if a directory is renamed
191+
// and then a file is moved to it. AFAIK this dependecy only exists with Directories.
192+
// So, we sequentially process the list of directory Renames and Creates, and then concurrently
193+
// perform all Writes.
194+
for _, ev := range cache.SequentialEvents() {
195+
s.work(ev)
163196
}
164-
fmt.Printf("\033]0;%s\007", title)
165-
}
166197

167-
// maxinflightInotify sets the maximum number of inotifies before the sync just restarts.
168-
// Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly
169-
// with individual rsyncs.
170-
const maxInflightInotify = 16
198+
sem := semaphore.NewWeighted(8)
171199

172-
var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it")
200+
var wg sync.WaitGroup
201+
for _, ev := range cache.ConcurrentEvents() {
202+
setConsoleTitle(fmtUpdateTitle(ev.Path()))
173203

174-
func (s Sync) Run() error {
175-
setConsoleTitle("⏳ syncing project")
176-
err := s.initSync()
177-
if err != nil {
178-
return err
204+
wg.Add(1)
205+
sem.Acquire(context.Background(), 1)
206+
ev := ev
207+
go func() {
208+
defer sem.Release(1)
209+
defer wg.Done()
210+
s.work(ev)
211+
}()
179212
}
180213

181-
if s.Init {
182-
return nil
183-
}
214+
wg.Wait()
215+
}
184216

185-
// This queue is twice as large as the max in flight so we can check when it's full reliably.
186-
events := make(chan notify.EventInfo, maxInflightInotify*2)
217+
const (
218+
// maxinflightInotify sets the maximum number of inotifies before the sync just restarts.
219+
// Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly
220+
// with individual rsyncs.
221+
maxInflightInotify = 8
222+
maxEventDelay = time.Second * 7
223+
// maxAcceptableDispatch is the maximum amount of time before an event should begin its journey to the server.
224+
// This sets a lower bound for perceivable latency, but the higher it is, the better the optimization.
225+
maxAcceptableDispatch = time.Millisecond * 50
226+
)
227+
228+
// Run starts the sync synchronously.
229+
// Use this command to debug what wasn't sync'd correctly:
230+
// rsync -e "coder sh" -nicr ~/Projects/cdr/coder-cli/. ammar:/home/coder/coder-cli/
231+
func (s Sync) Run() error {
232+
events := make(chan notify.EventInfo, maxInflightInotify)
187233
// Set up a recursive watch.
188-
err = notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All)
234+
// We do this before the initial sync so we can capture any changes that may have happened during sync.
235+
err := notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All)
189236
if err != nil {
190237
return xerrors.Errorf("create watch: %w", err)
191238
}
192239
defer notify.Stop(events)
193240

241+
setConsoleTitle("⏳ syncing project")
242+
err = s.initSync()
243+
if err != nil {
244+
return err
245+
}
246+
247+
if s.Init {
248+
return nil
249+
}
194250

195-
const watchingFilesystemTitle = "🛰 watching filesystem"
196-
setConsoleTitle(watchingFilesystemTitle)
197251

198252
flog.Info("watching %s for changes", s.LocalDir)
199-
for ev := range events {
200-
if len(events) > maxInflightInotify {
201-
return ErrRestartSync
253+
254+
var droppedEvents uint64
255+
// Timed events lets us track how long each individual file takes to update.
256+
timedEvents := make(chan timedEvent, cap(events))
257+
go func() {
258+
defer close(timedEvents)
259+
for event := range events {
260+
select {
261+
case timedEvents <- timedEvent{
262+
CreatedAt: time.Now(),
263+
EventInfo: event,
264+
}:
265+
default:
266+
if atomic.AddUint64(&droppedEvents, 1) == 1 {
267+
flog.Info("dropped event, sync should restart soon")
268+
}
269+
}
202270
}
271+
}()
203272

204-
setConsoleTitle("🚀 updating " + filepath.Base(ev.Path()))
205-
s.work(ev)
273+
var (
274+
eventGroup []timedEvent
275+
dispatchEventGroup = time.NewTicker(maxAcceptableDispatch)
276+
)
277+
defer dispatchEventGroup.Stop()
278+
for {
279+
const watchingFilesystemTitle = "🛰 watching filesystem"
206280
setConsoleTitle(watchingFilesystemTitle)
207-
}
208281

209-
return nil
282+
select {
283+
case ev := <-timedEvents:
284+
if atomic.LoadUint64(&droppedEvents) > 0 {
285+
return ErrRestartSync
286+
}
287+
288+
eventGroup = append(eventGroup, ev)
289+
case <-dispatchEventGroup.C:
290+
if len(eventGroup) == 0 {
291+
continue
292+
}
293+
// We're too backlogged and should restart the sync.
294+
if time.Since(eventGroup[0].CreatedAt) > maxEventDelay {
295+
return ErrRestartSync
296+
}
297+
s.workEventGroup(eventGroup)
298+
eventGroup = eventGroup[:0]
299+
}
300+
}
210301
}

internal/sync/title.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package sync
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path/filepath"
7+
8+
"golang.org/x/crypto/ssh/terminal"
9+
)
10+
11+
func setConsoleTitle(title string) {
12+
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
13+
return
14+
}
15+
fmt.Printf("\033]0;%s\007", title)
16+
}
17+
18+
19+
func fmtUpdateTitle(path string) string {
20+
return "🚀 updating " + filepath.Base(path)
21+
}

0 commit comments

Comments
 (0)