4
4
"context"
5
5
"fmt"
6
6
"io"
7
+ "maps"
7
8
"math"
8
9
"strings"
9
10
"sync"
@@ -239,15 +240,17 @@ func (c *BasicCoordination) respLoop() {
239
240
defer func () {
240
241
cErr := c .Client .Close ()
241
242
if cErr != nil {
242
- c .logger .Debug (context .Background (), "failed to close coordinate client after respLoop exit" , slog .Error (cErr ))
243
+ c .logger .Debug (context .Background (),
244
+ "failed to close coordinate client after respLoop exit" , slog .Error (cErr ))
243
245
}
244
246
c .coordinatee .SetAllPeersLost ()
245
247
close (c .respLoopDone )
246
248
}()
247
249
for {
248
250
resp , err := c .Client .Recv ()
249
251
if err != nil {
250
- c .logger .Debug (context .Background (), "failed to read from protocol" , slog .Error (err ))
252
+ c .logger .Debug (context .Background (),
253
+ "failed to read from protocol" , slog .Error (err ))
251
254
c .SendErr (xerrors .Errorf ("read: %w" , err ))
252
255
return
253
256
}
@@ -278,7 +281,8 @@ func (c *BasicCoordination) respLoop() {
278
281
ReadyForHandshake : rfh ,
279
282
})
280
283
if err != nil {
281
- c .logger .Debug (context .Background (), "failed to send ready for handshake" , slog .Error (err ))
284
+ c .logger .Debug (context .Background (),
285
+ "failed to send ready for handshake" , slog .Error (err ))
282
286
c .SendErr (xerrors .Errorf ("send: %w" , err ))
283
287
return
284
288
}
@@ -287,37 +291,158 @@ func (c *BasicCoordination) respLoop() {
287
291
}
288
292
}
289
293
290
- type singleDestController struct {
294
+ type TunnelSrcCoordController struct {
291
295
* BasicCoordinationController
292
- dest uuid.UUID
296
+
297
+ mu sync.Mutex
298
+ dests map [uuid.UUID ]struct {}
299
+ coordination * BasicCoordination
293
300
}
294
301
295
- // NewSingleDestController creates a CoordinationController for Coder clients that connect to a
296
- // single tunnel destination, e.g. `coder ssh`, which connects to a single workspace Agent.
297
- func NewSingleDestController (logger slog.Logger , coordinatee Coordinatee , dest uuid.UUID ) CoordinationController {
298
- coordinatee .SetTunnelDestination (dest )
299
- return & singleDestController {
302
+ // NewTunnelSrcCoordController creates a CoordinationController for peers that are exclusively
303
+ // tunnel sources (that is, they create tunnel --- Coder clients not workspaces).
304
+ func NewTunnelSrcCoordController (
305
+ logger slog.Logger , coordinatee Coordinatee ,
306
+ ) * TunnelSrcCoordController {
307
+ return & TunnelSrcCoordController {
300
308
BasicCoordinationController : & BasicCoordinationController {
301
309
Logger : logger ,
302
310
Coordinatee : coordinatee ,
303
311
SendAcks : false ,
304
312
},
305
- dest : dest ,
313
+ dests : make ( map [uuid. UUID ] struct {}) ,
306
314
}
307
315
}
308
316
309
- func (c * singleDestController ) New (client CoordinatorClient ) CloserWaiter {
317
+ func (c * TunnelSrcCoordController ) New (client CoordinatorClient ) CloserWaiter {
318
+ c .mu .Lock ()
319
+ defer c .mu .Unlock ()
310
320
b := c .BasicCoordinationController .NewCoordination (client )
311
- err := client .Send (& proto.CoordinateRequest {AddTunnel : & proto.CoordinateRequest_Tunnel {Id : c .dest [:]}})
312
- if err != nil {
313
- b .SendErr (err )
321
+ c .coordination = b
322
+ // resync destinations on reconnect
323
+ for dest := range c .dests {
324
+ err := client .Send (& proto.CoordinateRequest {
325
+ AddTunnel : & proto.CoordinateRequest_Tunnel {Id : UUIDToByteSlice (dest )},
326
+ })
327
+ if err != nil {
328
+ b .SendErr (err )
329
+ c .coordination = nil
330
+ cErr := client .Close ()
331
+ if cErr != nil {
332
+ c .Logger .Debug (
333
+ context .Background (),
334
+ "failed to close coordinator client after add tunnel failure" ,
335
+ slog .Error (cErr ),
336
+ )
337
+ }
338
+ break
339
+ }
314
340
}
315
341
return b
316
342
}
317
343
344
+ func (c * TunnelSrcCoordController ) AddDestination (dest uuid.UUID ) {
345
+ c .mu .Lock ()
346
+ defer c .mu .Unlock ()
347
+ c .Coordinatee .SetTunnelDestination (dest ) // this prepares us for an ack
348
+ c .dests [dest ] = struct {}{}
349
+ if c .coordination == nil {
350
+ return
351
+ }
352
+ err := c .coordination .Client .Send (
353
+ & proto.CoordinateRequest {
354
+ AddTunnel : & proto.CoordinateRequest_Tunnel {Id : UUIDToByteSlice (dest )},
355
+ })
356
+ if err != nil {
357
+ c .coordination .SendErr (err )
358
+ cErr := c .coordination .Client .Close () // close the client so we don't gracefully disconnect
359
+ if cErr != nil {
360
+ c .Logger .Debug (context .Background (),
361
+ "failed to close coordinator client after add tunnel failure" ,
362
+ slog .Error (cErr ))
363
+ }
364
+ c .coordination = nil
365
+ }
366
+ }
367
+
368
+ func (c * TunnelSrcCoordController ) RemoveDestination (dest uuid.UUID ) {
369
+ c .mu .Lock ()
370
+ defer c .mu .Unlock ()
371
+ delete (c .dests , dest )
372
+ if c .coordination == nil {
373
+ return
374
+ }
375
+ err := c .coordination .Client .Send (
376
+ & proto.CoordinateRequest {
377
+ RemoveTunnel : & proto.CoordinateRequest_Tunnel {Id : UUIDToByteSlice (dest )},
378
+ })
379
+ if err != nil {
380
+ c .coordination .SendErr (err )
381
+ cErr := c .coordination .Client .Close () // close the client so we don't gracefully disconnect
382
+ if cErr != nil {
383
+ c .Logger .Debug (context .Background (),
384
+ "failed to close coordinator client after remove tunnel failure" ,
385
+ slog .Error (cErr ))
386
+ }
387
+ c .coordination = nil
388
+ }
389
+ }
390
+
391
+ func (c * TunnelSrcCoordController ) SyncDestinations (destinations []uuid.UUID ) {
392
+ c .mu .Lock ()
393
+ defer c .mu .Unlock ()
394
+ toAdd := make (map [uuid.UUID ]struct {})
395
+ toRemove := maps .Clone (c .dests )
396
+ all := make (map [uuid.UUID ]struct {})
397
+ for _ , dest := range destinations {
398
+ all [dest ] = struct {}{}
399
+ delete (toRemove , dest )
400
+ if _ , ok := c .dests [dest ]; ! ok {
401
+ toAdd [dest ] = struct {}{}
402
+ }
403
+ }
404
+ c .dests = all
405
+ if c .coordination == nil {
406
+ return
407
+ }
408
+ var err error
409
+ defer func () {
410
+ if err != nil {
411
+ c .coordination .SendErr (err )
412
+ cErr := c .coordination .Client .Close () // don't gracefully disconnect
413
+ if cErr != nil {
414
+ c .Logger .Debug (context .Background (),
415
+ "failed to close coordinator client during sync destinations" ,
416
+ slog .Error (cErr ))
417
+ }
418
+ c .coordination = nil
419
+ }
420
+ }()
421
+ for dest := range toAdd {
422
+ err = c .coordination .Client .Send (
423
+ & proto.CoordinateRequest {
424
+ AddTunnel : & proto.CoordinateRequest_Tunnel {Id : UUIDToByteSlice (dest )},
425
+ })
426
+ if err != nil {
427
+ return
428
+ }
429
+ }
430
+ for dest := range toRemove {
431
+ err = c .coordination .Client .Send (
432
+ & proto.CoordinateRequest {
433
+ RemoveTunnel : & proto.CoordinateRequest_Tunnel {Id : UUIDToByteSlice (dest )},
434
+ })
435
+ if err != nil {
436
+ return
437
+ }
438
+ }
439
+ }
440
+
318
441
// NewAgentCoordinationController creates a CoordinationController for Coder Agents, which never
319
442
// create tunnels and always send ReadyToHandshake acknowledgements.
320
- func NewAgentCoordinationController (logger slog.Logger , coordinatee Coordinatee ) CoordinationController {
443
+ func NewAgentCoordinationController (
444
+ logger slog.Logger , coordinatee Coordinatee ,
445
+ ) CoordinationController {
321
446
return & BasicCoordinationController {
322
447
Logger : logger ,
323
448
Coordinatee : coordinatee ,
0 commit comments