@@ -24,7 +24,10 @@ const (
24
24
// dropping updates
25
25
ResponseBufferSize = 512
26
26
// RequestBufferSize is the max number of requests to buffer per connection
27
- RequestBufferSize = 32
27
+ RequestBufferSize = 32
28
+ CloseErrOverwritten = "peer ID overwritten by new connection"
29
+ CloseErrCoordinatorClose = "coordinator closed"
30
+ ReadyForHandshakeError = "ready for handshake error"
28
31
)
29
32
30
33
// Coordinator exchanges nodes with agents to establish connections.
@@ -97,6 +100,18 @@ var (
97
100
ErrAlreadyRemoved = xerrors .New ("already removed" )
98
101
)
99
102
103
+ type AuthorizationError struct {
104
+ Wrapped error
105
+ }
106
+
107
+ func (e AuthorizationError ) Error () string {
108
+ return fmt .Sprintf ("authorization: %s" , e .Wrapped .Error ())
109
+ }
110
+
111
+ func (e AuthorizationError ) Unwrap () error {
112
+ return e .Wrapped
113
+ }
114
+
100
115
// NewCoordinator constructs a new in-memory connection coordinator. This
101
116
// coordinator is incompatible with multiple Coder replicas as all node data is
102
117
// in-memory.
@@ -161,8 +176,12 @@ func (c *coordinator) Coordinate(
161
176
c .wg .Add (1 )
162
177
go func () {
163
178
defer c .wg .Done ()
164
- p .reqLoop (ctx , logger , c .core .handleRequest )
165
- err := c .core .lostPeer (p )
179
+ loopErr := p .reqLoop (ctx , logger , c .core .handleRequest )
180
+ closeErrStr := ""
181
+ if loopErr != nil {
182
+ closeErrStr = loopErr .Error ()
183
+ }
184
+ err := c .core .lostPeer (p , closeErrStr )
166
185
if xerrors .Is (err , ErrClosed ) || xerrors .Is (err , ErrAlreadyRemoved ) {
167
186
return
168
187
}
@@ -227,7 +246,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
227
246
}
228
247
229
248
if err := pr .auth .Authorize (ctx , req ); err != nil {
230
- return xerrors . Errorf ( "authorize request: %w" , err )
249
+ return AuthorizationError { Wrapped : err }
231
250
}
232
251
233
252
if req .UpdateSelf != nil {
@@ -270,7 +289,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
270
289
}
271
290
}
272
291
if req .Disconnect != nil {
273
- c .removePeerLocked (p .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "graceful disconnect" )
292
+ c .removePeerLocked (p .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "graceful disconnect" , "" )
274
293
}
275
294
if rfhs := req .ReadyForHandshake ; rfhs != nil {
276
295
err := c .handleReadyForHandshakeLocked (pr , rfhs )
@@ -298,7 +317,7 @@ func (c *core) handleReadyForHandshakeLocked(src *peer, rfhs []*proto.Coordinate
298
317
// don't want to kill its connection.
299
318
select {
300
319
case src .resps <- & proto.CoordinateResponse {
301
- Error : fmt .Sprintf ("you do not share a tunnel with %q" , dstID .String ()),
320
+ Error : fmt .Sprintf ("%s: you do not share a tunnel with %q" , ReadyForHandshakeError , dstID .String ()),
302
321
}:
303
322
default :
304
323
return ErrWouldBlock
@@ -344,7 +363,7 @@ func (c *core) updateTunnelPeersLocked(id uuid.UUID, n *proto.Node, k proto.Coor
344
363
err := other .updateMappingLocked (id , n , k , reason )
345
364
if err != nil {
346
365
other .logger .Error (context .Background (), "failed to update mapping" , slog .Error (err ))
347
- c .removePeerLocked (other .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "failed update" )
366
+ c .removePeerLocked (other .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "failed update" , "failed to update tunnel peer mapping" )
348
367
}
349
368
}
350
369
}
@@ -360,7 +379,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
360
379
err := src .updateMappingLocked (dstID , dst .node , proto .CoordinateResponse_PeerUpdate_NODE , "add tunnel" )
361
380
if err != nil {
362
381
src .logger .Error (context .Background (), "failed update of tunnel src" , slog .Error (err ))
363
- c .removePeerLocked (src .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "failed update" )
382
+ c .removePeerLocked (src .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "failed update" ,
383
+ "failed to update tunnel dest mapping" )
364
384
// if the source fails, then the tunnel is also removed and there is no reason to continue
365
385
// processing.
366
386
return err
@@ -370,7 +390,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
370
390
err := dst .updateMappingLocked (src .id , src .node , proto .CoordinateResponse_PeerUpdate_NODE , "add tunnel" )
371
391
if err != nil {
372
392
dst .logger .Error (context .Background (), "failed update of tunnel dst" , slog .Error (err ))
373
- c .removePeerLocked (dst .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "failed update" )
393
+ c .removePeerLocked (dst .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "failed update" ,
394
+ "failed to update tunnel src mapping" )
374
395
}
375
396
}
376
397
}
@@ -381,7 +402,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
381
402
err := src .updateMappingLocked (dstID , nil , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "remove tunnel" )
382
403
if err != nil {
383
404
src .logger .Error (context .Background (), "failed to update" , slog .Error (err ))
384
- c .removePeerLocked (src .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "failed update" )
405
+ c .removePeerLocked (src .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "failed update" , "failed to remove tunnel dest mapping" )
385
406
// removing the peer also removes all other tunnels and notifies destinations, so it's safe to
386
407
// return here.
387
408
return err
@@ -391,7 +412,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
391
412
err = dst .updateMappingLocked (src .id , nil , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "remove tunnel" )
392
413
if err != nil {
393
414
dst .logger .Error (context .Background (), "failed to update" , slog .Error (err ))
394
- c .removePeerLocked (dst .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "failed update" )
415
+ c .removePeerLocked (dst .id , proto .CoordinateResponse_PeerUpdate_DISCONNECTED , "failed update" , "failed to remove tunnel src mapping" )
395
416
// don't return here because we still want to remove the tunnel, and an error at the
396
417
// destination doesn't count as an error removing the tunnel at the source.
397
418
}
@@ -413,6 +434,11 @@ func (c *core) initPeer(p *peer) error {
413
434
if old , ok := c .peers [p .id ]; ok {
414
435
// rare and interesting enough to log at Info, but it isn't an error per se
415
436
old .logger .Info (context .Background (), "overwritten by new connection" )
437
+ select {
438
+ case old .resps <- & proto.CoordinateResponse {Error : CloseErrOverwritten }:
439
+ default :
440
+ // pass
441
+ }
416
442
close (old .resps )
417
443
p .overwrites = old .overwrites + 1
418
444
}
@@ -433,7 +459,7 @@ func (c *core) initPeer(p *peer) error {
433
459
434
460
// removePeer removes and cleans up a lost peer. It updates all peers it shares a tunnel with, deletes
435
461
// all tunnels from which the removed peer is the source.
436
- func (c * core ) lostPeer (p * peer ) error {
462
+ func (c * core ) lostPeer (p * peer , closeErr string ) error {
437
463
c .mutex .Lock ()
438
464
defer c .mutex .Unlock ()
439
465
c .logger .Debug (context .Background (), "lostPeer" , slog .F ("peer_id" , p .id ))
@@ -443,18 +469,25 @@ func (c *core) lostPeer(p *peer) error {
443
469
if existing , ok := c .peers [p .id ]; ! ok || existing != p {
444
470
return ErrAlreadyRemoved
445
471
}
446
- c .removePeerLocked (p .id , proto .CoordinateResponse_PeerUpdate_LOST , "lost" )
472
+ c .removePeerLocked (p .id , proto .CoordinateResponse_PeerUpdate_LOST , "lost" , closeErr )
447
473
return nil
448
474
}
449
475
450
- func (c * core ) removePeerLocked (id uuid.UUID , kind proto.CoordinateResponse_PeerUpdate_Kind , reason string ) {
476
+ func (c * core ) removePeerLocked (id uuid.UUID , kind proto.CoordinateResponse_PeerUpdate_Kind , reason , closeErr string ) {
451
477
p , ok := c .peers [id ]
452
478
if ! ok {
453
479
c .logger .Critical (context .Background (), "removed non-existent peer %s" , id )
454
480
return
455
481
}
456
482
c .updateTunnelPeersLocked (id , nil , kind , reason )
457
483
c .tunnels .removeAll (id )
484
+ if closeErr != "" {
485
+ select {
486
+ case p .resps <- & proto.CoordinateResponse {Error : closeErr }:
487
+ default :
488
+ // blocked, pass.
489
+ }
490
+ }
458
491
close (p .resps )
459
492
delete (c .peers , id )
460
493
}
@@ -487,7 +520,8 @@ func (c *core) close() error {
487
520
for id := range c .peers {
488
521
// when closing, mark them as LOST so that we don't disrupt in-progress
489
522
// connections.
490
- c .removePeerLocked (id , proto .CoordinateResponse_PeerUpdate_LOST , "coordinator close" )
523
+ c .removePeerLocked (id , proto .CoordinateResponse_PeerUpdate_LOST , "coordinator close" ,
524
+ CloseErrCoordinatorClose )
491
525
}
492
526
return nil
493
527
}
0 commit comments