From bb101e08c698487973ce2ca9dce05eebac4a598d Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 13:19:40 -0500 Subject: [PATCH 01/12] Speed up event processing by introducing duplicate cache --- cmd/coder/sync.go | 8 ++- internal/sync/sync.go | 118 ++++++++++++++++++++++++++++-------- internal/sync/timedevent.go | 12 ++++ 3 files changed, 113 insertions(+), 25 deletions(-) create mode 100644 internal/sync/timedevent.go diff --git a/cmd/coder/sync.go b/cmd/coder/sync.go index a7285bd0..b28afd08 100644 --- a/cmd/coder/sync.go +++ b/cmd/coder/sync.go @@ -2,6 +2,7 @@ package main import ( "os" + "path/filepath" "strings" "github.com/spf13/pflag" @@ -57,11 +58,16 @@ func (cmd *syncCmd) Run(fl *pflag.FlagSet) { env := findEnv(entClient, envName) + absLocal, err := filepath.Abs(local) + if err != nil { + flog.Fatal("make abs path out of %v: %v", local, absLocal) + } + s := sync.Sync{ Init: cmd.init, Environment: env, RemoteDir: remoteDir, - LocalDir: local, + LocalDir: absLocal, Client: entClient, } for err == nil || err == sync.ErrRestartSync { diff --git a/internal/sync/sync.go b/internal/sync/sync.go index d669751f..b28665d3 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -24,8 +24,10 @@ import ( // Sync runs a live sync daemon. type Sync struct { // Init sets whether the sync will do the initial init and then return fast. - Init bool - LocalDir string + Init bool + // LocalDir is an absolute path. + LocalDir string + // RemoteDir is an absolute path. RemoteDir string entclient.Environment *entclient.Client @@ -104,6 +106,12 @@ func (s Sync) handleCreate(localPath string) error { target := s.convertPath(localPath) err := s.syncPaths(false, localPath, target) if err != nil { + _, statErr := os.Stat(localPath) + // File was quickly deleted. + if os.IsNotExist(statErr) { + return nil + } + return err } return nil @@ -133,11 +141,10 @@ func (s Sync) handleRename(localPath string) error { return s.handleCreate(localPath) } -func (s Sync) work(ev notify.EventInfo) { +func (s Sync) work(ev timedEvent) { var ( - localPath = ev.Path() - remotePath = s.convertPath(localPath) - err error + localPath = ev.Path() + err error ) switch ev.Event() { case notify.Write, notify.Create: @@ -150,27 +157,63 @@ func (s Sync) work(ev notify.EventInfo) { flog.Info("unhandled event %v %+v", ev.Event(), ev.Path()) } + log := fmt.Sprintf("%v %v (%v)", + ev.Event(), filepath.Base(localPath), time.Since(ev.CreatedAt).Truncate(time.Millisecond*10), + ) if err != nil { - flog.Error("%v: %v -> %v: %v", ev.Event(), localPath, remotePath, err) + flog.Error(log+": %v", err) } else { - flog.Success("%v: %v -> %v", ev.Event(), localPath, remotePath) + flog.Success(log) } } func setConsoleTitle(title string) { if !terminal.IsTerminal(int(os.Stdout.Fd())) { - return + return } fmt.Printf("\033]0;%s\007", title) } -// maxinflightInotify sets the maximum number of inotifies before the sync just restarts. -// Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly -// with individual rsyncs. -const maxInflightInotify = 16 var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it") +// workEventGroup converges a group of events to prevent duplicate work. +func (s Sync) workEventGroup(evs []timedEvent) { + cache := map[string]timedEvent{} + for _, ev := range evs { + log := flog.New() + log.Prefix = ev.Path() + ": " + lastEvent, ok := cache[ev.Path()] + if ok { + switch { + // If the file was quickly created and then destroyed, pretend nothing ever happened. + case lastEvent.Event() == notify.Create && ev.Event() == notify.Remove: + delete(cache, ev.Path()) + log.Info("ignored Create then Remove") + continue + } + } + if ok { + log.Info("ignored duplicate event (%s replaced by %s)", lastEvent.Event(), ev.Event()) + } + // Only let the latest event for a path have action. + cache[ev.Path()] = ev + } + for _, ev := range cache { + setConsoleTitle("🚀 updating " + filepath.Base(ev.Path())) + s.work(ev) + } +} + + +const ( + // maxinflightInotify sets the maximum number of inotifies before the sync just restarts. + // Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly + // with individual rsyncs. + maxInflightInotify = 8 + maxEventDelay = time.Second * 7 +) + func (s Sync) Run() error { setConsoleTitle("⏳ syncing project") err := s.initSync() @@ -191,20 +234,47 @@ func (s Sync) Run() error { } defer notify.Stop(events) - - const watchingFilesystemTitle = "🛰 watching filesystem" - setConsoleTitle(watchingFilesystemTitle) - flog.Info("watching %s for changes", s.LocalDir) - for ev := range events { - if len(events) > maxInflightInotify { - return ErrRestartSync + + // Timed events lets us track how long each individual file takes to update. + timedEvents := make(chan timedEvent, cap(events)) + go func() { + defer close(timedEvents) + for event := range events { + timedEvents <- timedEvent{ + CreatedAt: time.Now(), + EventInfo: event, + } } + }() - setConsoleTitle("🚀 updating " + filepath.Base(ev.Path())) - s.work(ev) + var ( + eventGroup []timedEvent + dispatchEventGroup = time.NewTicker(time.Millisecond * 10) + ) + defer dispatchEventGroup.Stop() + for { + const watchingFilesystemTitle = "🛰 watching filesystem" setConsoleTitle(watchingFilesystemTitle) - } - return nil + select { + case ev := <-timedEvents: + if len(events) > maxInflightInotify { + return ErrRestartSync + } + + eventGroup = append(eventGroup, ev) + case <-dispatchEventGroup.C: + if len(eventGroup) == 0 { + continue + } + // We're too backlogged and should restart the sync. + if time.Since(eventGroup[0].CreatedAt) > maxEventDelay { + return ErrRestartSync + } + + s.workEventGroup(eventGroup) + eventGroup = eventGroup[:0] + } + } } diff --git a/internal/sync/timedevent.go b/internal/sync/timedevent.go new file mode 100644 index 00000000..4d896107 --- /dev/null +++ b/internal/sync/timedevent.go @@ -0,0 +1,12 @@ +package sync + +import ( + "time" + + "github.com/rjeczalik/notify" +) + +type timedEvent struct { + CreatedAt time.Time + notify.EventInfo +} From 22887355bc4dfae880a4596348c74ab5e12b9764 Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 13:45:52 -0500 Subject: [PATCH 02/12] Process safe events concurrently --- internal/sync/eventcache.go | 72 +++++++++++++++++++++++++++++++++++++ internal/sync/sync.go | 62 ++++++++++++++++---------------- internal/sync/timedevent.go | 12 ------- internal/sync/title.go | 21 +++++++++++ 4 files changed, 124 insertions(+), 43 deletions(-) create mode 100644 internal/sync/eventcache.go delete mode 100644 internal/sync/timedevent.go create mode 100644 internal/sync/title.go diff --git a/internal/sync/eventcache.go b/internal/sync/eventcache.go new file mode 100644 index 00000000..921b2a48 --- /dev/null +++ b/internal/sync/eventcache.go @@ -0,0 +1,72 @@ +package sync + +import ( + "os" + "time" + + "github.com/rjeczalik/notify" + "go.coder.com/flog" +) + +type timedEvent struct { + CreatedAt time.Time + notify.EventInfo +} + +type eventCache map[string]timedEvent + +func (cache eventCache) Add(ev timedEvent) { + log := flog.New() + log.Prefix = ev.Path() + ": " + lastEvent, ok := cache[ev.Path()] + if ok { + switch { + // If the file was quickly created and then destroyed, pretend nothing ever happened. + case lastEvent.Event() == notify.Create && ev.Event() == notify.Remove: + delete(cache, ev.Path()) + log.Info("ignored Create then Remove") + return + } + } + if ok { + log.Info("ignored duplicate event (%s replaced by %s)", lastEvent.Event(), ev.Event()) + } + // Only let the latest event for a path have action. + cache[ev.Path()] = ev +} + +// DirectoryEvents returns the list of events that pertain to directories. +// The set of returns events is disjoint with FileEvents. +func (cache eventCache) DirectoryEvents() []timedEvent { + var r []timedEvent + for _, ev := range cache { + info, err := os.Stat(ev.Path()) + if err != nil { + continue + } + if !info.IsDir() { + continue + } + r = append(r, ev) + + } + return r +} + +// FileEvents returns the list of events that pertain to files. +// The set of returns events is disjoint with DirectoryEvents. +func (cache eventCache) FileEvents() []timedEvent { + var r []timedEvent + for _, ev := range cache { + info, err := os.Stat(ev.Path()) + if err != nil { + continue + } + if info.IsDir() { + continue + } + r = append(r, ev) + + } + return r +} diff --git a/internal/sync/sync.go b/internal/sync/sync.go index b28665d3..d7ebe16b 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -9,12 +9,12 @@ import ( "os/exec" "path" "path/filepath" + "sync" "time" "github.com/gorilla/websocket" "github.com/rjeczalik/notify" "go.coder.com/flog" - "golang.org/x/crypto/ssh/terminal" "golang.org/x/xerrors" "cdr.dev/coder-cli/internal/entclient" @@ -167,51 +167,51 @@ func (s Sync) work(ev timedEvent) { } } -func setConsoleTitle(title string) { - if !terminal.IsTerminal(int(os.Stdout.Fd())) { - return - } - fmt.Printf("\033]0;%s\007", title) -} - var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it") // workEventGroup converges a group of events to prevent duplicate work. func (s Sync) workEventGroup(evs []timedEvent) { - cache := map[string]timedEvent{} + cache := make(eventCache) for _, ev := range evs { - log := flog.New() - log.Prefix = ev.Path() + ": " - lastEvent, ok := cache[ev.Path()] - if ok { - switch { - // If the file was quickly created and then destroyed, pretend nothing ever happened. - case lastEvent.Event() == notify.Create && ev.Event() == notify.Remove: - delete(cache, ev.Path()) - log.Info("ignored Create then Remove") - continue - } - } - if ok { - log.Info("ignored duplicate event (%s replaced by %s)", lastEvent.Event(), ev.Event()) - } - // Only let the latest event for a path have action. - cache[ev.Path()] = ev + cache.Add(ev) } - for _, ev := range cache { - setConsoleTitle("🚀 updating " + filepath.Base(ev.Path())) + + // We want to process events concurrently but safely for speed. + // Because the event cache prevents duplicate events for the same file, race conditions of that type + // are impossible. + // What is possible is a dependency on a previous Rename or Create. For example, if a directory is renamed + // and then a file is moved to it. AFAIK this dependecy only exists with Directories. + // So, we sequentially process the list of directory Renames and Creates, and then concurrently + // perform all Writes. + for _, ev := range cache.DirectoryEvents() { s.work(ev) } -} + var wg sync.WaitGroup + for _, ev := range cache.FileEvents() { + setConsoleTitle(fmtUpdateTitle(ev.Path())) + + wg.Add(1) + ev := ev + go func() { + defer wg.Done() + s.work(ev) + }() + } + + wg.Wait() +} const ( // maxinflightInotify sets the maximum number of inotifies before the sync just restarts. // Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly // with individual rsyncs. maxInflightInotify = 8 - maxEventDelay = time.Second * 7 + maxEventDelay = time.Second * 7 + // maxAcceptableDispatch is the maximum amount of time before an event should begin its journey to the server. + // This sets a lower bound for perceivable latency, but the higher it is, the better the optimization. + maxAcceptableDispatch = time.Millisecond * 50 ) func (s Sync) Run() error { @@ -250,7 +250,7 @@ func (s Sync) Run() error { var ( eventGroup []timedEvent - dispatchEventGroup = time.NewTicker(time.Millisecond * 10) + dispatchEventGroup = time.NewTicker(maxAcceptableDispatch) ) defer dispatchEventGroup.Stop() for { diff --git a/internal/sync/timedevent.go b/internal/sync/timedevent.go deleted file mode 100644 index 4d896107..00000000 --- a/internal/sync/timedevent.go +++ /dev/null @@ -1,12 +0,0 @@ -package sync - -import ( - "time" - - "github.com/rjeczalik/notify" -) - -type timedEvent struct { - CreatedAt time.Time - notify.EventInfo -} diff --git a/internal/sync/title.go b/internal/sync/title.go new file mode 100644 index 00000000..20164644 --- /dev/null +++ b/internal/sync/title.go @@ -0,0 +1,21 @@ +package sync + +import ( + "fmt" + "os" + "path/filepath" + + "golang.org/x/crypto/ssh/terminal" +) + +func setConsoleTitle(title string) { + if !terminal.IsTerminal(int(os.Stdout.Fd())) { + return + } + fmt.Printf("\033]0;%s\007", title) +} + + +func fmtUpdateTitle(path string) string { + return "🚀 updating " + filepath.Base(path) +} \ No newline at end of file From ba23ea545eaadd970f3ea24bf8997581e233e296 Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 13:52:13 -0500 Subject: [PATCH 03/12] Remove single file support --- internal/sync/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sync/sync.go b/internal/sync/sync.go index d7ebe16b..2f62fd1f 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -84,7 +84,7 @@ func (s Sync) initSync() error { start := time.Now() // Delete old files on initial sync (e.g git checkout). - err := s.syncPaths(true, s.LocalDir, s.RemoteDir) + err := s.syncPaths(true, s.LocalDir + "/.", s.RemoteDir) if err == nil { flog.Info("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond)) } From c2ca1ea11aa529ce1e95ba1af58fe3c3d0bb7537 Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 13:55:14 -0500 Subject: [PATCH 04/12] Improve replace message --- internal/sync/eventcache.go | 2 +- internal/sync/sync.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/sync/eventcache.go b/internal/sync/eventcache.go index 921b2a48..1c614fbe 100644 --- a/internal/sync/eventcache.go +++ b/internal/sync/eventcache.go @@ -29,7 +29,7 @@ func (cache eventCache) Add(ev timedEvent) { } } if ok { - log.Info("ignored duplicate event (%s replaced by %s)", lastEvent.Event(), ev.Event()) + log.Info("replaced %s with %s", lastEvent.Event(), ev.Event()) } // Only let the latest event for a path have action. cache[ev.Path()] = ev diff --git a/internal/sync/sync.go b/internal/sync/sync.go index 2f62fd1f..5683b741 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -272,7 +272,6 @@ func (s Sync) Run() error { if time.Since(eventGroup[0].CreatedAt) > maxEventDelay { return ErrRestartSync } - s.workEventGroup(eventGroup) eventGroup = eventGroup[:0] } From ed23f647d4da465dbb1311672a6e65680c3aa866 Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 14:52:23 -0500 Subject: [PATCH 05/12] Document test command --- internal/sync/sync.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/sync/sync.go b/internal/sync/sync.go index 5683b741..be8c33ae 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -214,6 +214,9 @@ const ( maxAcceptableDispatch = time.Millisecond * 50 ) +// Run starts the sync synchronously. +// Use this command to debug what wasn't sync'd correctly: +// rsync -e "coder sh" -nicr ~/Projects/cdr/coder-cli/. ammar:/home/coder/coder-cli/ func (s Sync) Run() error { setConsoleTitle("⏳ syncing project") err := s.initSync() From c305b9dbca5934d38f2b1fa4e327c7ff7fa76be1 Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 15:18:24 -0500 Subject: [PATCH 06/12] Improve dropped event calculation --- internal/sync/sync.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/internal/sync/sync.go b/internal/sync/sync.go index be8c33ae..a28f21b8 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -10,6 +10,7 @@ import ( "path" "path/filepath" "sync" + "sync/atomic" "time" "github.com/gorilla/websocket" @@ -84,7 +85,7 @@ func (s Sync) initSync() error { start := time.Now() // Delete old files on initial sync (e.g git checkout). - err := s.syncPaths(true, s.LocalDir + "/.", s.RemoteDir) + err := s.syncPaths(true, s.LocalDir+"/.", s.RemoteDir) if err == nil { flog.Info("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond)) } @@ -167,7 +168,6 @@ func (s Sync) work(ev timedEvent) { } } - var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it") // workEventGroup converges a group of events to prevent duplicate work. @@ -239,14 +239,21 @@ func (s Sync) Run() error { flog.Info("watching %s for changes", s.LocalDir) + var droppedEvents uint64 // Timed events lets us track how long each individual file takes to update. timedEvents := make(chan timedEvent, cap(events)) go func() { defer close(timedEvents) for event := range events { - timedEvents <- timedEvent{ + select { + case timedEvents <- timedEvent{ CreatedAt: time.Now(), EventInfo: event, + }: + default: + if atomic.AddUint64(&droppedEvents, 1) == 1 { + flog.Info("dropped event, sync should restart soon") + } } } }() @@ -262,7 +269,7 @@ func (s Sync) Run() error { select { case ev := <-timedEvents: - if len(events) > maxInflightInotify { + if atomic.LoadUint64(&droppedEvents) > 0 { return ErrRestartSync } From 4e31fe9b1a8f044c9f848da2394527788300bff8 Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 15:22:08 -0500 Subject: [PATCH 07/12] Add DEBUG_RSYNC env --- internal/sync/sync.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/sync/sync.go b/internal/sync/sync.go index a28f21b8..adc440c6 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -38,13 +38,17 @@ func (s Sync) syncPaths(delete bool, local, remote string) error { self := os.Args[0] args := []string{"-zz", - "-a", "--progress", + "-a", "--delete", "-e", self + " sh", local, s.Environment.Name + ":" + remote, } if delete { args = append([]string{"--delete"}, args...) } + if os.Getenv("DEBUG_RSYNC") != "" { + args = append([]string{"--progress"}, args...) + } + // See https://unix.stackexchange.com/questions/188737/does-compression-option-z-with-rsync-speed-up-backup // on compression level. // (AB): compression sped up the initial sync of the enterprise repo by 30%, leading me to believe it's @@ -228,8 +232,7 @@ func (s Sync) Run() error { return nil } - // This queue is twice as large as the max in flight so we can check when it's full reliably. - events := make(chan notify.EventInfo, maxInflightInotify*2) + events := make(chan notify.EventInfo, maxInflightInotify) // Set up a recursive watch. err = notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All) if err != nil { From a675d5ec3743ee612230a1d06018762b4270b9fb Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 15:32:24 -0500 Subject: [PATCH 08/12] Create watch before initial sync Resolves #7 --- internal/sync/sync.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/internal/sync/sync.go b/internal/sync/sync.go index adc440c6..2205299b 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -222,8 +222,17 @@ const ( // Use this command to debug what wasn't sync'd correctly: // rsync -e "coder sh" -nicr ~/Projects/cdr/coder-cli/. ammar:/home/coder/coder-cli/ func (s Sync) Run() error { + events := make(chan notify.EventInfo, maxInflightInotify) + // Set up a recursive watch. + // We do this before the initial sync so we can capture any changes that may have happened during sync. + err := notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All) + if err != nil { + return xerrors.Errorf("create watch: %w", err) + } + defer notify.Stop(events) + setConsoleTitle("⏳ syncing project") - err := s.initSync() + err = s.initSync() if err != nil { return err } @@ -232,13 +241,6 @@ func (s Sync) Run() error { return nil } - events := make(chan notify.EventInfo, maxInflightInotify) - // Set up a recursive watch. - err = notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All) - if err != nil { - return xerrors.Errorf("create watch: %w", err) - } - defer notify.Stop(events) flog.Info("watching %s for changes", s.LocalDir) From 423278b43f79f267067e30fc4b13ff6bf3be3876 Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 15:46:21 -0500 Subject: [PATCH 09/12] Support file deletion again --- internal/sync/eventcache.go | 21 ++++++++++----------- internal/sync/sync.go | 4 ++-- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/internal/sync/eventcache.go b/internal/sync/eventcache.go index 1c614fbe..7128802a 100644 --- a/internal/sync/eventcache.go +++ b/internal/sync/eventcache.go @@ -35,34 +35,33 @@ func (cache eventCache) Add(ev timedEvent) { cache[ev.Path()] = ev } -// DirectoryEvents returns the list of events that pertain to directories. -// The set of returns events is disjoint with FileEvents. -func (cache eventCache) DirectoryEvents() []timedEvent { +// SequentialEvents returns the list of events that pertain to directories. +// The set of returned events is disjoint with ConcurrentEvents. +func (cache eventCache) SequentialEvents() []timedEvent { var r []timedEvent for _, ev := range cache { info, err := os.Stat(ev.Path()) - if err != nil { - continue - } - if !info.IsDir() { + if err == nil && !info.IsDir() { continue } + // Include files that have deleted here. + // It's unclear whether they're files or folders. r = append(r, ev) } return r } -// FileEvents returns the list of events that pertain to files. -// The set of returns events is disjoint with DirectoryEvents. -func (cache eventCache) FileEvents() []timedEvent { +// ConcurrentEvents returns the list of events that are safe to process after SequentialEvents. +// The set of returns events is disjoint with SequentialEvents. +func (cache eventCache) ConcurrentEvents() []timedEvent { var r []timedEvent for _, ev := range cache { info, err := os.Stat(ev.Path()) if err != nil { continue } - if info.IsDir() { + if info.IsDir() { continue } r = append(r, ev) diff --git a/internal/sync/sync.go b/internal/sync/sync.go index 2205299b..c4f6fee4 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -188,12 +188,12 @@ func (s Sync) workEventGroup(evs []timedEvent) { // and then a file is moved to it. AFAIK this dependecy only exists with Directories. // So, we sequentially process the list of directory Renames and Creates, and then concurrently // perform all Writes. - for _, ev := range cache.DirectoryEvents() { + for _, ev := range cache.SequentialEvents() { s.work(ev) } var wg sync.WaitGroup - for _, ev := range cache.FileEvents() { + for _, ev := range cache.ConcurrentEvents() { setConsoleTitle(fmtUpdateTitle(ev.Path())) wg.Add(1) From 65be003dafbcd3f06f7a11050e287ecdd7929976 Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 15:55:02 -0500 Subject: [PATCH 10/12] Add semaphore to pushing Can't have too many rsyncs flying around. --- internal/sync/sync.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/sync/sync.go b/internal/sync/sync.go index c4f6fee4..81d7f6df 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -16,6 +16,7 @@ import ( "github.com/gorilla/websocket" "github.com/rjeczalik/notify" "go.coder.com/flog" + "golang.org/x/sync/semaphore" "golang.org/x/xerrors" "cdr.dev/coder-cli/internal/entclient" @@ -192,13 +193,17 @@ func (s Sync) workEventGroup(evs []timedEvent) { s.work(ev) } + sem := semaphore.NewWeighted(8) + var wg sync.WaitGroup for _, ev := range cache.ConcurrentEvents() { setConsoleTitle(fmtUpdateTitle(ev.Path())) wg.Add(1) + sem.Acquire(context.Background(), 1) ev := ev go func() { + defer sem.Release(1) defer wg.Done() s.work(ev) }() From 4905bbd516ded7435c5732e76843dad9240a85fe Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 16:21:28 -0500 Subject: [PATCH 11/12] Change initial sync to Success level --- internal/sync/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sync/sync.go b/internal/sync/sync.go index 81d7f6df..91506b06 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -92,7 +92,7 @@ func (s Sync) initSync() error { // Delete old files on initial sync (e.g git checkout). err := s.syncPaths(true, s.LocalDir+"/.", s.RemoteDir) if err == nil { - flog.Info("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond)) + flog.Success("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond)) } return err } From b0c4bc36083b6057a58522221998cea94b4f57ae Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Fri, 24 Apr 2020 16:27:48 -0500 Subject: [PATCH 12/12] Comment on dir translation --- internal/sync/eventcache.go | 2 -- internal/sync/sync.go | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/sync/eventcache.go b/internal/sync/eventcache.go index 7128802a..4e0d9c60 100644 --- a/internal/sync/eventcache.go +++ b/internal/sync/eventcache.go @@ -27,8 +27,6 @@ func (cache eventCache) Add(ev timedEvent) { log.Info("ignored Create then Remove") return } - } - if ok { log.Info("replaced %s with %s", lastEvent.Event(), ev.Event()) } // Only let the latest event for a path have action. diff --git a/internal/sync/sync.go b/internal/sync/sync.go index 91506b06..5c695260 100644 --- a/internal/sync/sync.go +++ b/internal/sync/sync.go @@ -90,6 +90,8 @@ func (s Sync) initSync() error { start := time.Now() // Delete old files on initial sync (e.g git checkout). + // Add the "/." to the local directory so rsync doesn't try to place the directory + // into the remote dir. err := s.syncPaths(true, s.LocalDir+"/.", s.RemoteDir) if err == nil { flog.Success("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond))