feat: support graceful disconnect in PGCoordinator #10937

Merged
feat: support graceful disconnect in PGCoordinator
spikecurtis committed Nov 30, 2023
commit a000e63ffe298750f8c1e6514aa552b5740a88ff
53 changes: 35 additions & 18 deletions enterprise/tailnet/connio.go
@@ -24,16 +24,17 @@ type connIO struct {
// coordCtx is the parent context, that is, the context of the Coordinator
coordCtx context.Context
// peerCtx is the context of the connection to our peer
peerCtx context.Context
cancel context.CancelFunc
logger slog.Logger
requests <-chan *proto.CoordinateRequest
responses chan<- *proto.CoordinateResponse
bindings chan<- binding
tunnels chan<- tunnel
auth agpl.TunnelAuth
mu sync.Mutex
closed bool
peerCtx context.Context
cancel context.CancelFunc
logger slog.Logger
requests <-chan *proto.CoordinateRequest
responses chan<- *proto.CoordinateResponse
bindings chan<- binding
tunnels chan<- tunnel
auth agpl.TunnelAuth
mu sync.Mutex
closed bool
disconnected bool

name string
start int64
@@ -76,20 +77,29 @@ func newConnIO(coordContext context.Context,

func (c *connIO) recvLoop() {
defer func() {
// withdraw bindings & tunnels when we exit. We need to use the parent context here, since
// withdraw bindings & tunnels when we exit. We need to use the coordinator context here, since
// our own context might be canceled, but we still need to withdraw.
b := binding{
bKey: bKey(c.UniqueID()),
kind: proto.CoordinateResponse_PeerUpdate_LOST,
}
if c.disconnected {
b.kind = proto.CoordinateResponse_PeerUpdate_DISCONNECTED
}
if err := sendCtx(c.coordCtx, c.bindings, b); err != nil {
c.logger.Debug(c.coordCtx, "parent context expired while withdrawing bindings", slog.Error(err))
}
t := tunnel{
tKey: tKey{src: c.UniqueID()},
active: false,
}
if err := sendCtx(c.coordCtx, c.tunnels, t); err != nil {
c.logger.Debug(c.coordCtx, "parent context expired while withdrawing tunnels", slog.Error(err))
// only remove tunnels on graceful disconnect. If we remove tunnels for lost peers, then
// this will look like a disconnect from the peer perspective, since we query for active peers
// by using the tunnel as a join in the database
if c.disconnected {
t := tunnel{
tKey: tKey{src: c.UniqueID()},
active: false,
}
if err := sendCtx(c.coordCtx, c.tunnels, t); err != nil {
c.logger.Debug(c.coordCtx, "parent context expired while withdrawing tunnels", slog.Error(err))
}
}
}()
defer c.Close()
@@ -111,13 +121,16 @@ func (c *connIO) recvLoop() {
}
}

var errDisconnect = xerrors.New("graceful disconnect")

func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
c.logger.Debug(c.peerCtx, "got request")
if req.UpdateSelf != nil {
c.logger.Debug(c.peerCtx, "got node update", slog.F("node", req.UpdateSelf))
b := binding{
bKey: bKey(c.UniqueID()),
node: req.UpdateSelf.Node,
kind: proto.CoordinateResponse_PeerUpdate_NODE,
}
if err := sendCtx(c.coordCtx, c.bindings, b); err != nil {
c.logger.Debug(c.peerCtx, "failed to send binding", slog.Error(err))
@@ -169,7 +182,11 @@ func (c *connIO) handleRequest(req *proto.CoordinateRequest) error {
return err
}
}
// TODO: (spikecurtis) support Disconnect
if req.Disconnect != nil {
c.logger.Debug(c.peerCtx, "graceful disconnect")
c.disconnected = true
return errDisconnect
}
return nil
}

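For reference, the new behavior is driven entirely by the peer: handleRequest above treats any CoordinateRequest with a non-nil Disconnect as a graceful disconnect, sets c.disconnected, and returns errDisconnect, which makes recvLoop exit and publish a DISCONNECTED binding (and withdraw tunnels) instead of LOST. A minimal sketch of the peer side follows; the sendReq helper, the package name, and the import path / generated CoordinateRequest_Disconnect type name are assumptions for illustration and are not part of this commit.

package tailnetsketch

import (
	"context"

	"github.com/coder/coder/v2/tailnet/proto"
)

// gracefulDisconnect is an illustrative sketch, not part of this commit.
// sendReq is a hypothetical stand-in for whatever writes CoordinateRequests
// onto the peer's coordination stream.
func gracefulDisconnect(ctx context.Context, sendReq func(context.Context, *proto.CoordinateRequest) error) error {
	// handleRequest only checks that Disconnect is non-nil, so an empty
	// message is enough to take the DISCONNECTED (rather than LOST) path.
	return sendReq(ctx, &proto.CoordinateRequest{
		Disconnect: &proto.CoordinateRequest_Disconnect{},
	})
}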
12 changes: 6 additions & 6 deletions enterprise/tailnet/multiagent_test.go
@@ -58,7 +58,7 @@ func TestPGCoordinator_MultiAgent(t *testing.T) {
require.NoError(t, agent1.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
assertEventuallyLost(ctx, t, store, agent1.id)
}

// TestPGCoordinator_MultiAgent_UnsubscribeRace tests a single coordinator with
@@ -106,7 +106,7 @@ func TestPGCoordinator_MultiAgent_UnsubscribeRace(t *testing.T) {
require.NoError(t, agent1.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
assertEventuallyLost(ctx, t, store, agent1.id)
}

// TestPGCoordinator_MultiAgent_Unsubscribe tests a single coordinator with a
@@ -168,7 +168,7 @@ func TestPGCoordinator_MultiAgent_Unsubscribe(t *testing.T) {
require.NoError(t, agent1.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
assertEventuallyLost(ctx, t, store, agent1.id)
}

// TestPGCoordinator_MultiAgent_MultiCoordinator tests two coordinators with a
@@ -220,7 +220,7 @@ func TestPGCoordinator_MultiAgent_MultiCoordinator(t *testing.T) {
require.NoError(t, agent1.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
assertEventuallyLost(ctx, t, store, agent1.id)
}

// TestPGCoordinator_MultiAgent_MultiCoordinator_UpdateBeforeSubscribe tests two
@@ -273,7 +273,7 @@ func TestPGCoordinator_MultiAgent_MultiCoordinator_UpdateBeforeSubscribe(t *test
require.NoError(t, agent1.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
assertEventuallyLost(ctx, t, store, agent1.id)
}

// TestPGCoordinator_MultiAgent_TwoAgents tests three coordinators with a
@@ -344,5 +344,5 @@ func TestPGCoordinator_MultiAgent_TwoAgents(t *testing.T) {
require.NoError(t, agent2.close())

assertEventuallyNoClientsForAgent(ctx, t, store, agent1.id)
assertEventuallyNoAgents(ctx, t, store, agent1.id)
assertEventuallyLost(ctx, t, store, agent1.id)
}
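The test changes above follow from the same semantics: an agent that closes its connection without sending Disconnect keeps its peer row and tunnels, so the tests now wait for it to be reported lost (assertEventuallyLost) instead of asserting it disappears (assertEventuallyNoAgents). That helper is not shown in this diff; as a rough sketch of the shape such an assertion could take, assuming testify's require.Eventually and with the store lookup left as a hypothetical callback:

package tailnetsketch

import (
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/stretchr/testify/require"
)

// assertEventuallyLostSketch is illustrative only; the real assertEventuallyLost
// helper used by the tests above is defined elsewhere in the test package.
// getStatus is a hypothetical callback that reads the peer's status from the store.
func assertEventuallyLostSketch(t *testing.T, peerID uuid.UUID, getStatus func(uuid.UUID) (string, error)) {
	t.Helper()
	require.Eventually(t, func() bool {
		status, err := getStatus(peerID)
		if err != nil {
			return false
		}
		// Poll until the coordinator has marked the peer lost, rather than
		// expecting its row to have been deleted.
		return status == "lost"
	}, 10*time.Second, 25*time.Millisecond)
}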