Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Speed up event processing #8

Merged
merged 12 commits into from
Apr 24, 2020
Prev Previous commit
Next Next commit
Process safe events concurrently
  • Loading branch information
ammario committed Apr 24, 2020
commit 22887355bc4dfae880a4596348c74ab5e12b9764
72 changes: 72 additions & 0 deletions internal/sync/eventcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package sync

import (
"os"
"time"

"github.com/rjeczalik/notify"
"go.coder.com/flog"
)

// timedEvent pairs a filesystem notification with a timestamp.
type timedEvent struct {
// CreatedAt is the timestamp attached to the event.
// NOTE(review): presumably set when the event is received — confirm against the code that builds timedEvent.
CreatedAt time.Time
// EventInfo is embedded so that Path() and Event() can be called directly on a timedEvent.
notify.EventInfo
}

// eventCache holds at most one timedEvent per path; Add stores each event under its ev.Path() key.
type eventCache map[string]timedEvent

// Add records ev in the cache under its path, so that only the most recent
// event per path is retained.
//
// If the event already cached for that path is a Create and ev is a Remove,
// the cached entry is deleted and ev is dropped, leaving no event for the path.
// In every other case ev overwrites whatever was cached for the path.
func (cache eventCache) Add(ev timedEvent) {
// Each call builds its own logger so messages are prefixed with the event's path.
log := flog.New()
log.Prefix = ev.Path() + ": "
lastEvent, ok := cache[ev.Path()]
if ok {
switch {
// If the file was quickly created and then destroyed, pretend nothing ever happened.
case lastEvent.Event() == notify.Create && ev.Event() == notify.Remove:
delete(cache, ev.Path())
log.Info("ignored Create then Remove")
return
}
}
// Reached only when the Create-then-Remove case above did not return:
// an earlier event for this path is about to be superseded, so log it.
if ok {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be merged with the if statement above

log.Info("ignored duplicate event (%s replaced by %s)", lastEvent.Event(), ev.Event())
}
// Only let the latest event for a path have action.
cache[ev.Path()] = ev
}

// DirectoryEvents returns the cached events that pertain to directories,
// plus the events whose path can no longer be stat'ed (for example because
// the path was removed or renamed away). For the latter it is impossible to
// tell whether the path used to be a file or a directory, and dropping them
// would mean such events are never acted upon, so they are reported here.
//
// The set of returned events is disjoint with FileEvents, which only reports
// paths that still exist and are not directories.
// The order of the returned slice is unspecified (map iteration order).
func (cache eventCache) DirectoryEvents() []timedEvent {
	var r []timedEvent
	for _, ev := range cache {
		info, err := os.Stat(ev.Path())
		if err == nil && !info.IsDir() {
			// An existing non-directory path belongs to FileEvents.
			continue
		}
		// Either a directory, or a path that could not be stat'ed.
		r = append(r, ev)
	}
	return r
}

// FileEvents collects the cached events whose path currently exists and is
// not a directory. Events whose path cannot be stat'ed are left out.
// The set of returned events is disjoint with DirectoryEvents.
// The order of the returned slice is unspecified (map iteration order).
func (cache eventCache) FileEvents() []timedEvent {
	var files []timedEvent
	for _, cached := range cache {
		stat, statErr := os.Stat(cached.Path())
		if statErr == nil && !stat.IsDir() {
			files = append(files, cached)
		}
	}
	return files
}
62 changes: 31 additions & 31 deletions internal/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"os/exec"
"path"
"path/filepath"
"sync"
"time"

"github.com/gorilla/websocket"
"github.com/rjeczalik/notify"
"go.coder.com/flog"
"golang.org/x/crypto/ssh/terminal"
"golang.org/x/xerrors"

"cdr.dev/coder-cli/internal/entclient"
Expand Down Expand Up @@ -167,51 +167,51 @@ func (s Sync) work(ev timedEvent) {
}
}

func setConsoleTitle(title string) {
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return
}
fmt.Printf("\033]0;%s\007", title)
}


var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it")

// workEventGroup converges a group of events to prevent duplicate work.
func (s Sync) workEventGroup(evs []timedEvent) {
cache := map[string]timedEvent{}
cache := make(eventCache)
for _, ev := range evs {
log := flog.New()
log.Prefix = ev.Path() + ": "
lastEvent, ok := cache[ev.Path()]
if ok {
switch {
// If the file was quickly created and then destroyed, pretend nothing ever happened.
case lastEvent.Event() == notify.Create && ev.Event() == notify.Remove:
delete(cache, ev.Path())
log.Info("ignored Create then Remove")
continue
}
}
if ok {
log.Info("ignored duplicate event (%s replaced by %s)", lastEvent.Event(), ev.Event())
}
// Only let the latest event for a path have action.
cache[ev.Path()] = ev
cache.Add(ev)
}
for _, ev := range cache {
setConsoleTitle("🚀 updating " + filepath.Base(ev.Path()))

// We want to process events concurrently but safely for speed.
// Because the event cache prevents duplicate events for the same file, race conditions of that type
// are impossible.
// What is possible is a dependency on a previous Rename or Create. For example, if a directory is renamed
// and then a file is moved to it. AFAIK this dependency only exists with Directories.
// So, we sequentially process the list of directory Renames and Creates, and then concurrently
// perform all Writes.
for _, ev := range cache.DirectoryEvents() {
s.work(ev)
}
}

var wg sync.WaitGroup
for _, ev := range cache.FileEvents() {
setConsoleTitle(fmtUpdateTitle(ev.Path()))

wg.Add(1)
ev := ev
go func() {
defer wg.Done()
s.work(ev)
}()
}

wg.Wait()
}

const (
// maxInflightInotify sets the maximum number of inotifies before the sync just restarts.
// Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly
// with individual rsyncs.
maxInflightInotify = 8
maxEventDelay = time.Second * 7
maxEventDelay = time.Second * 7
// maxAcceptableDispatch is the maximum amount of time before an event should begin its journey to the server.
// This sets a lower bound for perceivable latency, but the higher it is, the better the optimization.
maxAcceptableDispatch = time.Millisecond * 50
)

func (s Sync) Run() error {
Expand Down Expand Up @@ -250,7 +250,7 @@ func (s Sync) Run() error {

var (
eventGroup []timedEvent
dispatchEventGroup = time.NewTicker(time.Millisecond * 10)
dispatchEventGroup = time.NewTicker(maxAcceptableDispatch)
)
defer dispatchEventGroup.Stop()
for {
Expand Down
12 changes: 0 additions & 12 deletions internal/sync/timedevent.go

This file was deleted.

21 changes: 21 additions & 0 deletions internal/sync/title.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sync

import (
"fmt"
"os"
"path/filepath"

"golang.org/x/crypto/ssh/terminal"
)

// setConsoleTitle writes an OSC 0 escape sequence ("\033]0;<title>\007") to
// standard output so that the terminal shows title as its window title.
// Nothing is written when standard output is not a terminal.
func setConsoleTitle(title string) {
	stdoutFd := int(os.Stdout.Fd())
	if terminal.IsTerminal(stdoutFd) {
		fmt.Printf("\033]0;%s\007", title)
	}
}


// fmtUpdateTitle builds the console title shown while path is being synced:
// a fixed "updating" prefix followed by the last element of path.
func fmtUpdateTitle(path string) string {
	const prefix = "🚀 updating "
	base := filepath.Base(path)
	return prefix + base
}