
Commit 345435a

feat: modify coordinators to send errors and peers to log them (coder#17467)
Adds support to our coordinator implementations to send Error updates before disconnecting clients. I was recently debugging a connection issue where the client was getting repeatedly disconnected from the Coordinator, but since we never send any error information, it was very hard to diagnose without server logs. This PR corrects that by sending a CoordinateResponse with `Error` set whenever we disconnect a client that didn't ask to be disconnected. It also logs the error in the client controller whenever we receive one.
1 parent ea017a1 · commit 345435a

12 files changed: +135 -51 lines
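To make the intended client-side behavior concrete, here is a minimal, self-contained sketch of how a controller can surface the new `Error` field. The names `coordinateResponse` and `readyForHandshakeError` are simplified stand-ins for `proto.CoordinateResponse` and `tailnet.ReadyForHandshakeError`, not the real implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// coordinateResponse is a simplified stand-in for proto.CoordinateResponse.
type coordinateResponse struct {
	Error string
}

// readyForHandshakeError mirrors the ReadyForHandshakeError prefix constant
// added in tailnet/coordinator.go.
const readyForHandshakeError = "ready for handshake error"

// logCoordinationError mirrors the policy added to BasicCoordination.respLoop:
// a ready-for-handshake race is only worth a warning, anything else is logged
// as a protocol error.
func logCoordinationError(resp coordinateResponse) {
	if resp.Error == "" {
		return
	}
	if strings.HasPrefix(resp.Error, readyForHandshakeError) {
		fmt.Println("WARN: coordination warning:", resp.Error)
		return
	}
	fmt.Println("ERROR: coordination protocol error:", resp.Error)
}

func main() {
	// A coordinator shutting down now tells us why before closing the stream.
	logCoordinationError(coordinateResponse{Error: "coordinator closed"})
	// A ready-for-handshake race only warns.
	logCoordinationError(coordinateResponse{Error: readyForHandshakeError + `: you do not share a tunnel with "peer"`})
}
```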

enterprise/tailnet/connio.go

Lines changed: 6 additions & 2 deletions
@@ -113,6 +113,7 @@ func (c *connIO) recvLoop() {
 	select {
 	case <-c.coordCtx.Done():
 		c.logger.Debug(c.coordCtx, "exiting io recvLoop; coordinator exit")
+		_ = c.Enqueue(&proto.CoordinateResponse{Error: agpl.CloseErrCoordinatorClose})
 		return
 	case <-c.peerCtx.Done():
 		c.logger.Debug(c.peerCtx, "exiting io recvLoop; peer context canceled")
@@ -123,6 +124,9 @@
 			return
 		}
 		if err := c.handleRequest(req); err != nil {
+			if !xerrors.Is(err, errDisconnect) {
+				_ = c.Enqueue(&proto.CoordinateResponse{Error: err.Error()})
+			}
 			return
 		}
 	}
@@ -136,7 +140,7 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
 	err := c.auth.Authorize(c.peerCtx, req)
 	if err != nil {
 		c.logger.Warn(c.peerCtx, "unauthorized request", slog.Error(err))
-		return xerrors.Errorf("authorize request: %w", err)
+		return agpl.AuthorizationError{Wrapped: err}
 	}

 	if req.UpdateSelf != nil {
@@ -217,7 +221,7 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
 				slog.F("dst", dst.String()),
 			)
 			_ = c.Enqueue(&proto.CoordinateResponse{
-				Error: fmt.Sprintf("you do not share a tunnel with %q", dst.String()),
+				Error: fmt.Sprintf("%s: you do not share a tunnel with %q", agpl.ReadyForHandshakeError, dst.String()),
 			})
 			return nil
 		}

enterprise/tailnet/multiagent_test.go

Lines changed: 2 additions & 1 deletion
@@ -10,6 +10,7 @@ import (
 	"cdr.dev/slog/sloggers/slogtest"
 	"github.com/coder/coder/v2/coderd/database/dbtestutil"
 	"github.com/coder/coder/v2/enterprise/tailnet"
+	agpl "github.com/coder/coder/v2/tailnet"
 	agpltest "github.com/coder/coder/v2/tailnet/test"
 	"github.com/coder/coder/v2/testutil"
 )
@@ -77,7 +78,7 @@ func TestPGCoordinator_MultiAgent_CoordClose(t *testing.T) {
 	err = coord1.Close()
 	require.NoError(t, err)

-	ma1.AssertEventuallyResponsesClosed()
+	ma1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 }

 // TestPGCoordinator_MultiAgent_UnsubscribeRace tests a single coordinator with

enterprise/tailnet/pgcoord.go

Lines changed: 4 additions & 0 deletions
@@ -37,6 +37,7 @@ const (
 	numHandshakerWorkers = 5
 	dbMaxBackoff = 10 * time.Second
 	cleanupPeriod = time.Hour
+	CloseErrUnhealthy = "coordinator unhealthy"
 )

 // pgCoord is a postgres-backed coordinator
@@ -235,6 +236,7 @@ func (c *pgCoord) Coordinate(
 		c.logger.Info(ctx, "closed incoming coordinate call while unhealthy",
 			slog.F("peer_id", id),
 		)
+		resps <- &proto.CoordinateResponse{Error: CloseErrUnhealthy}
 		close(resps)
 		return reqs, resps
 	}
@@ -882,6 +884,7 @@ func (q *querier) newConn(c *connIO) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 	if !q.healthy {
+		_ = c.Enqueue(&proto.CoordinateResponse{Error: CloseErrUnhealthy})
 		err := c.Close()
 		// This can only happen during a narrow window where we were healthy
 		// when pgCoord checked before accepting the connection, but now are
@@ -1271,6 +1274,7 @@ func (q *querier) unhealthyCloseAll() {
 	for _, mpr := range q.mappers {
 		// close connections async so that we don't block the querier routine that responds to updates
 		go func(c *connIO) {
+			_ = c.Enqueue(&proto.CoordinateResponse{Error: CloseErrUnhealthy})
 			err := c.Close()
 			if err != nil {
 				q.logger.Debug(q.ctx, "error closing conn while unhealthy", slog.Error(err))

enterprise/tailnet/pgcoord_internal_test.go

Lines changed: 3 additions & 1 deletion
@@ -427,7 +427,9 @@ func TestPGCoordinatorUnhealthy(t *testing.T) {

 	pID := uuid.UUID{5}
 	_, resps := coordinator.Coordinate(ctx, pID, "test", agpl.AgentCoordinateeAuth{ID: pID})
-	resp := testutil.TryReceive(ctx, t, resps)
+	resp := testutil.RequireReceive(ctx, t, resps)
+	require.Equal(t, CloseErrUnhealthy, resp.Error)
+	resp = testutil.TryReceive(ctx, t, resps)
 	require.Nil(t, resp, "channel should be closed")

 	// give the coordinator some time to process any pending work. We are

enterprise/tailnet/pgcoord_test.go

Lines changed: 16 additions & 15 deletions
@@ -118,15 +118,15 @@ func TestPGCoordinatorSingle_AgentInvalidIP(t *testing.T) {

 	agent := agpltest.NewAgent(ctx, t, coordinator, "agent")
 	defer agent.Close(ctx)
+	prefix := agpl.TailscaleServicePrefix.RandomPrefix()
 	agent.UpdateNode(&proto.Node{
-		Addresses: []string{
-			agpl.TailscaleServicePrefix.RandomPrefix().String(),
-		},
+		Addresses: []string{prefix.String()},
 		PreferredDerp: 10,
 	})

 	// The agent connection should be closed immediately after sending an invalid addr
-	agent.AssertEventuallyResponsesClosed()
+	agent.AssertEventuallyResponsesClosed(
+		agpl.AuthorizationError{Wrapped: agpl.InvalidNodeAddressError{Addr: prefix.Addr().String()}}.Error())
 	assertEventuallyLost(ctx, t, store, agent.ID)
 }

@@ -153,7 +153,8 @@ func TestPGCoordinatorSingle_AgentInvalidIPBits(t *testing.T) {
 	})

 	// The agent connection should be closed immediately after sending an invalid addr
-	agent.AssertEventuallyResponsesClosed()
+	agent.AssertEventuallyResponsesClosed(
+		agpl.AuthorizationError{Wrapped: agpl.InvalidAddressBitsError{Bits: 64}}.Error())
 	assertEventuallyLost(ctx, t, store, agent.ID)
 }

@@ -493,19 +494,19 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
 	require.NoError(t, err)

 	// this closes agent2, client22, client21
-	agent2.AssertEventuallyResponsesClosed()
-	client22.AssertEventuallyResponsesClosed()
-	client21.AssertEventuallyResponsesClosed()
+	agent2.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client22.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client21.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	assertEventuallyLost(ctx, t, store, agent2.ID)
 	assertEventuallyLost(ctx, t, store, client21.ID)
 	assertEventuallyLost(ctx, t, store, client22.ID)

 	err = coord1.Close()
 	require.NoError(t, err)
 	// this closes agent1, client12, client11
-	agent1.AssertEventuallyResponsesClosed()
-	client12.AssertEventuallyResponsesClosed()
-	client11.AssertEventuallyResponsesClosed()
+	agent1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client12.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client11.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	assertEventuallyLost(ctx, t, store, agent1.ID)
 	assertEventuallyLost(ctx, t, store, client11.ID)
 	assertEventuallyLost(ctx, t, store, client12.ID)
@@ -636,12 +637,12 @@ func TestPGCoordinator_Unhealthy(t *testing.T) {
 		}
 	}
 	// connected agent should be disconnected
-	agent1.AssertEventuallyResponsesClosed()
+	agent1.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)

 	// new agent should immediately disconnect
 	agent2 := agpltest.NewAgent(ctx, t, uut, "agent2")
 	defer agent2.Close(ctx)
-	agent2.AssertEventuallyResponsesClosed()
+	agent2.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)

 	// next heartbeats succeed, so we are healthy
 	for i := 0; i < 2; i++ {
@@ -836,7 +837,7 @@ func TestPGCoordinatorDual_FailedHeartbeat(t *testing.T) {
 	// we eventually disconnect from the coordinator.
 	err = sdb1.Close()
 	require.NoError(t, err)
-	p1.AssertEventuallyResponsesClosed()
+	p1.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)
 	p2.AssertEventuallyLost(p1.ID)
 	// This basically checks that peer2 had no update
 	// performed on their status since we are connected
@@ -891,7 +892,7 @@ func TestPGCoordinatorDual_PeerReconnect(t *testing.T) {
 	// never send a DISCONNECTED update.
 	err = c1.Close()
 	require.NoError(t, err)
-	p1.AssertEventuallyResponsesClosed()
+	p1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	p2.AssertEventuallyLost(p1.ID)
 	// This basically checks that peer2 had no update
 	// performed on their status since we are connected

tailnet/controllers.go

Lines changed: 11 additions & 0 deletions
@@ -284,6 +284,17 @@ func (c *BasicCoordination) respLoop() {
 			return
 		}

+		if resp.Error != "" {
+			// ReadyForHandshake error can occur during race conditions, where we send a ReadyForHandshake message,
+			// but the source has already disconnected from the tunnel by the time we do. So, just log at warning.
+			if strings.HasPrefix(resp.Error, ReadyForHandshakeError) {
+				c.logger.Warn(context.Background(), "coordination warning", slog.F("msg", resp.Error))
+			} else {
+				c.logger.Error(context.Background(),
+					"coordination protocol error", slog.F("error", resp.Error))
+			}
+		}
+
 		err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates())
 		if err != nil {
 			c.logger.Debug(context.Background(), "failed to update peers", slog.Error(err))

tailnet/coordinator.go

Lines changed: 49 additions & 15 deletions
@@ -24,7 +24,10 @@ const (
 	// dropping updates
 	ResponseBufferSize = 512
 	// RequestBufferSize is the max number of requests to buffer per connection
-	RequestBufferSize = 32
+	RequestBufferSize        = 32
+	CloseErrOverwritten      = "peer ID overwritten by new connection"
+	CloseErrCoordinatorClose = "coordinator closed"
+	ReadyForHandshakeError   = "ready for handshake error"
 )

 // Coordinator exchanges nodes with agents to establish connections.
@@ -97,6 +100,18 @@ var (
 	ErrAlreadyRemoved = xerrors.New("already removed")
 )

+type AuthorizationError struct {
+	Wrapped error
+}
+
+func (e AuthorizationError) Error() string {
+	return fmt.Sprintf("authorization: %s", e.Wrapped.Error())
+}
+
+func (e AuthorizationError) Unwrap() error {
+	return e.Wrapped
+}
+
 // NewCoordinator constructs a new in-memory connection coordinator. This
 // coordinator is incompatible with multiple Coder replicas as all node data is
 // in-memory.
@@ -161,8 +176,12 @@ func (c *coordinator) Coordinate(
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
-		p.reqLoop(ctx, logger, c.core.handleRequest)
-		err := c.core.lostPeer(p)
+		loopErr := p.reqLoop(ctx, logger, c.core.handleRequest)
+		closeErrStr := ""
+		if loopErr != nil {
+			closeErrStr = loopErr.Error()
+		}
+		err := c.core.lostPeer(p, closeErrStr)
 		if xerrors.Is(err, ErrClosed) || xerrors.Is(err, ErrAlreadyRemoved) {
 			return
 		}
@@ -227,7 +246,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
 	}

 	if err := pr.auth.Authorize(ctx, req); err != nil {
-		return xerrors.Errorf("authorize request: %w", err)
+		return AuthorizationError{Wrapped: err}
 	}

 	if req.UpdateSelf != nil {
@@ -270,7 +289,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
 		}
 	}
 	if req.Disconnect != nil {
-		c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect")
+		c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect", "")
 	}
 	if rfhs := req.ReadyForHandshake; rfhs != nil {
 		err := c.handleReadyForHandshakeLocked(pr, rfhs)
@@ -298,7 +317,7 @@ func (c *core) handleReadyForHandshakeLocked(src *peer, rfhs []*proto.Coordinate
 			// don't want to kill its connection.
 			select {
 			case src.resps <- &proto.CoordinateResponse{
-				Error: fmt.Sprintf("you do not share a tunnel with %q", dstID.String()),
+				Error: fmt.Sprintf("%s: you do not share a tunnel with %q", ReadyForHandshakeError, dstID.String()),
 			}:
 			default:
 				return ErrWouldBlock
@@ -344,7 +363,7 @@ func (c *core) updateTunnelPeersLocked(id uuid.UUID, n *proto.Node, k proto.Coor
 		err := other.updateMappingLocked(id, n, k, reason)
 		if err != nil {
 			other.logger.Error(context.Background(), "failed to update mapping", slog.Error(err))
-			c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to update tunnel peer mapping")
 		}
 	}
 }
@@ -360,7 +379,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err := src.updateMappingLocked(dstID, dst.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
 		if err != nil {
 			src.logger.Error(context.Background(), "failed update of tunnel src", slog.Error(err))
-			c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
+				"failed to update tunnel dest mapping")
 			// if the source fails, then the tunnel is also removed and there is no reason to continue
 			// processing.
 			return err
@@ -370,7 +390,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err := dst.updateMappingLocked(src.id, src.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
 		if err != nil {
 			dst.logger.Error(context.Background(), "failed update of tunnel dst", slog.Error(err))
-			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
+				"failed to update tunnel src mapping")
 		}
 	}
 }
@@ -381,7 +402,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
 	err := src.updateMappingLocked(dstID, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
 	if err != nil {
 		src.logger.Error(context.Background(), "failed to update", slog.Error(err))
-		c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+		c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel dest mapping")
 		// removing the peer also removes all other tunnels and notifies destinations, so it's safe to
 		// return here.
 		return err
@@ -391,7 +412,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err = dst.updateMappingLocked(src.id, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
 		if err != nil {
 			dst.logger.Error(context.Background(), "failed to update", slog.Error(err))
-			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel src mapping")
 			// don't return here because we still want to remove the tunnel, and an error at the
 			// destination doesn't count as an error removing the tunnel at the source.
 		}
@@ -413,6 +434,11 @@ func (c *core) initPeer(p *peer) error {
 	if old, ok := c.peers[p.id]; ok {
 		// rare and interesting enough to log at Info, but it isn't an error per se
 		old.logger.Info(context.Background(), "overwritten by new connection")
+		select {
+		case old.resps <- &proto.CoordinateResponse{Error: CloseErrOverwritten}:
+		default:
+			// pass
+		}
 		close(old.resps)
 		p.overwrites = old.overwrites + 1
 	}
@@ -433,7 +459,7 @@

 // removePeer removes and cleans up a lost peer. It updates all peers it shares a tunnel with, deletes
 // all tunnels from which the removed peer is the source.
-func (c *core) lostPeer(p *peer) error {
+func (c *core) lostPeer(p *peer, closeErr string) error {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 	c.logger.Debug(context.Background(), "lostPeer", slog.F("peer_id", p.id))
@@ -443,18 +469,25 @@
 	if existing, ok := c.peers[p.id]; !ok || existing != p {
 		return ErrAlreadyRemoved
 	}
-	c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost")
+	c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost", closeErr)
 	return nil
 }

-func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason string) {
+func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason, closeErr string) {
 	p, ok := c.peers[id]
 	if !ok {
 		c.logger.Critical(context.Background(), "removed non-existent peer %s", id)
 		return
 	}
 	c.updateTunnelPeersLocked(id, nil, kind, reason)
 	c.tunnels.removeAll(id)
+	if closeErr != "" {
+		select {
+		case p.resps <- &proto.CoordinateResponse{Error: closeErr}:
+		default:
+			// blocked, pass.
+		}
+	}
 	close(p.resps)
 	delete(c.peers, id)
 }
@@ -487,7 +520,8 @@ func (c *core) close() error {
 	for id := range c.peers {
 		// when closing, mark them as LOST so that we don't disrupt in-progress
 		// connections.
-		c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close")
+		c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close",
+			CloseErrCoordinatorClose)
 	}
 	return nil
 }
