Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Speed up event processing #8

Merged
merged 12 commits into from
Apr 24, 2020
Prev Previous commit
Next Next commit
Improve dropped event calculation
  • Loading branch information
ammario committed Apr 24, 2020
commit c305b9dbca5934d38f2b1fa4e327c7ff7fa76be1
15 changes: 11 additions & 4 deletions internal/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -84,7 +85,7 @@ func (s Sync) initSync() error {

start := time.Now()
// Delete old files on initial sync (e.g git checkout).
err := s.syncPaths(true, s.LocalDir + "/.", s.RemoteDir)
err := s.syncPaths(true, s.LocalDir+"/.", s.RemoteDir)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't hurt to comment on why this is necessary

if err == nil {
flog.Info("finished initial sync (%v)", time.Since(start).Truncate(time.Millisecond))
}
Expand Down Expand Up @@ -167,7 +168,6 @@ func (s Sync) work(ev timedEvent) {
}
}


var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it")

// workEventGroup converges a group of events to prevent duplicate work.
Expand Down Expand Up @@ -239,14 +239,21 @@ func (s Sync) Run() error {

flog.Info("watching %s for changes", s.LocalDir)

var droppedEvents uint64
// Timed events lets us track how long each individual file takes to update.
timedEvents := make(chan timedEvent, cap(events))
go func() {
defer close(timedEvents)
for event := range events {
timedEvents <- timedEvent{
select {
case timedEvents <- timedEvent{
CreatedAt: time.Now(),
EventInfo: event,
}:
default:
if atomic.AddUint64(&droppedEvents, 1) == 1 {
flog.Info("dropped event, sync should restart soon")
}
}
}
}()
Expand All @@ -262,7 +269,7 @@ func (s Sync) Run() error {

select {
case ev := <-timedEvents:
if len(events) > maxInflightInotify {
if atomic.LoadUint64(&droppedEvents) > 0 {
return ErrRestartSync
}

Expand Down