From a8ac20594a0f14dcc984b45d6b2d72aa79845d60 Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Wed, 1 May 2024 21:10:58 +0000 Subject: [PATCH 1/4] fix(enterprise): mark nodes from unhealthy coordinators as lost Instead of removing the mappings of unhealthy coordinators entirely, mark them as lost instead. This prevents peers from disappearing from other peers if a coordinator misses a heartbeat. --- enterprise/tailnet/pgcoord.go | 13 +++++-- enterprise/tailnet/pgcoord_internal_test.go | 40 +++++++++++++++++++-- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/enterprise/tailnet/pgcoord.go b/enterprise/tailnet/pgcoord.go index 390e13621fff0..1a9dc88378b03 100644 --- a/enterprise/tailnet/pgcoord.go +++ b/enterprise/tailnet/pgcoord.go @@ -1485,10 +1485,17 @@ func (h *heartbeats) filter(mappings []mapping) []mapping { ok := m.coordinator == h.self if !ok { _, ok = h.coordinators[m.coordinator] + if !ok { + // If a mapping exists to a coordinator lost to heartbeats, + // still add the mapping as LOST. If a coordinator misses + // heartbeats but a client is still connected to it, this may be + // the only mapping available for it. Newer mappings will take + // precedence. + m.kind = proto.CoordinateResponse_PeerUpdate_LOST + } } - if ok { - out = append(out, m) - } + + out = append(out, m) } return out } diff --git a/enterprise/tailnet/pgcoord_internal_test.go b/enterprise/tailnet/pgcoord_internal_test.go index b1bfb371f0959..53fd61d73f066 100644 --- a/enterprise/tailnet/pgcoord_internal_test.go +++ b/enterprise/tailnet/pgcoord_internal_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "golang.org/x/xerrors" @@ -33,9 +34,9 @@ import ( // make update-golden-files var UpdateGoldenFiles = flag.Bool("update", false, "update .golden files") -// TestHeartbeat_Cleanup is internal so that we can overwrite the cleanup period and not wait an hour for the timed +// TestHeartbeats_Cleanup is internal so that we can overwrite the cleanup period and not wait an hour for the timed // cleanup. -func TestHeartbeat_Cleanup(t *testing.T) { +func TestHeartbeats_Cleanup(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) @@ -78,6 +79,41 @@ func TestHeartbeat_Cleanup(t *testing.T) { close(waitForCleanup) } +func TestHeartbeats_LostCoordinator_MarkLost(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + mStore := dbmock.NewMockStore(ctrl) + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) + defer cancel() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + + uut := &heartbeats{ + ctx: ctx, + logger: logger, + store: mStore, + cleanupPeriod: time.Millisecond, + coordinators: map[uuid.UUID]time.Time{ + uuid.New(): time.Now(), + }, + } + + mpngs := []mapping{{ + peer: uuid.New(), + coordinator: uuid.New(), + updatedAt: time.Now(), + node: &proto.Node{}, + kind: proto.CoordinateResponse_PeerUpdate_NODE, + }} + + // Filter should still return the mapping without a coordinator, but marked + // as LOST. + got := uut.filter(mpngs) + require.Len(t, got, 1) + assert.Equal(t, proto.CoordinateResponse_PeerUpdate_LOST, got[0].kind) +} + // TestLostPeerCleanupQueries tests that our SQL queries to clean up lost peers do what we expect, // that is, clean up peers and associated tunnels that have been lost for over 24 hours. func TestLostPeerCleanupQueries(t *testing.T) { From 7f8ca1d4c1e40ad42b61a5a0a5f6716b1c3bf08f Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Thu, 2 May 2024 22:43:37 +0000 Subject: [PATCH 2/4] add pgcoord test --- enterprise/tailnet/pgcoord_test.go | 70 +++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 10 deletions(-) diff --git a/enterprise/tailnet/pgcoord_test.go b/enterprise/tailnet/pgcoord_test.go index b27db149f634b..395e3aef37ef6 100644 --- a/enterprise/tailnet/pgcoord_test.go +++ b/enterprise/tailnet/pgcoord_test.go @@ -29,6 +29,7 @@ import ( "github.com/coder/coder/v2/enterprise/tailnet" agpl "github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet/proto" + "github.com/coder/coder/v2/tailnet/test" agpltest "github.com/coder/coder/v2/tailnet/test" "github.com/coder/coder/v2/testutil" ) @@ -415,6 +416,55 @@ func TestPGCoordinatorSingle_MissedHeartbeats(t *testing.T) { assertEventuallyLost(ctx, t, store, client.id) } +func TestPGCoordinatorSingle_MissedHeartbeats_NoDrop(t *testing.T) { + t.Parallel() + if !dbtestutil.WillUsePostgres() { + t.Skip("test only with postgres") + } + store, ps := dbtestutil.NewDB(t) + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitSuperLong) + defer cancel() + logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) + + coordinator, err := tailnet.NewPGCoord(ctx, logger, ps, store) + require.NoError(t, err) + defer coordinator.Close() + + agent := test.NewPeer(ctx, t, coordinator, "agent") + defer agent.Close(ctx) + + client := test.NewPeer(ctx, t, coordinator, "client") + defer client.Close(ctx) + client.AddTunnel(agent.ID) + + client.UpdateDERP(11) + agent.AssertEventuallyHasDERP(client.ID, 11) + + // simulate a second coordinator via DB calls only --- our goal is to test + // broken heart-beating, so we can't use a real coordinator + fCoord2 := &fakeCoordinator{ + ctx: ctx, + t: t, + store: store, + id: uuid.New(), + } + // simulate a single heartbeat, the coordinator is healthy + fCoord2.heartbeat() + + fCoord2.agentNode(agent.ID, &agpl.Node{PreferredDERP: 12}) + // since it's healthy the client should get the new node. + client.AssertEventuallyHasDERP(agent.ID, 12) + + // the heartbeat should then timeout and we'll get sent a LOST update, NOT a + // disconnect. + client.AssertEventuallyLost(agent.ID) + + agent.Close(ctx) + client.Close(ctx) + + assertEventuallyLost(ctx, t, store, client.ID) +} + func TestPGCoordinatorSingle_SendsHeartbeats(t *testing.T) { t.Parallel() if !dbtestutil.WillUsePostgres() { @@ -857,6 +907,16 @@ func newTestAgent(t *testing.T, coord agpl.CoordinatorV1, name string, id ...uui return a } +func newTestClient(t *testing.T, coord agpl.CoordinatorV1, agentID uuid.UUID, id ...uuid.UUID) *testConn { + c := newTestConn(id) + go func() { + err := coord.ServeClient(c.serverWS, c.id, agentID) + assert.NoError(t, err) + close(c.closeChan) + }() + return c +} + func (c *testConn) close() error { return c.ws.Close() } @@ -902,16 +962,6 @@ func (c *testConn) waitForClose(ctx context.Context, t *testing.T) { } } -func newTestClient(t *testing.T, coord agpl.CoordinatorV1, agentID uuid.UUID, id ...uuid.UUID) *testConn { - c := newTestConn(id) - go func() { - err := coord.ServeClient(c.serverWS, c.id, agentID) - assert.NoError(t, err) - close(c.closeChan) - }() - return c -} - func assertEventuallyHasDERPs(ctx context.Context, t *testing.T, c *testConn, expected ...int) { t.Helper() for { From 7796f52c3bf9ba0c3bd6354d24bb40b4f0cd35a0 Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Fri, 3 May 2024 18:54:55 +0000 Subject: [PATCH 3/4] fixup! add pgcoord test --- enterprise/tailnet/pgcoord_test.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/enterprise/tailnet/pgcoord_test.go b/enterprise/tailnet/pgcoord_test.go index 395e3aef37ef6..118c922010992 100644 --- a/enterprise/tailnet/pgcoord_test.go +++ b/enterprise/tailnet/pgcoord_test.go @@ -430,15 +430,13 @@ func TestPGCoordinatorSingle_MissedHeartbeats_NoDrop(t *testing.T) { require.NoError(t, err) defer coordinator.Close() - agent := test.NewPeer(ctx, t, coordinator, "agent") - defer agent.Close(ctx) + agentID := uuid.New() client := test.NewPeer(ctx, t, coordinator, "client") defer client.Close(ctx) - client.AddTunnel(agent.ID) + client.AddTunnel(agentID) client.UpdateDERP(11) - agent.AssertEventuallyHasDERP(client.ID, 11) // simulate a second coordinator via DB calls only --- our goal is to test // broken heart-beating, so we can't use a real coordinator @@ -451,15 +449,14 @@ func TestPGCoordinatorSingle_MissedHeartbeats_NoDrop(t *testing.T) { // simulate a single heartbeat, the coordinator is healthy fCoord2.heartbeat() - fCoord2.agentNode(agent.ID, &agpl.Node{PreferredDERP: 12}) + fCoord2.agentNode(agentID, &agpl.Node{PreferredDERP: 12}) // since it's healthy the client should get the new node. - client.AssertEventuallyHasDERP(agent.ID, 12) + client.AssertEventuallyHasDERP(agentID, 12) // the heartbeat should then timeout and we'll get sent a LOST update, NOT a // disconnect. - client.AssertEventuallyLost(agent.ID) + client.AssertEventuallyLost(agentID) - agent.Close(ctx) client.Close(ctx) assertEventuallyLost(ctx, t, store, client.ID) From 59caa04258ec9c843e7ac22d8dba097a44c23376 Mon Sep 17 00:00:00 2001 From: Colin Adler Date: Fri, 3 May 2024 18:58:15 +0000 Subject: [PATCH 4/4] fixup! add pgcoord test --- enterprise/tailnet/pgcoord_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/enterprise/tailnet/pgcoord_test.go b/enterprise/tailnet/pgcoord_test.go index 118c922010992..5bd722533dc39 100644 --- a/enterprise/tailnet/pgcoord_test.go +++ b/enterprise/tailnet/pgcoord_test.go @@ -29,7 +29,6 @@ import ( "github.com/coder/coder/v2/enterprise/tailnet" agpl "github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet/proto" - "github.com/coder/coder/v2/tailnet/test" agpltest "github.com/coder/coder/v2/tailnet/test" "github.com/coder/coder/v2/testutil" ) @@ -432,7 +431,7 @@ func TestPGCoordinatorSingle_MissedHeartbeats_NoDrop(t *testing.T) { agentID := uuid.New() - client := test.NewPeer(ctx, t, coordinator, "client") + client := agpltest.NewPeer(ctx, t, coordinator, "client") defer client.Close(ctx) client.AddTunnel(agentID)