diff --git a/cmd/coder/sync.go b/cmd/coder/sync.go index a7285bd0..b28afd08 100644 --- a/cmd/coder/sync.go +++ b/cmd/coder/sync.go @@ -2,6 +2,7 @@ package main import ( "os" + "path/filepath" "strings" "github.com/spf13/pflag" @@ -57,11 +58,16 @@ func (cmd *syncCmd) Run(fl *pflag.FlagSet) { env := findEnv(entClient, envName) + absLocal, err := filepath.Abs(local) + if err != nil { + flog.Fatal("make abs path out of %v: %v", local, absLocal) + } + s := sync.Sync{ Init: cmd.init, Environment: env, RemoteDir: remoteDir, - LocalDir: local, + LocalDir: absLocal, Client: entClient, } for err == nil || err == sync.ErrRestartSync { diff --git a/internal/sync/eventcache.go b/internal/sync/eventcache.go new file mode 100644 index 00000000..4e0d9c60 --- /dev/null +++ b/internal/sync/eventcache.go @@ -0,0 +1,69 @@ +package sync + +import ( + "os" + "time" + + "github.com/rjeczalik/notify" + "go.coder.com/flog" +) + +type timedEvent struct { + CreatedAt time.Time + notify.EventInfo +} + +type eventCache map[string]timedEvent + +func (cache eventCache) Add(ev timedEvent) { + log := flog.New() + log.Prefix = ev.Path() + ": " + lastEvent, ok := cache[ev.Path()] + if ok { + switch { + // If the file was quickly created and then destroyed, pretend nothing ever happened. + case lastEvent.Event() == notify.Create && ev.Event() == notify.Remove: + delete(cache, ev.Path()) + log.Info("ignored Create then Remove") + return + } + log.Info("replaced %s with %s", lastEvent.Event(), ev.Event()) + } + // Only let the latest event for a path have action. + cache[ev.Path()] = ev +} + +// SequentialEvents returns the list of events that pertain to directories. +// The set of returned events is disjoint with ConcurrentEvents. +func (cache eventCache) SequentialEvents() []timedEvent { + var r []timedEvent + for _, ev := range cache { + info, err := os.Stat(ev.Path()) + if err == nil && !info.IsDir() { + continue + } + // Include files that have deleted here. + // It's unclear whether they're files or folders. + r = append(r, ev) + + } + return r +} + +// ConcurrentEvents returns the list of events that are safe to process after SequentialEvents. +// The set of returns events is disjoint with SequentialEvents. +func (cache eventCache) ConcurrentEvents() []timedEvent { + var r []timedEvent + for _, ev := range cache { + info, err := os.Stat(ev.Path()) + if err != nil { + continue + } + if info.IsDir() { + continue + } + r = append(r, ev) + + } + return r +} diff --git a/internal/sync/sync.go b/internal/sync/sync.go index d669751f..5c695260 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -9,12 +9,14 @@ import ( "os/exec" "path" "path/filepath" + "sync" + "sync/atomic" "time" "github.com/gorilla/websocket" "github.com/rjeczalik/notify" "go.coder.com/flog" - "golang.org/x/crypto/ssh/terminal" + "golang.org/x/sync/semaphore" "golang.org/x/xerrors" "cdr.dev/coder-cli/internal/entclient" @@ -24,8 +26,10 @@ import ( // Sync runs a live sync daemon. type Sync struct { // Init sets whether the sync will do the initial init and then return fast. - Init bool - LocalDir string + Init bool + // LocalDir is an absolute path. + LocalDir string + // RemoteDir is an absolute path. RemoteDir string entclient.Environment *entclient.Client @@ -35,13 +39,17 @@ func (s Sync) syncPaths(delete bool, local, remote string) error { self := os.Args[0] args := []string{"-zz", - "-a", "--progress", + "-a", "--delete", "-e", self + " sh", local, s.Environment.Name + ":" + remote, } if delete { args = append([]string{"--delete"}, args...) } + if os.Getenv("DEBUG_RSYNC") != "" { + args = append([]string{"--progress"}, args...) + } + // See https://unix.stackexchange.com/questions/188737/does-compression-option-z-with-rsync-speed-up-backup // on compression level. // (AB): compression sped up the initial sync of the enterprise repo by 30%, leading me to believe it's @@ -82,9 +90,11 @@ func (s Sync) initSync() error { start := time.Now() // Delete old files on initial sync (e.g git checkout). - err := s.syncPaths(true, s.LocalDir, s.RemoteDir) + // Add the "/." to the local directory so rsync doesn't try to place the directory + // into the remote dir. + err := s.syncPaths(true, s.LocalDir+"/.", s.RemoteDir) if err == nil { - flog.Info("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond)) + flog.Success("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond)) } return err } @@ -104,6 +114,12 @@ func (s Sync) handleCreate(localPath string) error { target := s.convertPath(localPath) err := s.syncPaths(false, localPath, target) if err != nil { + _, statErr := os.Stat(localPath) + // File was quickly deleted. + if os.IsNotExist(statErr) { + return nil + } + return err } return nil @@ -133,11 +149,10 @@ func (s Sync) handleRename(localPath string) error { return s.handleCreate(localPath) } -func (s Sync) work(ev notify.EventInfo) { +func (s Sync) work(ev timedEvent) { var ( - localPath = ev.Path() - remotePath = s.convertPath(localPath) - err error + localPath = ev.Path() + err error ) switch ev.Event() { case notify.Write, notify.Create: @@ -150,61 +165,137 @@ func (s Sync) work(ev notify.EventInfo) { flog.Info("unhandled event %v %+v", ev.Event(), ev.Path()) } + log := fmt.Sprintf("%v %v (%v)", + ev.Event(), filepath.Base(localPath), time.Since(ev.CreatedAt).Truncate(time.Millisecond*10), + ) if err != nil { - flog.Error("%v: %v -> %v: %v", ev.Event(), localPath, remotePath, err) + flog.Error(log+": %v", err) } else { - flog.Success("%v: %v -> %v", ev.Event(), localPath, remotePath) + flog.Success(log) } } -func setConsoleTitle(title string) { - if !terminal.IsTerminal(int(os.Stdout.Fd())) { - return +var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it") + +// workEventGroup converges a group of events to prevent duplicate work. +func (s Sync) workEventGroup(evs []timedEvent) { + cache := make(eventCache) + for _, ev := range evs { + cache.Add(ev) + } + + // We want to process events concurrently but safely for speed. + // Because the event cache prevents duplicate events for the same file, race conditions of that type + // are impossible. + // What is possible is a dependency on a previous Rename or Create. For example, if a directory is renamed + // and then a file is moved to it. AFAIK this dependecy only exists with Directories. + // So, we sequentially process the list of directory Renames and Creates, and then concurrently + // perform all Writes. + for _, ev := range cache.SequentialEvents() { + s.work(ev) } - fmt.Printf("\033]0;%s\007", title) -} -// maxinflightInotify sets the maximum number of inotifies before the sync just restarts. -// Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly -// with individual rsyncs. -const maxInflightInotify = 16 + sem := semaphore.NewWeighted(8) -var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it") + var wg sync.WaitGroup + for _, ev := range cache.ConcurrentEvents() { + setConsoleTitle(fmtUpdateTitle(ev.Path())) -func (s Sync) Run() error { - setConsoleTitle("⏳ syncing project") - err := s.initSync() - if err != nil { - return err + wg.Add(1) + sem.Acquire(context.Background(), 1) + ev := ev + go func() { + defer sem.Release(1) + defer wg.Done() + s.work(ev) + }() } - if s.Init { - return nil - } + wg.Wait() +} - // This queue is twice as large as the max in flight so we can check when it's full reliably. - events := make(chan notify.EventInfo, maxInflightInotify*2) +const ( + // maxinflightInotify sets the maximum number of inotifies before the sync just restarts. + // Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly + // with individual rsyncs. + maxInflightInotify = 8 + maxEventDelay = time.Second * 7 + // maxAcceptableDispatch is the maximum amount of time before an event should begin its journey to the server. + // This sets a lower bound for perceivable latency, but the higher it is, the better the optimization. + maxAcceptableDispatch = time.Millisecond * 50 +) + +// Run starts the sync synchronously. +// Use this command to debug what wasn't sync'd correctly: +// rsync -e "coder sh" -nicr ~/Projects/cdr/coder-cli/. ammar:/home/coder/coder-cli/ +func (s Sync) Run() error { + events := make(chan notify.EventInfo, maxInflightInotify) // Set up a recursive watch. - err = notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All) + // We do this before the initial sync so we can capture any changes that may have happened during sync. + err := notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All) if err != nil { return xerrors.Errorf("create watch: %w", err) } defer notify.Stop(events) + setConsoleTitle("⏳ syncing project") + err = s.initSync() + if err != nil { + return err + } + + if s.Init { + return nil + } - const watchingFilesystemTitle = "🛰 watching filesystem" - setConsoleTitle(watchingFilesystemTitle) flog.Info("watching %s for changes", s.LocalDir) - for ev := range events { - if len(events) > maxInflightInotify { - return ErrRestartSync + + var droppedEvents uint64 + // Timed events lets us track how long each individual file takes to update. + timedEvents := make(chan timedEvent, cap(events)) + go func() { + defer close(timedEvents) + for event := range events { + select { + case timedEvents <- timedEvent{ + CreatedAt: time.Now(), + EventInfo: event, + }: + default: + if atomic.AddUint64(&droppedEvents, 1) == 1 { + flog.Info("dropped event, sync should restart soon") + } + } } + }() - setConsoleTitle("🚀 updating " + filepath.Base(ev.Path())) - s.work(ev) + var ( + eventGroup []timedEvent + dispatchEventGroup = time.NewTicker(maxAcceptableDispatch) + ) + defer dispatchEventGroup.Stop() + for { + const watchingFilesystemTitle = "🛰 watching filesystem" setConsoleTitle(watchingFilesystemTitle) - } - return nil + select { + case ev := <-timedEvents: + if atomic.LoadUint64(&droppedEvents) > 0 { + return ErrRestartSync + } + + eventGroup = append(eventGroup, ev) + case <-dispatchEventGroup.C: + if len(eventGroup) == 0 { + continue + } + // We're too backlogged and should restart the sync. + if time.Since(eventGroup[0].CreatedAt) > maxEventDelay { + return ErrRestartSync + } + s.workEventGroup(eventGroup) + eventGroup = eventGroup[:0] + } + } } diff --git a/internal/sync/title.go b/internal/sync/title.go new file mode 100644 index 00000000..20164644 --- /dev/null +++ b/internal/sync/title.go @@ -0,0 +1,21 @@ +package sync + +import ( + "fmt" + "os" + "path/filepath" + + "golang.org/x/crypto/ssh/terminal" +) + +func setConsoleTitle(title string) { + if !terminal.IsTerminal(int(os.Stdout.Fd())) { + return + } + fmt.Printf("\033]0;%s\007", title) +} + + +func fmtUpdateTitle(path string) string { + return "🚀 updating " + filepath.Base(path) +} \ No newline at end of file