@@ -9,12 +9,14 @@ import (
9
9
"os/exec"
10
10
"path"
11
11
"path/filepath"
12
+ "sync"
13
+ "sync/atomic"
12
14
"time"
13
15
14
16
"github.com/gorilla/websocket"
15
17
"github.com/rjeczalik/notify"
16
18
"go.coder.com/flog"
17
- "golang.org/x/crypto/ssh/terminal "
19
+ "golang.org/x/sync/semaphore "
18
20
"golang.org/x/xerrors"
19
21
20
22
"cdr.dev/coder-cli/internal/entclient"
@@ -24,8 +26,10 @@ import (
24
26
// Sync runs a live sync daemon.
25
27
type Sync struct {
26
28
// Init sets whether the sync will do the initial init and then return fast.
27
- Init bool
28
- LocalDir string
29
+ Init bool
30
+ // LocalDir is an absolute path.
31
+ LocalDir string
32
+ // RemoteDir is an absolute path.
29
33
RemoteDir string
30
34
entclient.Environment
31
35
* entclient.Client
@@ -35,13 +39,17 @@ func (s Sync) syncPaths(delete bool, local, remote string) error {
35
39
self := os .Args [0 ]
36
40
37
41
args := []string {"-zz" ,
38
- "-a" , "--progress" ,
42
+ "-a" ,
39
43
"--delete" ,
40
44
"-e" , self + " sh" , local , s .Environment .Name + ":" + remote ,
41
45
}
42
46
if delete {
43
47
args = append ([]string {"--delete" }, args ... )
44
48
}
49
+ if os .Getenv ("DEBUG_RSYNC" ) != "" {
50
+ args = append ([]string {"--progress" }, args ... )
51
+ }
52
+
45
53
// See https://unix.stackexchange.com/questions/188737/does-compression-option-z-with-rsync-speed-up-backup
46
54
// on compression level.
47
55
// (AB): compression sped up the initial sync of the enterprise repo by 30%, leading me to believe it's
@@ -82,9 +90,11 @@ func (s Sync) initSync() error {
82
90
83
91
start := time .Now ()
84
92
// Delete old files on initial sync (e.g git checkout).
85
- err := s .syncPaths (true , s .LocalDir , s .RemoteDir )
93
+ // Add the "/." to the local directory so rsync doesn't try to place the directory
94
+ // into the remote dir.
95
+ err := s .syncPaths (true , s .LocalDir + "/." , s .RemoteDir )
86
96
if err == nil {
87
- flog .Info ("finished initial sync (%v)" , time .Since (start ).Truncate (time .Millisecond ))
97
+ flog .Success ("finished initial sync (%v)" , time .Since (start ).Truncate (time .Millisecond ))
88
98
}
89
99
return err
90
100
}
@@ -104,6 +114,12 @@ func (s Sync) handleCreate(localPath string) error {
104
114
target := s .convertPath (localPath )
105
115
err := s .syncPaths (false , localPath , target )
106
116
if err != nil {
117
+ _ , statErr := os .Stat (localPath )
118
+ // File was quickly deleted.
119
+ if os .IsNotExist (statErr ) {
120
+ return nil
121
+ }
122
+
107
123
return err
108
124
}
109
125
return nil
@@ -133,11 +149,10 @@ func (s Sync) handleRename(localPath string) error {
133
149
return s .handleCreate (localPath )
134
150
}
135
151
136
- func (s Sync ) work (ev notify. EventInfo ) {
152
+ func (s Sync ) work (ev timedEvent ) {
137
153
var (
138
- localPath = ev .Path ()
139
- remotePath = s .convertPath (localPath )
140
- err error
154
+ localPath = ev .Path ()
155
+ err error
141
156
)
142
157
switch ev .Event () {
143
158
case notify .Write , notify .Create :
@@ -150,61 +165,137 @@ func (s Sync) work(ev notify.EventInfo) {
150
165
flog .Info ("unhandled event %v %+v" , ev .Event (), ev .Path ())
151
166
}
152
167
168
+ log := fmt .Sprintf ("%v %v (%v)" ,
169
+ ev .Event (), filepath .Base (localPath ), time .Since (ev .CreatedAt ).Truncate (time .Millisecond * 10 ),
170
+ )
153
171
if err != nil {
154
- flog .Error ("%v : %v -> %v: %v" , ev . Event (), localPath , remotePath , err )
172
+ flog .Error (log + " : %v" , err )
155
173
} else {
156
- flog .Success ("%v: %v -> %v" , ev . Event (), localPath , remotePath )
174
+ flog .Success (log )
157
175
}
158
176
}
159
177
160
- func setConsoleTitle (title string ) {
161
- if ! terminal .IsTerminal (int (os .Stdout .Fd ())) {
162
- return
178
// ErrRestartSync is returned by Sync.Run when the live sync could not keep up
// with filesystem activity: either an event had to be dropped because the
// internal queue was full, or the oldest queued event waited longer than
// maxEventDelay. Callers are expected to start the sync again.
var ErrRestartSync = errors.New("the sync exited because it was overloaded, restart it")
179
+
180
+ // workEventGroup converges a group of events to prevent duplicate work.
181
+ func (s Sync ) workEventGroup (evs []timedEvent ) {
182
+ cache := make (eventCache )
183
+ for _ , ev := range evs {
184
+ cache .Add (ev )
185
+ }
186
+
187
+ // We want to process events concurrently but safely for speed.
188
+ // Because the event cache prevents duplicate events for the same file, race conditions of that type
189
+ // are impossible.
190
+ // What is possible is a dependency on a previous Rename or Create. For example, if a directory is renamed
191
+ // and then a file is moved to it. AFAIK this dependecy only exists with Directories.
192
+ // So, we sequentially process the list of directory Renames and Creates, and then concurrently
193
+ // perform all Writes.
194
+ for _ , ev := range cache .SequentialEvents () {
195
+ s .work (ev )
163
196
}
164
- fmt .Printf ("\033 ]0;%s\007 " , title )
165
- }
166
197
167
- // maxinflightInotify sets the maximum number of inotifies before the sync just restarts.
168
- // Syncing a large amount of small files (e.g .git or node_modules) is impossible to do performantly
169
- // with individual rsyncs.
170
- const maxInflightInotify = 16
198
+ sem := semaphore .NewWeighted (8 )
171
199
172
- var ErrRestartSync = errors .New ("the sync exited because it was overloaded, restart it" )
200
+ var wg sync.WaitGroup
201
+ for _ , ev := range cache .ConcurrentEvents () {
202
+ setConsoleTitle (fmtUpdateTitle (ev .Path ()))
173
203
174
- func (s Sync ) Run () error {
175
- setConsoleTitle ("⏳ syncing project" )
176
- err := s .initSync ()
177
- if err != nil {
178
- return err
204
+ wg .Add (1 )
205
+ sem .Acquire (context .Background (), 1 )
206
+ ev := ev
207
+ go func () {
208
+ defer sem .Release (1 )
209
+ defer wg .Done ()
210
+ s .work (ev )
211
+ }()
179
212
}
180
213
181
- if s .Init {
182
- return nil
183
- }
214
+ wg .Wait ()
215
+ }
184
216
185
- // This queue is twice as large as the max in flight so we can check when it's full reliably.
186
- events := make (chan notify.EventInfo , maxInflightInotify * 2 )
217
const (
	// maxInflightInotify is the buffer size of the notify event channel that
	// Run creates. When the queue behind it fills up, events are dropped and
	// the sync asks to be restarted instead of trying to catch up: syncing a
	// large number of small files (e.g. .git or node_modules) cannot be done
	// performantly with individual rsyncs.
	maxInflightInotify = 8

	// maxEventDelay is how old the oldest pending event may be at dispatch
	// time before the sync is considered too backlogged and is restarted.
	maxEventDelay = 7 * time.Second

	// maxAcceptableDispatch is the longest an event should wait before it
	// begins its journey to the server. It sets a lower bound on perceivable
	// latency, but the higher it is, the more events can be grouped together
	// and optimized.
	maxAcceptableDispatch = 50 * time.Millisecond
)
227
+
228
+ // Run starts the sync synchronously.
229
+ // Use this command to debug what wasn't sync'd correctly:
230
+ // rsync -e "coder sh" -nicr ~/Projects/cdr/coder-cli/. ammar:/home/coder/coder-cli/
231
+ func (s Sync ) Run () error {
232
+ events := make (chan notify.EventInfo , maxInflightInotify )
187
233
// Set up a recursive watch.
188
- err = notify .Watch (path .Join (s .LocalDir , "..." ), events , notify .All )
234
+ // We do this before the initial sync so we can capture any changes that may have happened during sync.
235
+ err := notify .Watch (path .Join (s .LocalDir , "..." ), events , notify .All )
189
236
if err != nil {
190
237
return xerrors .Errorf ("create watch: %w" , err )
191
238
}
192
239
defer notify .Stop (events )
193
240
241
+ setConsoleTitle ("⏳ syncing project" )
242
+ err = s .initSync ()
243
+ if err != nil {
244
+ return err
245
+ }
246
+
247
+ if s .Init {
248
+ return nil
249
+ }
194
250
195
- const watchingFilesystemTitle = "🛰 watching filesystem"
196
- setConsoleTitle (watchingFilesystemTitle )
197
251
198
252
flog .Info ("watching %s for changes" , s .LocalDir )
199
- for ev := range events {
200
- if len (events ) > maxInflightInotify {
201
- return ErrRestartSync
253
+
254
+ var droppedEvents uint64
255
+ // Timed events lets us track how long each individual file takes to update.
256
+ timedEvents := make (chan timedEvent , cap (events ))
257
+ go func () {
258
+ defer close (timedEvents )
259
+ for event := range events {
260
+ select {
261
+ case timedEvents <- timedEvent {
262
+ CreatedAt : time .Now (),
263
+ EventInfo : event ,
264
+ }:
265
+ default :
266
+ if atomic .AddUint64 (& droppedEvents , 1 ) == 1 {
267
+ flog .Info ("dropped event, sync should restart soon" )
268
+ }
269
+ }
202
270
}
271
+ }()
203
272
204
- setConsoleTitle ("🚀 updating " + filepath .Base (ev .Path ()))
205
- s .work (ev )
273
+ var (
274
+ eventGroup []timedEvent
275
+ dispatchEventGroup = time .NewTicker (maxAcceptableDispatch )
276
+ )
277
+ defer dispatchEventGroup .Stop ()
278
+ for {
279
+ const watchingFilesystemTitle = "🛰 watching filesystem"
206
280
setConsoleTitle (watchingFilesystemTitle )
207
- }
208
281
209
- return nil
282
+ select {
283
+ case ev := <- timedEvents :
284
+ if atomic .LoadUint64 (& droppedEvents ) > 0 {
285
+ return ErrRestartSync
286
+ }
287
+
288
+ eventGroup = append (eventGroup , ev )
289
+ case <- dispatchEventGroup .C :
290
+ if len (eventGroup ) == 0 {
291
+ continue
292
+ }
293
+ // We're too backlogged and should restart the sync.
294
+ if time .Since (eventGroup [0 ].CreatedAt ) > maxEventDelay {
295
+ return ErrRestartSync
296
+ }
297
+ s .workEventGroup (eventGroup )
298
+ eventGroup = eventGroup [:0 ]
299
+ }
300
+ }
210
301
}
0 commit comments