feat: modify coordinators to send errors and peers to log them #17467


Merged 2 commits on Apr 21, 2025
8 changes: 6 additions & 2 deletions enterprise/tailnet/connio.go
@@ -113,6 +113,7 @@ func (c *connIO) recvLoop() {
select {
case <-c.coordCtx.Done():
c.logger.Debug(c.coordCtx, "exiting io recvLoop; coordinator exit")
_ = c.Enqueue(&proto.CoordinateResponse{Error: agpl.CloseErrCoordinatorClose})
return
case <-c.peerCtx.Done():
c.logger.Debug(c.peerCtx, "exiting io recvLoop; peer context canceled")
@@ -123,6 +124,9 @@
return
}
if err := c.handleRequest(req); err != nil {
if !xerrors.Is(err, errDisconnect) {
_ = c.Enqueue(&proto.CoordinateResponse{Error: err.Error()})
}
return
}
}
@@ -136,7 +140,7 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
err := c.auth.Authorize(c.peerCtx, req)
if err != nil {
c.logger.Warn(c.peerCtx, "unauthorized request", slog.Error(err))
return xerrors.Errorf("authorize request: %w", err)
return agpl.AuthorizationError{Wrapped: err}
}

if req.UpdateSelf != nil {
@@ -217,7 +221,7 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
slog.F("dst", dst.String()),
)
_ = c.Enqueue(&proto.CoordinateResponse{
Error: fmt.Sprintf("you do not share a tunnel with %q", dst.String()),
Error: fmt.Sprintf("%s: you do not share a tunnel with %q", agpl.ReadyForHandshakeError, dst.String()),
})
return nil
}
3 changes: 2 additions & 1 deletion enterprise/tailnet/multiagent_test.go
@@ -10,6 +10,7 @@ import (
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/enterprise/tailnet"
agpl "github.com/coder/coder/v2/tailnet"
agpltest "github.com/coder/coder/v2/tailnet/test"
"github.com/coder/coder/v2/testutil"
)
@@ -77,7 +78,7 @@ func TestPGCoordinator_MultiAgent_CoordClose(t *testing.T) {
err = coord1.Close()
require.NoError(t, err)

ma1.AssertEventuallyResponsesClosed()
ma1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
}

// TestPGCoordinator_MultiAgent_UnsubscribeRace tests a single coordinator with
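From here on, the tests pass an expected close error to AssertEventuallyResponsesClosed, so they assert not only that the coordinator closed the response channel but also why. Below is a minimal sketch of what such a helper could look like, under the assumption that it drains the channel and compares any Error field it sees; the real helper lives in tailnet/test and may differ:

package example

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/coder/coder/v2/tailnet/proto"
)

// assertEventuallyResponsesClosed drains resps until the channel closes and
// requires that any close reason the coordinator sent matches wantCloseErr.
// This is a sketch of the updated test helper, not the actual implementation.
func assertEventuallyResponsesClosed(ctx context.Context, t testing.TB,
	resps <-chan *proto.CoordinateResponse, wantCloseErr string,
) {
	t.Helper()
	for {
		select {
		case <-ctx.Done():
			t.Fatal("timed out waiting for response channel to close")
		case resp, ok := <-resps:
			if !ok {
				return // closed, as expected
			}
			if resp.Error != "" {
				require.Equal(t, wantCloseErr, resp.Error)
			}
		}
	}
}

Call sites such as ma1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose) above then pin down the exact reason the connection was torn down.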
4 changes: 4 additions & 0 deletions enterprise/tailnet/pgcoord.go
@@ -37,6 +37,7 @@ const (
numHandshakerWorkers = 5
dbMaxBackoff = 10 * time.Second
cleanupPeriod = time.Hour
CloseErrUnhealthy = "coordinator unhealthy"
)

// pgCoord is a postgres-backed coordinator
@@ -235,6 +236,7 @@ func (c *pgCoord) Coordinate(
c.logger.Info(ctx, "closed incoming coordinate call while unhealthy",
slog.F("peer_id", id),
)
resps <- &proto.CoordinateResponse{Error: CloseErrUnhealthy}
close(resps)
return reqs, resps
}
@@ -882,6 +884,7 @@ func (q *querier) newConn(c *connIO) {
q.mu.Lock()
defer q.mu.Unlock()
if !q.healthy {
_ = c.Enqueue(&proto.CoordinateResponse{Error: CloseErrUnhealthy})
err := c.Close()
// This can only happen during a narrow window where we were healthy
// when pgCoord checked before accepting the connection, but now are
@@ -1271,6 +1274,7 @@ func (q *querier) unhealthyCloseAll() {
for _, mpr := range q.mappers {
// close connections async so that we don't block the querier routine that responds to updates
go func(c *connIO) {
_ = c.Enqueue(&proto.CoordinateResponse{Error: CloseErrUnhealthy})
err := c.Close()
if err != nil {
q.logger.Debug(q.ctx, "error closing conn while unhealthy", slog.Error(err))
4 changes: 3 additions & 1 deletion enterprise/tailnet/pgcoord_internal_test.go
@@ -427,7 +427,9 @@ func TestPGCoordinatorUnhealthy(t *testing.T) {

pID := uuid.UUID{5}
_, resps := coordinator.Coordinate(ctx, pID, "test", agpl.AgentCoordinateeAuth{ID: pID})
resp := testutil.TryReceive(ctx, t, resps)
resp := testutil.RequireReceive(ctx, t, resps)
require.Equal(t, CloseErrUnhealthy, resp.Error)
resp = testutil.TryReceive(ctx, t, resps)
require.Nil(t, resp, "channel should be closed")

// give the coordinator some time to process any pending work. We are
31 changes: 16 additions & 15 deletions enterprise/tailnet/pgcoord_test.go
@@ -118,15 +118,15 @@ func TestPGCoordinatorSingle_AgentInvalidIP(t *testing.T) {

agent := agpltest.NewAgent(ctx, t, coordinator, "agent")
defer agent.Close(ctx)
prefix := agpl.TailscaleServicePrefix.RandomPrefix()
agent.UpdateNode(&proto.Node{
Addresses: []string{
agpl.TailscaleServicePrefix.RandomPrefix().String(),
},
Addresses: []string{prefix.String()},
PreferredDerp: 10,
})

// The agent connection should be closed immediately after sending an invalid addr
agent.AssertEventuallyResponsesClosed()
agent.AssertEventuallyResponsesClosed(
agpl.AuthorizationError{Wrapped: agpl.InvalidNodeAddressError{Addr: prefix.Addr().String()}}.Error())
assertEventuallyLost(ctx, t, store, agent.ID)
}

@@ -153,7 +153,8 @@ func TestPGCoordinatorSingle_AgentInvalidIPBits(t *testing.T) {
})

// The agent connection should be closed immediately after sending an invalid addr
agent.AssertEventuallyResponsesClosed()
agent.AssertEventuallyResponsesClosed(
agpl.AuthorizationError{Wrapped: agpl.InvalidAddressBitsError{Bits: 64}}.Error())
assertEventuallyLost(ctx, t, store, agent.ID)
}

@@ -493,19 +494,19 @@ func TestPGCoordinatorDual_Mainline(t *testing.T) {
require.NoError(t, err)

// this closes agent2, client22, client21
agent2.AssertEventuallyResponsesClosed()
client22.AssertEventuallyResponsesClosed()
client21.AssertEventuallyResponsesClosed()
agent2.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
client22.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
client21.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
assertEventuallyLost(ctx, t, store, agent2.ID)
assertEventuallyLost(ctx, t, store, client21.ID)
assertEventuallyLost(ctx, t, store, client22.ID)

err = coord1.Close()
require.NoError(t, err)
// this closes agent1, client12, client11
agent1.AssertEventuallyResponsesClosed()
client12.AssertEventuallyResponsesClosed()
client11.AssertEventuallyResponsesClosed()
agent1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
client12.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
client11.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
assertEventuallyLost(ctx, t, store, agent1.ID)
assertEventuallyLost(ctx, t, store, client11.ID)
assertEventuallyLost(ctx, t, store, client12.ID)
@@ -636,12 +637,12 @@ func TestPGCoordinator_Unhealthy(t *testing.T) {
}
}
// connected agent should be disconnected
agent1.AssertEventuallyResponsesClosed()
agent1.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)

// new agent should immediately disconnect
agent2 := agpltest.NewAgent(ctx, t, uut, "agent2")
defer agent2.Close(ctx)
agent2.AssertEventuallyResponsesClosed()
agent2.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)

// next heartbeats succeed, so we are healthy
for i := 0; i < 2; i++ {
@@ -836,7 +837,7 @@ func TestPGCoordinatorDual_FailedHeartbeat(t *testing.T) {
// we eventually disconnect from the coordinator.
err = sdb1.Close()
require.NoError(t, err)
p1.AssertEventuallyResponsesClosed()
p1.AssertEventuallyResponsesClosed(tailnet.CloseErrUnhealthy)
p2.AssertEventuallyLost(p1.ID)
// This basically checks that peer2 had no update
// performed on their status since we are connected
@@ -891,7 +892,7 @@ func TestPGCoordinatorDual_PeerReconnect(t *testing.T) {
// never send a DISCONNECTED update.
err = c1.Close()
require.NoError(t, err)
p1.AssertEventuallyResponsesClosed()
p1.AssertEventuallyResponsesClosed(agpl.CloseErrCoordinatorClose)
p2.AssertEventuallyLost(p1.ID)
// This basically checks that peer2 had no update
// performed on their status since we are connected
11 changes: 11 additions & 0 deletions tailnet/controllers.go
@@ -284,6 +284,17 @@ func (c *BasicCoordination) respLoop() {
return
}

if resp.Error != "" {
// ReadyForHandshake error can occur during race conditions, where we send a ReadyForHandshake message,
// but the source has already disconnected from the tunnel by the time we do. So, just log at warning.
if strings.HasPrefix(resp.Error, ReadyForHandshakeError) {
c.logger.Warn(context.Background(), "coordination warning", slog.F("msg", resp.Error))
Review thread on the Warn call above:

spikecurtis (Contributor, Author): This is a bit fragile in that it's special-handling code for a specific error. But I'm not sure we want to add more structure to the coordination protocol error messages for just one odd-duck error message.

Without this, many tests will flake due to dropping a benign error message.

@ethanndickson what do you think?

ethanndickson (Member), Apr 21, 2025: Ah, I was indeed missing those "log detected at level ERROR" failures...

All those test failures are in CLI integration tests. We already ignore error logs in coderdtest for this same reason; is it feasible to ignore agent error logs too (and only in integration tests)? (A sketch of that alternative follows this file's diff.)

If not, this sounds fine to me as a warning.

spikecurtis (Contributor, Author), Apr 21, 2025: It's feasible, but I worry it's overbroad.

The ready-for-handshake "errors" aren't "errors" by the criteria we usually use for logs, which is that we definitely know something bad happened, rather than "something bad might have happened, or it might be fine", which is where we usually put WARN logs.

So I think this behavior is more correct; I'm just not wild about the implementation. But I'm also not wild about alternative implementations that make the error-vs-warning distinction explicit in the protocol.

ethanndickson (Member): I think it's fine to write as a one-off. If we ever need to communicate more than one type of warning using the protocol, then maybe we reconsider.

spikecurtis (Contributor, Author): Yeah, that's what I was thinking. If we have to start special-casing more than one, it will be worthwhile to make it explicit in the protocol.

} else {
c.logger.Error(context.Background(),
"coordination protocol error", slog.F("error", resp.Error))
}
}

err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates())
if err != nil {
c.logger.Debug(context.Background(), "failed to update peers", slog.Error(err))
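For reference, the alternative raised in the review thread above (ignoring error-level logs in the affected integration tests, as coderdtest already does) would look roughly like the sketch below. It assumes the IgnoreErrors option in cdr.dev/slog's slogtest package and is not code from this PR:

package example

import (
	"testing"

	"cdr.dev/slog/sloggers/slogtest"
)

func TestCLIIntegrationExample(t *testing.T) {
	// Build the test logger so that ERROR-level logs (for example a benign
	// "ready for handshake error: ..." from a disconnect race) do not fail
	// the test. The PR instead downgrades that one message to WARN in
	// respLoop, so other protocol errors still surface as test failures.
	logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
	_ = logger // hand this logger to the agent/client under test
}

The drawback discussed in the thread is that this suppresses every error log in those tests, not just the known-benign handshake race.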
64 changes: 49 additions & 15 deletions tailnet/coordinator.go
@@ -24,7 +24,10 @@ const (
// dropping updates
ResponseBufferSize = 512
// RequestBufferSize is the max number of requests to buffer per connection
RequestBufferSize = 32
RequestBufferSize = 32
CloseErrOverwritten = "peer ID overwritten by new connection"
CloseErrCoordinatorClose = "coordinator closed"
ReadyForHandshakeError = "ready for handshake error"
)

// Coordinator exchanges nodes with agents to establish connections.
@@ -97,6 +100,18 @@ var (
ErrAlreadyRemoved = xerrors.New("already removed")
)

type AuthorizationError struct {
Wrapped error
}

func (e AuthorizationError) Error() string {
return fmt.Sprintf("authorization: %s", e.Wrapped.Error())
}

func (e AuthorizationError) Unwrap() error {
return e.Wrapped
}

// NewCoordinator constructs a new in-memory connection coordinator. This
// coordinator is incompatible with multiple Coder replicas as all node data is
// in-memory.
@@ -161,8 +176,12 @@ func (c *coordinator) Coordinate(
c.wg.Add(1)
go func() {
defer c.wg.Done()
p.reqLoop(ctx, logger, c.core.handleRequest)
err := c.core.lostPeer(p)
loopErr := p.reqLoop(ctx, logger, c.core.handleRequest)
closeErrStr := ""
if loopErr != nil {
closeErrStr = loopErr.Error()
}
err := c.core.lostPeer(p, closeErrStr)
if xerrors.Is(err, ErrClosed) || xerrors.Is(err, ErrAlreadyRemoved) {
return
}
@@ -227,7 +246,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
}

if err := pr.auth.Authorize(ctx, req); err != nil {
return xerrors.Errorf("authorize request: %w", err)
return AuthorizationError{Wrapped: err}
}

if req.UpdateSelf != nil {
@@ -270,7 +289,7 @@ func (c *core) handleRequest(ctx context.Context, p *peer, req *proto.Coordinate
}
}
if req.Disconnect != nil {
c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect")
c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "graceful disconnect", "")
}
if rfhs := req.ReadyForHandshake; rfhs != nil {
err := c.handleReadyForHandshakeLocked(pr, rfhs)
@@ -298,7 +317,7 @@ func (c *core) handleReadyForHandshakeLocked(src *peer, rfhs []*proto.Coordinate
// don't want to kill its connection.
select {
case src.resps <- &proto.CoordinateResponse{
Error: fmt.Sprintf("you do not share a tunnel with %q", dstID.String()),
Error: fmt.Sprintf("%s: you do not share a tunnel with %q", ReadyForHandshakeError, dstID.String()),
}:
default:
return ErrWouldBlock
@@ -344,7 +363,7 @@ func (c *core) updateTunnelPeersLocked(id uuid.UUID, n *proto.Node, k proto.Coor
err := other.updateMappingLocked(id, n, k, reason)
if err != nil {
other.logger.Error(context.Background(), "failed to update mapping", slog.Error(err))
c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
c.removePeerLocked(other.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to update tunnel peer mapping")
}
}
}
Expand All @@ -360,7 +379,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
err := src.updateMappingLocked(dstID, dst.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
if err != nil {
src.logger.Error(context.Background(), "failed update of tunnel src", slog.Error(err))
c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
"failed to update tunnel dest mapping")
// if the source fails, then the tunnel is also removed and there is no reason to continue
// processing.
return err
@@ -370,7 +390,8 @@ func (c *core) addTunnelLocked(src *peer, dstID uuid.UUID) error {
err := dst.updateMappingLocked(src.id, src.node, proto.CoordinateResponse_PeerUpdate_NODE, "add tunnel")
if err != nil {
dst.logger.Error(context.Background(), "failed update of tunnel dst", slog.Error(err))
c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update",
"failed to update tunnel src mapping")
}
}
}
Expand All @@ -381,7 +402,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
err := src.updateMappingLocked(dstID, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
if err != nil {
src.logger.Error(context.Background(), "failed to update", slog.Error(err))
c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
c.removePeerLocked(src.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel dest mapping")
// removing the peer also removes all other tunnels and notifies destinations, so it's safe to
// return here.
return err
Expand All @@ -391,7 +412,7 @@ func (c *core) removeTunnelLocked(src *peer, dstID uuid.UUID) error {
err = dst.updateMappingLocked(src.id, nil, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "remove tunnel")
if err != nil {
dst.logger.Error(context.Background(), "failed to update", slog.Error(err))
c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update")
c.removePeerLocked(dst.id, proto.CoordinateResponse_PeerUpdate_DISCONNECTED, "failed update", "failed to remove tunnel src mapping")
// don't return here because we still want to remove the tunnel, and an error at the
// destination doesn't count as an error removing the tunnel at the source.
}
Expand All @@ -413,6 +434,11 @@ func (c *core) initPeer(p *peer) error {
if old, ok := c.peers[p.id]; ok {
// rare and interesting enough to log at Info, but it isn't an error per se
old.logger.Info(context.Background(), "overwritten by new connection")
select {
case old.resps <- &proto.CoordinateResponse{Error: CloseErrOverwritten}:
default:
// pass
}
close(old.resps)
p.overwrites = old.overwrites + 1
}
@@ -433,7 +459,7 @@ func (c *core) initPeer(p *peer) error {

// removePeer removes and cleans up a lost peer. It updates all peers it shares a tunnel with, deletes
// all tunnels from which the removed peer is the source.
func (c *core) lostPeer(p *peer) error {
func (c *core) lostPeer(p *peer, closeErr string) error {
c.mutex.Lock()
defer c.mutex.Unlock()
c.logger.Debug(context.Background(), "lostPeer", slog.F("peer_id", p.id))
@@ -443,18 +469,25 @@ func (c *core) lostPeer(p *peer) error {
if existing, ok := c.peers[p.id]; !ok || existing != p {
return ErrAlreadyRemoved
}
c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost")
c.removePeerLocked(p.id, proto.CoordinateResponse_PeerUpdate_LOST, "lost", closeErr)
return nil
}

func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason string) {
func (c *core) removePeerLocked(id uuid.UUID, kind proto.CoordinateResponse_PeerUpdate_Kind, reason, closeErr string) {
p, ok := c.peers[id]
if !ok {
c.logger.Critical(context.Background(), "removed non-existent peer %s", id)
return
}
c.updateTunnelPeersLocked(id, nil, kind, reason)
c.tunnels.removeAll(id)
if closeErr != "" {
select {
case p.resps <- &proto.CoordinateResponse{Error: closeErr}:
default:
// blocked, pass.
}
}
close(p.resps)
delete(c.peers, id)
}
@@ -487,7 +520,8 @@ func (c *core) close() error {
for id := range c.peers {
// when closing, mark them as LOST so that we don't disrupt in-progress
// connections.
c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close")
c.removePeerLocked(id, proto.CoordinateResponse_PeerUpdate_LOST, "coordinator close",
CloseErrCoordinatorClose)
}
return nil
}
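Finally, a note on the typed AuthorizationError introduced above: unlike the old xerrors.Errorf("authorize request: %w", err), it gives peers a stable "authorization: ..." close reason (as asserted in pgcoord_test.go) while still exposing the wrapped cause. The sketch below shows how a caller could match it; the classify function is hypothetical and not part of this PR:

package example

import (
	"fmt"

	"golang.org/x/xerrors"
)

// AuthorizationError mirrors the type added to tailnet/coordinator.go.
type AuthorizationError struct {
	Wrapped error
}

func (e AuthorizationError) Error() string {
	return fmt.Sprintf("authorization: %s", e.Wrapped.Error())
}

func (e AuthorizationError) Unwrap() error { return e.Wrapped }

// classify shows that, because AuthorizationError implements Unwrap, callers
// can still pick it out of a wrapped chain with xerrors.As (or errors.As);
// its Error() string is what the peer receives as the connection close reason.
func classify(err error) string {
	var authErr AuthorizationError
	if xerrors.As(err, &authErr) {
		return "unauthorized: " + authErr.Wrapped.Error()
	}
	return "other error: " + err.Error()
}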