This repository was archived by the owner on Aug 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 18
Merged
Changes from 8 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
bb101e0
Speed up event processing by introducing duplicate cache
ammario 2288735
Process safe events concurrently
ammario ba23ea5
Remove single file support
ammario c2ca1ea
Improve replace message
ammario ed23f64
Document test command
ammario c305b9d
Improve dropped event calculation
ammario 4e31fe9
Add DEBUG_RSYNC env
ammario a675d5e
Create watch before initial sync
ammario 423278b
Support file deletion again
ammario 65be003
Add semaphore to pushing
ammario 4905bbd
Change initial sync to Success level
ammario b0c4bc3
Comment on dir translation
ammario File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package sync | ||
|
||
import ( | ||
"os" | ||
"time" | ||
|
||
"github.com/rjeczalik/notify" | ||
"go.coder.com/flog" | ||
) | ||
|
||
type timedEvent struct { | ||
CreatedAt time.Time | ||
notify.EventInfo | ||
} | ||
|
||
type eventCache map[string]timedEvent | ||
|
||
func (cache eventCache) Add(ev timedEvent) { | ||
log := flog.New() | ||
log.Prefix = ev.Path() + ": " | ||
lastEvent, ok := cache[ev.Path()] | ||
if ok { | ||
switch { | ||
// If the file was quickly created and then destroyed, pretend nothing ever happened. | ||
case lastEvent.Event() == notify.Create && ev.Event() == notify.Remove: | ||
delete(cache, ev.Path()) | ||
log.Info("ignored Create then Remove") | ||
return | ||
} | ||
} | ||
if ok { | ||
log.Info("replaced %s with %s", lastEvent.Event(), ev.Event()) | ||
} | ||
// Only let the latest event for a path have action. | ||
cache[ev.Path()] = ev | ||
} | ||
|
||
// DirectoryEvents returns the list of events that pertain to directories. | ||
// The set of returns events is disjoint with FileEvents. | ||
func (cache eventCache) DirectoryEvents() []timedEvent { | ||
var r []timedEvent | ||
for _, ev := range cache { | ||
info, err := os.Stat(ev.Path()) | ||
if err != nil { | ||
continue | ||
} | ||
if !info.IsDir() { | ||
continue | ||
} | ||
r = append(r, ev) | ||
|
||
} | ||
return r | ||
} | ||
|
||
// FileEvents returns the list of events that pertain to files. | ||
// The set of returns events is disjoint with DirectoryEvents. | ||
func (cache eventCache) FileEvents() []timedEvent { | ||
var r []timedEvent | ||
for _, ev := range cache { | ||
info, err := os.Stat(ev.Path()) | ||
if err != nil { | ||
continue | ||
} | ||
if info.IsDir() { | ||
continue | ||
} | ||
r = append(r, ev) | ||
|
||
} | ||
return r | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,12 +9,13 @@ import ( | |
"os/exec" | ||
"path" | ||
"path/filepath" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/gorilla/websocket" | ||
"github.com/rjeczalik/notify" | ||
"go.coder.com/flog" | ||
"golang.org/x/crypto/ssh/terminal" | ||
"golang.org/x/xerrors" | ||
|
||
"cdr.dev/coder-cli/internal/entclient" | ||
|
@@ -24,8 +25,10 @@ import ( | |
// Sync runs a live sync daemon. | ||
type Sync struct { | ||
// Init sets whether the sync will do the initial init and then return fast. | ||
Init bool | ||
LocalDir string | ||
Init bool | ||
// LocalDir is an absolute path. | ||
LocalDir string | ||
// RemoteDir is an absolute path. | ||
RemoteDir string | ||
entclient.Environment | ||
*entclient.Client | ||
|
@@ -35,13 +38,17 @@ func (s Sync) syncPaths(delete bool, local, remote string) error { | |
self := os.Args[0] | ||
|
||
args := []string{"-zz", | ||
"-a", "--progress", | ||
"-a", | ||
"--delete", | ||
"-e", self + " sh", local, s.Environment.Name + ":" + remote, | ||
} | ||
if delete { | ||
args = append([]string{"--delete"}, args...) | ||
} | ||
if os.Getenv("DEBUG_RSYNC") != "" { | ||
args = append([]string{"--progress"}, args...) | ||
} | ||
|
||
// See https://unix.stackexchange.com/questions/188737/does-compression-option-z-with-rsync-speed-up-backup | ||
// on compression level. | ||
// (AB): compression sped up the initial sync of the enterprise repo by 30%, leading me to believe it's | ||
|
@@ -82,7 +89,7 @@ func (s Sync) initSync() error { | |
|
||
start := time.Now() | ||
// Delete old files on initial sync (e.g git checkout). | ||
err := s.syncPaths(true, s.LocalDir, s.RemoteDir) | ||
err := s.syncPaths(true, s.LocalDir+"/.", s.RemoteDir) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't hurt to comment on why this is necessary |
||
if err == nil { | ||
flog.Info("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond)) | ||
} | ||
|
@@ -104,6 +111,12 @@ func (s Sync) handleCreate(localPath string) error { | |
target := s.convertPath(localPath) | ||
err := s.syncPaths(false, localPath, target) | ||
if err != nil { | ||
_, statErr := os.Stat(localPath) | ||
// File was quickly deleted. | ||
if os.IsNotExist(statErr) { | ||
return nil | ||
} | ||
|
||
return err | ||
} | ||
return nil | ||
|
@@ -133,11 +146,10 @@ func (s Sync) handleRename(localPath string) error { | |
return s.handleCreate(localPath) | ||
} | ||
|
||
func (s Sync) work(ev notify.EventInfo) { | ||
func (s Sync) work(ev timedEvent) { | ||
var ( | ||
localPath = ev.Path() | ||
remotePath = s.convertPath(localPath) | ||
err error | ||
localPath = ev.Path() | ||
err error | ||
) | ||
switch ev.Event() { | ||
case notify.Write, notify.Create: | ||
|
@@ -150,61 +162,133 @@ func (s Sync) work(ev notify.EventInfo) { | |
flog.Info("unhandled event %v %+v", ev.Event(), ev.Path()) | ||
} | ||
|
||
log := fmt.Sprintf("%v %v (%v)", | ||
ev.Event(), filepath.Base(localPath), time.Since(ev.CreatedAt).Truncate(time.Millisecond*10), | ||
) | ||
if err != nil { | ||
flog.Error("%v: %v -> %v: %v", ev.Event(), localPath, remotePath, err) | ||
flog.Error(log+": %v", err) | ||
} else { | ||
flog.Success("%v: %v -> %v", ev.Event(), localPath, remotePath) | ||
flog.Success(log) | ||
} | ||
} | ||
|
||
func setConsoleTitle(title string) { | ||
if !terminal.IsTerminal(int(os.Stdout.Fd())) { | ||
return | ||
var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it") | ||
|
||
// workEventGroup converges a group of events to prevent duplicate work. | ||
func (s Sync) workEventGroup(evs []timedEvent) { | ||
cache := make(eventCache) | ||
for _, ev := range evs { | ||
cache.Add(ev) | ||
} | ||
fmt.Printf("\033]0;%s\007", title) | ||
} | ||
|
||
// maxinflightInotify sets the maximum number of inotifies before the sync just restarts. | ||
// Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly | ||
// with individual rsyncs. | ||
const maxInflightInotify = 16 | ||
// We want to process events concurrently but safely for speed. | ||
// Because the event cache prevents duplicate events for the same file, race conditions of that type | ||
// are impossible. | ||
// What is possible is a dependency on a previous Rename or Create. For example, if a directory is renamed | ||
// and then a file is moved to it. AFAIK this dependecy only exists with Directories. | ||
// So, we sequentially process the list of directory Renames and Creates, and then concurrently | ||
// perform all Writes. | ||
for _, ev := range cache.DirectoryEvents() { | ||
s.work(ev) | ||
} | ||
|
||
var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it") | ||
var wg sync.WaitGroup | ||
for _, ev := range cache.FileEvents() { | ||
setConsoleTitle(fmtUpdateTitle(ev.Path())) | ||
|
||
func (s Sync) Run() error { | ||
setConsoleTitle("⏳ syncing project") | ||
err := s.initSync() | ||
if err != nil { | ||
return err | ||
wg.Add(1) | ||
ev := ev | ||
go func() { | ||
defer wg.Done() | ||
s.work(ev) | ||
}() | ||
} | ||
|
||
if s.Init { | ||
return nil | ||
} | ||
wg.Wait() | ||
} | ||
|
||
// This queue is twice as large as the max in flight so we can check when it's full reliably. | ||
events := make(chan notify.EventInfo, maxInflightInotify*2) | ||
const ( | ||
// maxinflightInotify sets the maximum number of inotifies before the sync just restarts. | ||
// Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly | ||
// with individual rsyncs. | ||
maxInflightInotify = 8 | ||
maxEventDelay = time.Second * 7 | ||
// maxAcceptableDispatch is the maximum amount of time before an event should begin its journey to the server. | ||
// This sets a lower bound for perceivable latency, but the higher it is, the better the optimization. | ||
maxAcceptableDispatch = time.Millisecond * 50 | ||
) | ||
|
||
// Run starts the sync synchronously. | ||
// Use this command to debug what wasn't sync'd correctly: | ||
// rsync -e "coder sh" -nicr ~/Projects/cdr/coder-cli/. ammar:/home/coder/coder-cli/ | ||
func (s Sync) Run() error { | ||
events := make(chan notify.EventInfo, maxInflightInotify) | ||
// Set up a recursive watch. | ||
err = notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All) | ||
// We do this before the initial sync so we can capture any changes that may have happened during sync. | ||
err := notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All) | ||
if err != nil { | ||
return xerrors.Errorf("create watch: %w", err) | ||
} | ||
defer notify.Stop(events) | ||
|
||
setConsoleTitle("⏳ syncing project") | ||
err = s.initSync() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if s.Init { | ||
return nil | ||
} | ||
|
||
const watchingFilesystemTitle = "🛰 watching filesystem" | ||
setConsoleTitle(watchingFilesystemTitle) | ||
|
||
flog.Info("watching %s for changes", s.LocalDir) | ||
for ev := range events { | ||
if len(events) > maxInflightInotify { | ||
return ErrRestartSync | ||
|
||
var droppedEvents uint64 | ||
// Timed events lets us track how long each individual file takes to update. | ||
timedEvents := make(chan timedEvent, cap(events)) | ||
go func() { | ||
defer close(timedEvents) | ||
for event := range events { | ||
select { | ||
case timedEvents <- timedEvent{ | ||
CreatedAt: time.Now(), | ||
EventInfo: event, | ||
}: | ||
default: | ||
if atomic.AddUint64(&droppedEvents, 1) == 1 { | ||
flog.Info("dropped event, sync should restart soon") | ||
} | ||
} | ||
} | ||
}() | ||
|
||
setConsoleTitle("🚀 updating " + filepath.Base(ev.Path())) | ||
s.work(ev) | ||
var ( | ||
eventGroup []timedEvent | ||
dispatchEventGroup = time.NewTicker(maxAcceptableDispatch) | ||
) | ||
defer dispatchEventGroup.Stop() | ||
for { | ||
const watchingFilesystemTitle = "🛰 watching filesystem" | ||
setConsoleTitle(watchingFilesystemTitle) | ||
} | ||
|
||
return nil | ||
select { | ||
case ev := <-timedEvents: | ||
if atomic.LoadUint64(&droppedEvents) > 0 { | ||
return ErrRestartSync | ||
} | ||
|
||
eventGroup = append(eventGroup, ev) | ||
case <-dispatchEventGroup.C: | ||
if len(eventGroup) == 0 { | ||
continue | ||
} | ||
// We're too backlogged and should restart the sync. | ||
if time.Since(eventGroup[0].CreatedAt) > maxEventDelay { | ||
return ErrRestartSync | ||
} | ||
s.workEventGroup(eventGroup) | ||
eventGroup = eventGroup[:0] | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package sync | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
|
||
"golang.org/x/crypto/ssh/terminal" | ||
) | ||
|
||
func setConsoleTitle(title string) { | ||
if !terminal.IsTerminal(int(os.Stdout.Fd())) { | ||
return | ||
} | ||
fmt.Printf("\033]0;%s\007", title) | ||
} | ||
|
||
|
||
func fmtUpdateTitle(path string) string { | ||
return "🚀 updating " + filepath.Base(path) | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be merged with the if statement above