|
4 | 4 | "context"
|
5 | 5 | "fmt"
|
6 | 6 | "io"
|
| 7 | + "maps" |
7 | 8 | "math"
|
8 | 9 | "strings"
|
9 | 10 | "sync"
|
@@ -287,34 +288,139 @@ func (c *BasicCoordination) respLoop() {
|
287 | 288 | }
|
288 | 289 | }
|
289 | 290 |
|
290 |
| -type singleDestController struct { |
| 291 | +type TunnelSrcCoordController struct { |
291 | 292 | *BasicCoordinationController
|
292 |
| - dest uuid.UUID |
| 293 | + |
| 294 | + mu sync.Mutex |
| 295 | + dests map[uuid.UUID]struct{} |
| 296 | + coordination *BasicCoordination |
293 | 297 | }
|
294 | 298 |
|
295 |
| -// NewSingleDestController creates a CoordinationController for Coder clients that connect to a |
296 |
| -// single tunnel destination, e.g. `coder ssh`, which connects to a single workspace Agent. |
297 |
| -func NewSingleDestController(logger slog.Logger, coordinatee Coordinatee, dest uuid.UUID) CoordinationController { |
298 |
| - coordinatee.SetTunnelDestination(dest) |
299 |
| - return &singleDestController{ |
| 299 | +// NewTunnelSrcCoordController creates a CoordinationController for peers that are exclusively tunnel |
| 300 | +// sources (that is, they create tunnel --- Coder clients not workspaces). |
| 301 | +func NewTunnelSrcCoordController(logger slog.Logger, coordinatee Coordinatee) *TunnelSrcCoordController { |
| 302 | + return &TunnelSrcCoordController{ |
300 | 303 | BasicCoordinationController: &BasicCoordinationController{
|
301 | 304 | Logger: logger,
|
302 | 305 | Coordinatee: coordinatee,
|
303 | 306 | SendAcks: false,
|
304 | 307 | },
|
305 |
| - dest: dest, |
| 308 | + dests: make(map[uuid.UUID]struct{}), |
306 | 309 | }
|
307 | 310 | }
|
308 | 311 |
|
309 |
| -func (c *singleDestController) New(client CoordinatorClient) CloserWaiter { |
| 312 | +func (c *TunnelSrcCoordController) New(client CoordinatorClient) CloserWaiter { |
| 313 | + c.mu.Lock() |
| 314 | + defer c.mu.Unlock() |
310 | 315 | b := c.BasicCoordinationController.NewCoordination(client)
|
311 |
| - err := client.Send(&proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: c.dest[:]}}) |
312 |
| - if err != nil { |
313 |
| - b.SendErr(err) |
| 316 | + c.coordination = b |
| 317 | + // resync destinations on reconnect |
| 318 | + for dest := range c.dests { |
| 319 | + err := client.Send(&proto.CoordinateRequest{ |
| 320 | + AddTunnel: &proto.CoordinateRequest_Tunnel{Id: dest[:]}, |
| 321 | + }) |
| 322 | + if err != nil { |
| 323 | + b.SendErr(err) |
| 324 | + c.coordination = nil |
| 325 | + cErr := client.Close() |
| 326 | + if cErr != nil { |
| 327 | + c.Logger.Debug(context.Background(), "failed to close coordinator client after add tunnel failure", slog.Error(cErr)) |
| 328 | + } |
| 329 | + break |
| 330 | + } |
314 | 331 | }
|
315 | 332 | return b
|
316 | 333 | }
|
317 | 334 |
|
| 335 | +func (c *TunnelSrcCoordController) AddDestination(dest uuid.UUID) { |
| 336 | + c.mu.Lock() |
| 337 | + defer c.mu.Unlock() |
| 338 | + c.Coordinatee.SetTunnelDestination(dest) // this prepares us for an ack |
| 339 | + c.dests[dest] = struct{}{} |
| 340 | + if c.coordination == nil { |
| 341 | + return |
| 342 | + } |
| 343 | + err := c.coordination.Client.Send( |
| 344 | + &proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: dest[:]}}) |
| 345 | + if err != nil { |
| 346 | + c.coordination.SendErr(err) |
| 347 | + cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect |
| 348 | + if cErr != nil { |
| 349 | + c.Logger.Debug(context.Background(), |
| 350 | + "failed to close coordinator client after add tunnel failure", |
| 351 | + slog.Error(cErr)) |
| 352 | + } |
| 353 | + c.coordination = nil |
| 354 | + } |
| 355 | +} |
| 356 | + |
| 357 | +func (c *TunnelSrcCoordController) RemoveDestination(dest uuid.UUID) { |
| 358 | + c.mu.Lock() |
| 359 | + defer c.mu.Unlock() |
| 360 | + delete(c.dests, dest) |
| 361 | + if c.coordination == nil { |
| 362 | + return |
| 363 | + } |
| 364 | + err := c.coordination.Client.Send( |
| 365 | + &proto.CoordinateRequest{RemoveTunnel: &proto.CoordinateRequest_Tunnel{Id: dest[:]}}) |
| 366 | + if err != nil { |
| 367 | + c.coordination.SendErr(err) |
| 368 | + cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect |
| 369 | + if cErr != nil { |
| 370 | + c.Logger.Debug(context.Background(), |
| 371 | + "failed to close coordinator client after remove tunnel failure", |
| 372 | + slog.Error(cErr)) |
| 373 | + } |
| 374 | + c.coordination = nil |
| 375 | + } |
| 376 | +} |
| 377 | + |
| 378 | +func (c *TunnelSrcCoordController) SyncDestinations(destinations []uuid.UUID) { |
| 379 | + c.mu.Lock() |
| 380 | + defer c.mu.Unlock() |
| 381 | + toAdd := make(map[uuid.UUID]struct{}) |
| 382 | + toRemove := maps.Clone(c.dests) |
| 383 | + all := make(map[uuid.UUID]struct{}) |
| 384 | + for _, dest := range destinations { |
| 385 | + all[dest] = struct{}{} |
| 386 | + delete(toRemove, dest) |
| 387 | + if _, ok := c.dests[dest]; !ok { |
| 388 | + toAdd[dest] = struct{}{} |
| 389 | + } |
| 390 | + } |
| 391 | + c.dests = all |
| 392 | + if c.coordination == nil { |
| 393 | + return |
| 394 | + } |
| 395 | + var err error |
| 396 | + defer func() { |
| 397 | + if err != nil { |
| 398 | + c.coordination.SendErr(err) |
| 399 | + cErr := c.coordination.Client.Close() // close the client so we don't gracefully disconnect |
| 400 | + if cErr != nil { |
| 401 | + c.Logger.Debug(context.Background(), |
| 402 | + "failed to close coordinator client during sync destinations", |
| 403 | + slog.Error(cErr)) |
| 404 | + } |
| 405 | + c.coordination = nil |
| 406 | + } |
| 407 | + }() |
| 408 | + for dest := range toAdd { |
| 409 | + err = c.coordination.Client.Send( |
| 410 | + &proto.CoordinateRequest{AddTunnel: &proto.CoordinateRequest_Tunnel{Id: dest[:]}}) |
| 411 | + if err != nil { |
| 412 | + return |
| 413 | + } |
| 414 | + } |
| 415 | + for dest := range toRemove { |
| 416 | + err = c.coordination.Client.Send( |
| 417 | + &proto.CoordinateRequest{RemoveTunnel: &proto.CoordinateRequest_Tunnel{Id: dest[:]}}) |
| 418 | + if err != nil { |
| 419 | + return |
| 420 | + } |
| 421 | + } |
| 422 | +} |
| 423 | + |
318 | 424 | // NewAgentCoordinationController creates a CoordinationController for Coder Agents, which never
|
319 | 425 | // create tunnels and always send ReadyToHandshake acknowledgements.
|
320 | 426 | func NewAgentCoordinationController(logger slog.Logger, coordinatee Coordinatee) CoordinationController {
|
|
0 commit comments