Skip to content

Commit ba1b5f0

Browse files
committed
feat: add cleanup of lost tailnet peers and tunnels to PGCoordinator
1 parent df25848 commit ba1b5f0

File tree

3 files changed

+129
-11
lines changed

3 files changed

+129
-11
lines changed

enterprise/tailnet/pgcoord.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -837,11 +837,14 @@ func (m *mapper) bestMappings(mappings []mapping) map[uuid.UUID]mapping {
837837
case !ok:
838838
// no current best
839839
best[mpng.peer] = mpng
840+
841+
// NODE always beats LOST mapping, since the LOST could be from a coordinator that's
842+
// slow updating the DB, and the peer has reconnected to a different coordinator and
843+
// given a NODE mapping.
844+
case bestM.kind == proto.CoordinateResponse_PeerUpdate_LOST && mpng.kind == proto.CoordinateResponse_PeerUpdate_NODE:
845+
best[mpng.peer] = mpng
840846
case mpng.updatedAt.After(bestM.updatedAt) && mpng.kind == proto.CoordinateResponse_PeerUpdate_NODE:
841-
// newer, and it's a NODE update. If it's a LOST mapping, we only prefer it if we don't
842-
// have anything else, since we don't want LOST mappings to overwrite NODE mappings, since
843-
// they could be from a coordinator that's slow updating the DB, and the peer has reconnected
844-
// to a different coordinator and given a NODE mapping.
847+
// newer, and it's a NODE update.
845848
best[mpng.peer] = mpng
846849
}
847850
}
@@ -1720,15 +1723,22 @@ func (h *heartbeats) cleanupLoop() {
17201723
}
17211724
}
17221725

1723-
// cleanup issues a DB command to clean out any old expired coordinators state. The cleanup is idempotent, so no need
1724-
// to synchronize with other coordinators.
1726+
// cleanup issues a DB command to clean out any old expired coordinators or lost peer state. The
1727+
// cleanup is idempotent, so no need to synchronize with other coordinators.
17251728
func (h *heartbeats) cleanup() {
1729+
// the records we are attempting to clean up do no serious harm other than
1730+
// accumulating in the tables, so we don't bother retrying if it fails.
17261731
err := h.store.CleanTailnetCoordinators(h.ctx)
17271732
if err != nil {
1728-
// the records we are attempting to clean up do no serious harm other than
1729-
// accumulating in the tables, so we don't bother retrying if it fails.
17301733
h.logger.Error(h.ctx, "failed to cleanup old coordinators", slog.Error(err))
1731-
return
17321734
}
1733-
h.logger.Debug(h.ctx, "cleaned up old coordinators")
1735+
err = h.store.CleanTailnetLostPeers(h.ctx)
1736+
if err != nil {
1737+
h.logger.Error(h.ctx, "failed to cleanup lost peers", slog.Error(err))
1738+
}
1739+
err = h.store.CleanTailnetTunnels(h.ctx)
1740+
if err != nil {
1741+
h.logger.Error(h.ctx, "failed to cleanup abandoned tunnels", slog.Error(err))
1742+
}
1743+
h.logger.Debug(h.ctx, "completed cleanup")
17341744
}

enterprise/tailnet/pgcoord_internal_test.go

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ func TestHeartbeat_Cleanup(t *testing.T) {
4747
<-waitForCleanup
4848
return nil
4949
})
50+
mStore.EXPECT().CleanTailnetLostPeers(gomock.Any()).MinTimes(2).DoAndReturn(func(_ context.Context) error {
51+
<-waitForCleanup
52+
return nil
53+
})
54+
mStore.EXPECT().CleanTailnetTunnels(gomock.Any()).MinTimes(2).DoAndReturn(func(_ context.Context) error {
55+
<-waitForCleanup
56+
return nil
57+
})
5058

