
Commit 9cc149f

feat: modify coordinators to send errors and peers to log them
1 parent 6e0e29a commit 9cc149f

12 files changed (+126 -49 lines)

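The commit threads a new Error string through proto.CoordinateResponse: coordinators now attach a human-readable close reason before shutting a peer's response channel, and peers log it on receipt. A minimal sketch of the receiving side, modeled on the BasicCoordination.respLoop change below (resps, ctx, and logger are illustrative stand-ins, not names from this commit):

	// Hypothetical receive loop on a peer.
	for resp := range resps {
		if resp.Error != "" {
			// New in this commit: the coordinator's close reason,
			// e.g. "coordinator closed" or "coordinator unhealthy".
			logger.Error(ctx, "coordination protocol error", slog.F("error", resp.Error))
		}
		// ...handle resp.GetPeerUpdates() as before...
	}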

enterprise/tailnet/connio.go

Lines changed: 5 additions & 1 deletion
@@ -113,6 +113,7 @@ func (c *connIO) recvLoop() {
 		select {
 		case <-c.coordCtx.Done():
 			c.logger.Debug(c.coordCtx, "exiting io recvLoop; coordinator exit")
+			_ = c.Enqueue(&proto.CoordinateResponse{Error: agpl.CloseErrCoordinatorClose})
 			return
 		case <-c.peerCtx.Done():
 			c.logger.Debug(c.peerCtx, "exiting io recvLoop; peer context canceled")
@@ -123,6 +124,9 @@ func (c *connIO) recvLoop() {
 			return
 		}
 		if err := c.handleRequest(req); err != nil {
+			if !xerrors.Is(err, errDisconnect) {
+				_ = c.Enqueue(&proto.CoordinateResponse{Error: err.Error()})
+			}
 			return
 		}
 	}
@@ -136,7 +140,7 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
 	err := c.auth.Authorize(c.peerCtx, req)
 	if err != nil {
 		c.logger.Warn(c.peerCtx, "unauthorized request", slog.Error(err))
-		return xerrors.Errorf("authorize request: %w", err)
+		return agpl.AuthorizationError{Wrapped: err}
 	}

 	if req.UpdateSelf != nil {

enterprise/tailnet/multiagent_test.go

Lines changed: 2 additions & 1 deletion
@@ -10,6 +10,7 @@ import (
 	"cdr.dev/slog/sloggers/slogtest"
 	"github.com/coder/coder/v2/coderd/database/dbtestutil"
 	"github.com/coder/coder/v2/enterprise/tailnet"
+	agpl "github.com/coder/coder/v2/tailnet"
 	agpltest "github.com/coder/coder/v2/tailnet/test"
 	"github.com/coder/coder/v2/testutil"
 )
@@ -77,7 +78,7 @@ func TestPGCoordinator_MultiAgent_CoordClose(t *testing.T) {
 	err = coord1.Close()
 	require.NoError(t, err)

-	ma1.AssertEventuallyResponsesClosed()
+	ma1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 }

 // TestPGCoordinator_MultiAgent_UnsubscribeRace tests a single coordinator with

enterprise/tailnet/pgcoord.go

Lines changed: 4 additions & 0 deletions
@@ -37,6 +37,7 @@ const (
 	numHandshakerWorkers = 5
 	dbMaxBackoff         = 10 * time.Second
 	cleanupPeriod        = time.Hour
+	CloseErrUnhealthy    = "coordinator unhealthy"
 )

 // pgCoord is a postgres-backed coordinator
@@ -235,6 +236,7 @@ func (c *pgCoord) Coordinate(
 		c.logger.Info(ctx, "closed incoming coordinate call while unhealthy",
 			slog.F("peer_id", id),
 		)
+		resps <- &proto.CoordinateResponse{Error: CloseErrUnhealthy}
 		close(resps)
 		return reqs, resps
 	}
@@ -882,6 +884,7 @@ func (q *querier) newConn(c *connIO) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 	if !q.healthy {
+		_ = c.Enqueue(&proto.CoordinateResponse{Error: CloseErrUnhealthy})
 		err := c.Close()
 		// This can only happen during a narrow window where we were healthy
 		// when pgCoord checked before accepting the connection, but now are
@@ -1271,6 +1274,7 @@ func (q *querier) unhealthyCloseAll() {
 	for _, mpr := range q.mappers {
 		// close connections async so that we don't block the querier routine that responds to updates
 		go func(c *connIO) {
+			_ = c.Enqueue(&proto.CoordinateResponse{Error: CloseErrUnhealthy})
 			err := c.Close()
 			if err != nil {
 				q.logger.Debug(q.ctx, "error closing conn while unhealthy", slog.Error(err))
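Note that these Enqueue calls are deliberately fire-and-forget (the `_ =`): the connection is already being torn down, so failing to deliver the close reason is acceptable. The in-memory coordinator in tailnet/coordinator.go below makes the same trade-off with an explicit non-blocking send; a sketch of the pattern, with resps and closeErr standing in for the peer's channel and reason string:

	// Best-effort: offer the peer one final response carrying the
	// close reason, but never block if its buffer is full.
	select {
	case resps <- &proto.CoordinateResponse{Error: closeErr}:
	default:
		// receiver is full; drop the reason rather than block
	}
	close(resps)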

enterprise/tailnet/pgcoord_internal_test.go

Lines changed: 3 additions & 1 deletion
@@ -427,7 +427,9 @@ func TestPGCoordinatorUnhealthy(t *testing.T) {

 	pID := uuid.UUID{5}
 	_, resps := coordinator.Coordinate(ctx, pID, "test", agpl.AgentCoordinateeAuth{ID: pID})
-	resp := testutil.TryReceive(ctx, t, resps)
+	resp := testutil.RequireReceive(ctx, t, resps)
+	require.Equal(t, CloseErrUnhealthy, resp.Error)
+	resp = testutil.TryReceive(ctx, t, resps)
 	require.Nil(t, resp, "channel should be closed")

 	// give the coordinator some time to process any pending work. We are

enterprise/tailnet/pgcoord_test.go

Lines changed: 16 additions & 15 deletions
@@ -118,15 +118,15 @@ func TestPGCoordinatorSingle_AgentInvalidIP(t *testing.T) {

 	agent := agpltest.NewAgent(ctx, t, coordinator, "agent")
 	defer agent.Close(ctx)
+	prefix := agpl.TailscaleServicePrefix.RandomPrefix()
 	agent.UpdateNode(&proto.Node{
-		Addresses: []string{
-			agpl.TailscaleServicePrefix.RandomPrefix().String(),
-		},
+		Addresses:     []string{prefix.String()},
 		PreferredDerp: 10,
 	})

 	// The agent connection should be closed immediately after sending an invalid addr
-	agent.AssertEventuallyResponsesClosed()
+	agent.AssertEventuallyResponsesClosed(
+		agpl.AuthorizationError{Wrapped: agpl.InvalidNodeAddressError{Addr: prefix.Addr().String()}}.Error())
 	assertEventuallyLost(ctx, t, store, agent.ID)
 }

@@ -153,7 +153,8 @@ func TestPGCoordinatorSingle_AgentInvalidIPBits(t *testing.T) {
 	})

 	// The agent connection should be closed immediately after sending an invalid addr
-	agent.AssertEventuallyResponsesClosed()
+	agent.AssertEventuallyResponsesClosed(
+		agpl.AuthorizationError{Wrapped: agpl.InvalidAddressBitsError{Bits: 64}}.Error())
 	assertEventuallyLost(ctx, t, store, agent.ID)
 }

@@ -493,19 +494,19 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
 	require.NoError(t, err)

 	// this closes agent2, client22, client21
-	agent2.AssertEventuallyResponsesClosed()
-	client22.AssertEventuallyResponsesClosed()
-	client21.AssertEventuallyResponsesClosed()
+	agent2.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client22.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client21.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	assertEventuallyLost(ctx, t, store, agent2.ID)
 	assertEventuallyLost(ctx, t, store, client21.ID)
 	assertEventuallyLost(ctx, t, store, client22.ID)

 	err = coord1.Close()
 	require.NoError(t, err)
 	// this closes agent1, client12, client11
-	agent1.AssertEventuallyResponsesClosed()
-	client12.AssertEventuallyResponsesClosed()
-	client11.AssertEventuallyResponsesClosed()
+	agent1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client12.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
+	client11.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	assertEventuallyLost(ctx, t, store, agent1.ID)
 	assertEventuallyLost(ctx, t, store, client11.ID)
 	assertEventuallyLost(ctx, t, store, client12.ID)
@@ -636,12 +637,12 @@ func TestPGCoordinator_Unhealthy(t *testing.T) {
 		}
 	}
 	// connected agent should be disconnected
-	agent1.AssertEventuallyResponsesClosed()
+	agent1.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)

 	// new agent should immediately disconnect
 	agent2 := agpltest.NewAgent(ctx, t, uut, "agent2")
 	defer agent2.Close(ctx)
-	agent2.AssertEventuallyResponsesClosed()
+	agent2.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)

 	// next heartbeats succeed, so we are healthy
 	for i := 0; i < 2; i++ {
@@ -836,7 +837,7 @@ func TestPGCoordinatorDual_FailedHeartbeat(t *testing.T) {
 	// we eventually disconnect from the coordinator.
 	err = sdb1.Close()
 	require.NoError(t, err)
-	p1.AssertEventuallyResponsesClosed()
+	p1.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)
 	p2.AssertEventuallyLost(p1.ID)
 	// This basically checks that peer2 had no update
 	// performed on their status since we are connected
@@ -891,7 +892,7 @@ func TestPGCoordinatorDual_PeerReconnect(t *testing.T) {
 	// never send a DISCONNECTED update.
 	err = c1.Close()
 	require.NoError(t, err)
-	p1.AssertEventuallyResponsesClosed()
+	p1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
 	p2.AssertEventuallyLost(p1.ID)
 	// This basically checks that peer2 had no update
 	// performed on their status since we are connected

tailnet/controllers.go

Lines changed: 5 additions & 0 deletions
@@ -284,6 +284,11 @@ func (c *BasicCoordination) respLoop() {
 			return
 		}

+		if resp.Error != "" {
+			c.logger.Error(context.Background(),
+				"coordination protocol error", slog.F("error", resp.Error))
+		}
+
 		err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates())
 		if err != nil {
 			c.logger.Debug(context.Background(), "failed to update peers", slog.Error(err))

tailnet/coordinator.go

Lines changed: 47 additions & 14 deletions
@@ -24,7 +24,9 @@ const (
 	// dropping updates
 	ResponseBufferSize = 512
 	// RequestBufferSize is the max number of requests to buffer per connection
-	RequestBufferSize = 32
+	RequestBufferSize        = 32
+	CloseErrOverwritten      = "peer ID overwritten by new connection"
+	CloseErrCoordinatorClose = "coordinator closed"
 )

 // Coordinator exchanges nodes with agents to establish connections.
@@ -97,6 +99,18 @@ var (
 	ErrAlreadyRemoved = xerrors.New("already removed")
 )

+type AuthorizationError struct {
+	Wrapped error
+}
+
+func (e AuthorizationError) Error() string {
+	return fmt.Sprintf("authorization: %s", e.Wrapped.Error())
+}
+
+func (e AuthorizationError) Unwrap() error {
+	return e.Wrapped
+}
+
 // NewCoordinator constructs a new in-memory connection coordinator. This
 // coordinator is incompatible with multiple Coder replicas as all node data is
 // in-memory.
@@ -161,8 +175,12 @@ func (c *coordinator) Coordinate(
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
-		p.reqLoop(ctx, logger, c.core.handleRequest)
-		err := c.core.lostPeer(p)
+		loopErr := p.reqLoop(ctx, logger, c.core.handleRequest)
+		closeErrStr := ""
+		if loopErr != nil {
+			closeErrStr = loopErr.Error()
+		}
+		err := c.core.lostPeer(p, closeErrStr)
 		if xerrors.Is(err, ErrClosed) || xerrors.Is(err, ErrAlreadyRemoved) {
 			return
 		}
@@ -227,7 +245,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.CoordinateRequest) error {
 	}

 	if err := pr.auth.Authorize(ctx, req); err != nil {
-		return xerrors.Errorf("authorize request: %w", err)
+		return AuthorizationError{Wrapped: err}
 	}

 	if req.UpdateSelf != nil {
@@ -270,7 +288,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.CoordinateRequest) error {
 		}
 	}
 	if req.Disconnect != nil {
-		c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect")
+		c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect", "")
 	}
 	if rfhs := req.ReadyForHandshake; rfhs != nil {
 		err := c.handleReadyForHandshakeLocked(pr, rfhs)
@@ -344,7 +362,7 @@ func (c *core) updateTunnelPeersLocked(id uuid.UUID, n *proto.Node, k proto.CoordinateResponse_PeerUpdate_Kind, reason string) {
 		err := other.updateMappingLocked(id, n, k, reason)
 		if err != nil {
 			other.logger.Error(context.Background(), "failed to update mapping", slog.Error(err))
-			c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to update tunnel peer mapping")
 		}
 	}
 }
@@ -360,7 +378,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err := src.updateMappingLocked(dstID, dst.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
 		if err != nil {
 			src.logger.Error(context.Background(), "failed update of tunnel src", slog.Error(err))
-			c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
+				"failed to update tunnel dest mapping")
 			// if the source fails, then the tunnel is also removed and there is no reason to continue
 			// processing.
 			return err
@@ -370,7 +389,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err := dst.updateMappingLocked(src.id, src.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
 		if err != nil {
 			dst.logger.Error(context.Background(), "failed update of tunnel dst", slog.Error(err))
-			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
+				"failed to update tunnel src mapping")
 		}
 	}
 }
@@ -381,7 +401,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
 	err := src.updateMappingLocked(dstID, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
 	if err != nil {
 		src.logger.Error(context.Background(), "failed to update", slog.Error(err))
-		c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+		c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel dest mapping")
 		// removing the peer also removes all other tunnels and notifies destinations, so it's safe to
 		// return here.
 		return err
@@ -391,7 +411,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
 		err = dst.updateMappingLocked(src.id, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
 		if err != nil {
 			dst.logger.Error(context.Background(), "failed to update", slog.Error(err))
-			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
+			c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel src mapping")
 			// don't return here because we still want to remove the tunnel, and an error at the
 			// destination doesn't count as an error removing the tunnel at the source.
 		}
@@ -413,6 +433,11 @@ func (c *core) initPeer(p *peer) error {
 	if old, ok := c.peers[p.id]; ok {
 		// rare and interesting enough to log at Info, but it isn't an error per se
 		old.logger.Info(context.Background(), "overwritten by new connection")
+		select {
+		case old.resps <- &proto.CoordinateResponse{Error: CloseErrOverwritten}:
+		default:
+			// pass
+		}
 		close(old.resps)
 		p.overwrites = old.overwrites + 1
 	}
@@ -433,7 +458,7 @@ func (c *core) initPeer(p *peer) error {

 // removePeer removes and cleans up a lost peer. It updates all peers it shares a tunnel with, deletes
 // all tunnels from which the removed peer is the source.
-func (c *core) lostPeer(p *peer) error {
+func (c *core) lostPeer(p *peer, closeErr string) error {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 	c.logger.Debug(context.Background(), "lostPeer", slog.F("peer_id", p.id))
@@ -443,18 +468,25 @@ func (c *core) lostPeer(p *peer) error {
 	if existing, ok := c.peers[p.id]; !ok || existing != p {
 		return ErrAlreadyRemoved
 	}
-	c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost")
+	c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost", closeErr)
 	return nil
 }

-func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason string) {
+func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason, closeErr string) {
 	p, ok := c.peers[id]
 	if !ok {
 		c.logger.Critical(context.Background(), "removed non-existent peer %s", id)
 		return
 	}
 	c.updateTunnelPeersLocked(id, nil, kind, reason)
 	c.tunnels.removeAll(id)
+	if closeErr != "" {
+		select {
+		case p.resps <- &proto.CoordinateResponse{Error: closeErr}:
+		default:
+			// blocked, pass.
+		}
+	}
 	close(p.resps)
 	delete(c.peers, id)
 }
@@ -487,7 +519,8 @@ func (c *core) close() error {
 	for id := range c.peers {
 		// when closing, mark them as LOST so that we don't disrupt in-progress
 		// connections.
-		c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close")
+		c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close",
+			CloseErrCoordinatorClose)
 	}
 	return nil
 }
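Because AuthorizationError implements Unwrap, callers can still match the underlying failure with xerrors.Is or xerrors.As even though handleRequest now returns the wrapper rather than a formatted string. A hypothetical caller-side check (errUnauthorized is an assumed sentinel, not part of this commit):

	var authErr tailnet.AuthorizationError
	if xerrors.As(err, &authErr) {
		// authErr.Error() is the string a peer would see in CoordinateResponse.Error
		fmt.Println("rejected:", authErr.Error())
	}
	if xerrors.Is(err, errUnauthorized) {
		// Unwrap lets Is/As see through the wrapper to the root cause
	}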
