Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Speed up event processing #8

Merged
merged 12 commits into from
Apr 24, 2020
8 changes: 7 additions & 1 deletion cmd/coder/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"os"
"path/filepath"
"strings"

"github.com/spf13/pflag"
Expand Down Expand Up @@ -57,11 +58,16 @@ func (cmd *syncCmd) Run(fl *pflag.FlagSet) {

env := findEnv(entClient, envName)

absLocal, err := filepath.Abs(local)
if err != nil {
flog.Fatal("make abs path out of %v: %v", local, absLocal)
}

s := sync.Sync{
Init: cmd.init,
Environment: env,
RemoteDir: remoteDir,
LocalDir: local,
LocalDir: absLocal,
Client: entClient,
}
for err == nil || err == sync.ErrRestartSync {
Expand Down
72 changes: 72 additions & 0 deletions internal/sync/eventcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package sync

import (
"os"
"time"

"github.com/rjeczalik/notify"
"go.coder.com/flog"
)

type timedEvent struct {
CreatedAt time.Time
notify.EventInfo
}

type eventCache map[string]timedEvent

func (cache eventCache) Add(ev timedEvent) {
log := flog.New()
log.Prefix = ev.Path() + ": "
lastEvent, ok := cache[ev.Path()]
if ok {
switch {
// If the file was quickly created and then destroyed, pretend nothing ever happened.
case lastEvent.Event() == notify.Create && ev.Event() == notify.Remove:
delete(cache, ev.Path())
log.Info("ignored Create then Remove")
return
}
}
if ok {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be merged with the if statement above

log.Info("replaced %s with %s", lastEvent.Event(), ev.Event())
}
// Only let the latest event for a path have action.
cache[ev.Path()] = ev
}

// DirectoryEvents returns the list of events that pertain to directories.
// The set of returns events is disjoint with FileEvents.
func (cache eventCache) DirectoryEvents() []timedEvent {
var r []timedEvent
for _, ev := range cache {
info, err := os.Stat(ev.Path())
if err != nil {
continue
}
if !info.IsDir() {
continue
}
r = append(r, ev)

}
return r
}

// FileEvents returns the list of events that pertain to files.
// The set of returns events is disjoint with DirectoryEvents.
func (cache eventCache) FileEvents() []timedEvent {
var r []timedEvent
for _, ev := range cache {
info, err := os.Stat(ev.Path())
if err != nil {
continue
}
if info.IsDir() {
continue
}
r = append(r, ev)

}
return r
}
166 changes: 125 additions & 41 deletions internal/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"os/exec"
"path"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
"github.com/rjeczalik/notify"
"go.coder.com/flog"
"golang.org/x/crypto/ssh/terminal"
"golang.org/x/xerrors"

"cdr.dev/coder-cli/internal/entclient"
Expand All @@ -24,8 +25,10 @@ import (
// Sync runs a live sync daemon.
type Sync struct {
// Init sets whether the sync will do the initial init and then return fast.
Init bool
LocalDir string
Init bool
// LocalDir is an absolute path.
LocalDir string
// RemoteDir is an absolute path.
RemoteDir string
entclient.Environment
*entclient.Client
Expand All @@ -35,13 +38,17 @@ func (s Sync) syncPaths(delete bool, local, remote string) error {
self := os.Args[0]

args := []string{"-zz",
"-a", "--progress",
"-a",
"--delete",
"-e", self + " sh", local, s.Environment.Name + ":" + remote,
}
if delete {
args = append([]string{"--delete"}, args...)
}
if os.Getenv("DEBUG_RSYNC") != "" {
args = append([]string{"--progress"}, args...)
}

// See https://unix.stackexchange.com/questions/188737/does-compression-option-z-with-rsync-speed-up-backup
// on compression level.
// (AB): compression sped up the initial sync of the enterprise repo by 30%, leading me to believe it's
Expand Down Expand Up @@ -82,7 +89,7 @@ func (s Sync) initSync() error {

start := time.Now()
// Delete old files on initial sync (e.g git checkout).
err := s.syncPaths(true, s.LocalDir, s.RemoteDir)
err := s.syncPaths(true, s.LocalDir+"/.", s.RemoteDir)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't hurt to comment on why this is necessary

if err == nil {
flog.Info("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond))
}
Expand All @@ -104,6 +111,12 @@ func (s Sync) handleCreate(localPath string) error {
target := s.convertPath(localPath)
err := s.syncPaths(false, localPath, target)
if err != nil {
_, statErr := os.Stat(localPath)
// File was quickly deleted.
if os.IsNotExist(statErr) {
return nil
}

return err
}
return nil
Expand Down Expand Up @@ -133,11 +146,10 @@ func (s Sync) handleRename(localPath string) error {
return s.handleCreate(localPath)
}

func (s Sync) work(ev notify.EventInfo) {
func (s Sync) work(ev timedEvent) {
var (
localPath = ev.Path()
remotePath = s.convertPath(localPath)
err error
localPath = ev.Path()
err error
)
switch ev.Event() {
case notify.Write, notify.Create:
Expand All @@ -150,61 +162,133 @@ func (s Sync) work(ev notify.EventInfo) {
flog.Info("unhandled event %v %+v", ev.Event(), ev.Path())
}

log := fmt.Sprintf("%v %v (%v)",
ev.Event(), filepath.Base(localPath), time.Since(ev.CreatedAt).Truncate(time.Millisecond*10),
)
if err != nil {
flog.Error("%v: %v -> %v: %v", ev.Event(), localPath, remotePath, err)
flog.Error(log+": %v", err)
} else {
flog.Success("%v: %v -> %v", ev.Event(), localPath, remotePath)
flog.Success(log)
}
}

func setConsoleTitle(title string) {
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return
var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it")

// workEventGroup converges a group of events to prevent duplicate work.
func (s Sync) workEventGroup(evs []timedEvent) {
cache := make(eventCache)
for _, ev := range evs {
cache.Add(ev)
}
fmt.Printf("\033]0;%s\007", title)
}

// maxinflightInotify sets the maximum number of inotifies before the sync just restarts.
// Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly
// with individual rsyncs.
const maxInflightInotify = 16
// We want to process events concurrently but safely for speed.
// Because the event cache prevents duplicate events for the same file, race conditions of that type
// are impossible.
// What is possible is a dependency on a previous Rename or Create. For example, if a directory is renamed
// and then a file is moved to it. AFAIK this dependecy only exists with Directories.
// So, we sequentially process the list of directory Renames and Creates, and then concurrently
// perform all Writes.
for _, ev := range cache.DirectoryEvents() {
s.work(ev)
}

var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it")
var wg sync.WaitGroup
for _, ev := range cache.FileEvents() {
setConsoleTitle(fmtUpdateTitle(ev.Path()))

func (s Sync) Run() error {
setConsoleTitle("⏳ syncing project")
err := s.initSync()
if err != nil {
return err
wg.Add(1)
ev := ev
go func() {
defer wg.Done()
s.work(ev)
}()
}

if s.Init {
return nil
}
wg.Wait()
}

// This queue is twice as large as the max in flight so we can check when it's full reliably.
events := make(chan notify.EventInfo, maxInflightInotify*2)
const (
// maxinflightInotify sets the maximum number of inotifies before the sync just restarts.
// Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly
// with individual rsyncs.
maxInflightInotify = 8
maxEventDelay = time.Second * 7
// maxAcceptableDispatch is the maximum amount of time before an event should begin its journey to the server.
// This sets a lower bound for perceivable latency, but the higher it is, the better the optimization.
maxAcceptableDispatch = time.Millisecond * 50
)

// Run starts the sync synchronously.
// Use this command to debug what wasn't sync'd correctly:
// rsync -e "coder sh" -nicr ~/Projects/cdr/coder-cli/. ammar:/home/coder/coder-cli/
func (s Sync) Run() error {
events := make(chan notify.EventInfo, maxInflightInotify)
// Set up a recursive watch.
err = notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All)
// We do this before the initial sync so we can capture any changes that may have happened during sync.
err := notify.Watch(path.Join(s.LocalDir, "..."), events, notify.All)
if err != nil {
return xerrors.Errorf("create watch: %w", err)
}
defer notify.Stop(events)

setConsoleTitle("⏳ syncing project")
err = s.initSync()
if err != nil {
return err
}

if s.Init {
return nil
}

const watchingFilesystemTitle = "🛰 watching filesystem"
setConsoleTitle(watchingFilesystemTitle)

flog.Info("watching %s for changes", s.LocalDir)
for ev := range events {
if len(events) > maxInflightInotify {
return ErrRestartSync

var droppedEvents uint64
// Timed events lets us track how long each individual file takes to update.
timedEvents := make(chan timedEvent, cap(events))
go func() {
defer close(timedEvents)
for event := range events {
select {
case timedEvents <- timedEvent{
CreatedAt: time.Now(),
EventInfo: event,
}:
default:
if atomic.AddUint64(&droppedEvents, 1) == 1 {
flog.Info("dropped event, sync should restart soon")
}
}
}
}()

setConsoleTitle("🚀 updating " + filepath.Base(ev.Path()))
s.work(ev)
var (
eventGroup []timedEvent
dispatchEventGroup = time.NewTicker(maxAcceptableDispatch)
)
defer dispatchEventGroup.Stop()
for {
const watchingFilesystemTitle = "🛰 watching filesystem"
setConsoleTitle(watchingFilesystemTitle)
}

return nil
select {
case ev := <-timedEvents:
if atomic.LoadUint64(&droppedEvents) > 0 {
return ErrRestartSync
}

eventGroup = append(eventGroup, ev)
case <-dispatchEventGroup.C:
if len(eventGroup) == 0 {
continue
}
// We're too backlogged and should restart the sync.
if time.Since(eventGroup[0].CreatedAt) > maxEventDelay {
return ErrRestartSync
}
s.workEventGroup(eventGroup)
eventGroup = eventGroup[:0]
}
}
}
21 changes: 21 additions & 0 deletions internal/sync/title.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sync

import (
"fmt"
"os"
"path/filepath"

"golang.org/x/crypto/ssh/terminal"
)

func setConsoleTitle(title string) {
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return
}
fmt.Printf("\033]0;%s\007", title)
}


func fmtUpdateTitle(path string) string {
return "🚀 updating " + filepath.Base(path)
}