5159
uut := &heartbeats{
5260
ctx: ctx,
@@ -56,7 +64,7 @@ func TestHeartbeat_Cleanup(t *testing.T) {
5664
}
5765
go uut.cleanupLoop()
5866

59-
for i := 0; i < 2; i++ {
67+
for i := 0; i < 6; i++ {
6068
select {
6169
case <-ctx.Done():
6270
t.Fatal("timeout")
@@ -67,6 +75,104 @@ func TestHeartbeat_Cleanup(t *testing.T) {
6775
close(waitForCleanup)
6876
}
6977

78+
// TestLostPeerCleanupQueries tests that our SQL queries to clean up lost peers do what we expect,
79+
// that is, clean up peers and associated tunnels that have been lost for over 24 hours.
80+
func TestLostPeerCleanupQueries(t *testing.T) {
81+
t.Parallel()
82+
if !dbtestutil.WillUsePostgres() {
83+
t.Skip("test only with postgres")
84+
}
85+
store, _, sqlDB := dbtestutil.NewDBWithSQLDB(t, dbtestutil.WithDumpOnFailure())
86+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
87+
defer cancel()
88+
89+
coordID := uuid.New()
90+
_, err := store.UpsertTailnetCoordinator(ctx, coordID)
91+
require.NoError(t, err)
92+
93+
peerID := uuid.New()
94+
_, err = store.UpsertTailnetPeer(ctx, database.UpsertTailnetPeerParams{
95+
ID: peerID,
96+
CoordinatorID: coordID,
97+
Node: []byte("test"),
98+
Status: database.TailnetStatusLost,
99+
})
100+
require.NoError(t, err)
101+
102+
otherID := uuid.New()
103+
_, err = store.UpsertTailnetTunnel(ctx, database.UpsertTailnetTunnelParams{
104+
CoordinatorID: coordID,
105+
SrcID: peerID,
106+
DstID: otherID,
107+
})
108+
require.NoError(t, err)
109+
110+
peers, err := store.GetAllTailnetPeers(ctx)
111+
require.NoError(t, err)
112+
require.Len(t, peers, 1)
113+
require.Equal(t, peerID, peers[0].ID)
114+
115+
tunnels, err := store.GetAllTailnetTunnels(ctx)
116+
require.NoError(t, err)
117+
require.Len(t, tunnels, 1)
118+
require.Equal(t, peerID, tunnels[0].SrcID)
119+
require.Equal(t, otherID, tunnels[0].DstID)
120+
121+
// this clean is a noop since the peer and tunnel are less than 24h old
122+
err = store.CleanTailnetLostPeers(ctx)
123+
require.NoError(t, err)
124+
err = store.CleanTailnetTunnels(ctx)
125+
require.NoError(t, err)
126+
127+
peers, err = store.GetAllTailnetPeers(ctx)
128+
require.NoError(t, err)
129+
require.Len(t, peers, 1)
130+
require.Equal(t, peerID, peers[0].ID)
131+
132+
tunnels, err = store.GetAllTailnetTunnels(ctx)
133+
require.NoError(t, err)
134+
require.Len(t, tunnels, 1)
135+
require.Equal(t, peerID, tunnels[0].SrcID)
136+
require.Equal(t, otherID, tunnels[0].DstID)
137+
138+
// set the age of the tunnel to >24h
139+
sqlDB.Exec("UPDATE tailnet_tunnels SET updated_at = $1", time.Now().Add(-25*time.Hour))
140+
141+
// this clean is still a noop since the peer hasn't been lost for 24 hours
142+
err = store.CleanTailnetLostPeers(ctx)
143+
require.NoError(t, err)
144+
err = store.CleanTailnetTunnels(ctx)
145+
require.NoError(t, err)
146+
147+
peers, err = store.GetAllTailnetPeers(ctx)
148+
require.NoError(t, err)
149+
require.Len(t, peers, 1)
150+
require.Equal(t, peerID, peers[0].ID)
151+
152+
tunnels, err = store.GetAllTailnetTunnels(ctx)
153+
require.NoError(t, err)
154+
require.Len(t, tunnels, 1)
155+
require.Equal(t, peerID, tunnels[0].SrcID)
156+
require.Equal(t, otherID, tunnels[0].DstID)
157+
158+
// set the age of the peer to >24h
159+
sqlDB.Exec("UPDATE tailnet_peers SET updated_at = $1", time.Now().Add(-25*time.Hour))
160+
161+
// this clean removes the peer and the associated tunnel
162+
err = store.CleanTailnetLostPeers(ctx)
163+
require.NoError(t, err)
164+
err = store.CleanTailnetTunnels(ctx)
165+
require.NoError(t, err)
166+
167+
peers, err = store.GetAllTailnetPeers(ctx)
168+
require.NoError(t, err)
169+
require.Len(t, peers, 0)
170+
171+
tunnels, err = store.GetAllTailnetTunnels(ctx)
172+
require.NoError(t, err)
173+
require.Len(t, tunnels, 0)
174+
}
175+
70176
func TestDebugTemplate(t *testing.T) {
71177
t.Parallel()
72178
if runtime.GOOS == "windows" {

enterprise/tailnet/pgcoord_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,8 @@ func TestPGCoordinator_Unhealthy(t *testing.T) {
545545
Return(database.TailnetCoordinator{}, nil)
546546
// extra calls we don't particularly care about for this test
547547
mStore.EXPECT().CleanTailnetCoordinators(gomock.Any()).AnyTimes().Return(nil)
548+
mStore.EXPECT().CleanTailnetLostPeers(gomock.Any()).AnyTimes().Return(nil)
549+
mStore.EXPECT().CleanTailnetTunnels(gomock.Any()).AnyTimes().Return(nil)
548550
mStore.EXPECT().GetTailnetTunnelPeerIDs(gomock.Any(), gomock.Any()).AnyTimes().Return(nil, nil)
549551
mStore.EXPECT().GetTailnetTunnelPeerBindings(gomock.Any(), gomock.Any()).
550552
AnyTimes().Return(nil, nil)

0 commit comments

Comments (0)