Skip to content

Commit 0361f14

Browse files
committed
fix: stop sending DeleteTailnetPeer when coordinator is unhealthy
1 parent b6359b0 commit 0361f14

File tree

2 files changed

+75
-1
lines changed

2 files changed

+75
-1
lines changed

enterprise/tailnet/pgcoord.go

+23-1
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,17 @@ func (c *pgCoord) Coordinate(
231231
logger := c.logger.With(slog.F("peer_id", id))
232232
reqs := make(chan *proto.CoordinateRequest, agpl.RequestBufferSize)
233233
resps := make(chan *proto.CoordinateResponse, agpl.ResponseBufferSize)
234+
if !c.querier.isHealthy() {
235+
// If the coordinator is unhealthy, we don't want to hook this Coordinate call up to the
236+
// binder, as that can cause an unnecessary call to DeleteTailnetPeer when the connIO is
237+
// closed. Instead, we just close the response channel and bail out.
238+
// c.f. https://github.com/coder/coder/issues/12923
239+
c.logger.Info(ctx, "closed incoming coordinate call while unhealthy",
240+
slog.F("peer_id", id),
241+
)
242+
close(resps)
243+
return reqs, resps
244+
}
234245
cIO := newConnIO(c.ctx, ctx, logger, c.bindings, c.tunnelerCh, reqs, resps, id, name, a)
235246
err := agpl.SendCtx(c.ctx, c.newConnections, cIO)
236247
if err != nil {
@@ -842,7 +853,12 @@ func (q *querier) newConn(c *connIO) {
842853
defer q.mu.Unlock()
843854
if !q.healthy {
844855
err := c.Close()
845-
q.logger.Info(q.ctx, "closed incoming connection while unhealthy",
856+
// This can only happen during a narrow window where we were healthy
857+
// when pgCoord checked before accepting the connection, but are
858+
// unhealthy by the time we get around to processing it. Seeing a small
859+
// number of these logs is not worrying, but a large number probably
860+
// indicates something is amiss.
861+
q.logger.Warn(q.ctx, "closed incoming connection while unhealthy",
846862
slog.Error(err),
847863
slog.F("peer_id", c.UniqueID()),
848864
)
@@ -865,6 +881,12 @@ func (q *querier) newConn(c *connIO) {
865881
})
866882
}
867883

884+
func (q *querier) isHealthy() bool {
885+
q.mu.Lock()
886+
defer q.mu.Unlock()
887+
return q.healthy
888+
}
889+
868890
func (q *querier) cleanupConn(c *connIO) {
869891
logger := q.logger.With(slog.F("peer_id", c.UniqueID()))
870892
q.mu.Lock()

enterprise/tailnet/pgcoord_internal_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import (
1010
"testing"
1111
"time"
1212

13+
"github.com/coder/coder/v2/coderd/database/pubsub"
14+
agpl "github.com/coder/coder/v2/tailnet"
15+
"golang.org/x/xerrors"
16+
1317
"github.com/google/uuid"
1418
"github.com/stretchr/testify/require"
1519
"go.uber.org/mock/gomock"
@@ -291,3 +295,51 @@ func TestGetDebug(t *testing.T) {
291295
require.Equal(t, peerID, debug.Tunnels[0].SrcID)
292296
require.Equal(t, dstID, debug.Tunnels[0].DstID)
293297
}
298+
299+
// TestPGCoordinatorUnhealthy tests that when the coordinator fails to send heartbeats and is
300+
// unhealthy it disconnects any peers and does not send any extraneous database queries.
301+
func TestPGCoordinatorUnhealthy(t *testing.T) {
302+
t.Parallel()
303+
ctx := testutil.Context(t, testutil.WaitShort)
304+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
305+
306+
ctrl := gomock.NewController(t)
307+
mStore := dbmock.NewMockStore(ctrl)
308+
ps := pubsub.NewInMemory()
309+
310+
// after 3 failed heartbeats, the coordinator is unhealthy
311+
mStore.EXPECT().
312+
UpsertTailnetCoordinator(gomock.Any(), gomock.Any()).
313+
MinTimes(3).
314+
Return(database.TailnetCoordinator{}, xerrors.New("badness"))
315+
mStore.EXPECT().
316+
DeleteCoordinator(gomock.Any(), gomock.Any()).
317+
Times(1).
318+
Return(nil)
319+
// But, in particular we DO NOT want the coordinator to call DeleteTailnetPeer, as this is
320+
// unnecessary and can spam the database. c.f. https://github.com/coder/coder/issues/12923
321+
322+
// these cleanup queries run, but we don't care for this test
323+
mStore.EXPECT().CleanTailnetCoordinators(gomock.Any()).AnyTimes().Return(nil)
324+
mStore.EXPECT().CleanTailnetLostPeers(gomock.Any()).AnyTimes().Return(nil)
325+
mStore.EXPECT().CleanTailnetTunnels(gomock.Any()).AnyTimes().Return(nil)
326+
327+
coordinator, err := newPGCoordInternal(ctx, logger, ps, mStore)
328+
require.NoError(t, err)
329+
330+
require.Eventually(t, func() bool {
331+
return !coordinator.querier.isHealthy()
332+
}, testutil.WaitShort, testutil.IntervalFast)
333+
334+
pID := uuid.UUID{5}
335+
_, resps := coordinator.Coordinate(ctx, pID, "test", agpl.AgentCoordinateeAuth{ID: pID})
336+
resp := testutil.RequireRecvCtx(ctx, t, resps)
337+
require.Nil(t, resp, "channel should be closed")
338+
339+
// give the coordinator some time to process any pending work. We are
340+
// testing here that a database call is absent, so we don't want to race to
341+
// shut down the test.
342+
time.Sleep(testutil.IntervalMedium)
343+
_ = coordinator.Close()
344+
require.Eventually(t, ctrl.Satisfied, testutil.WaitShort, testutil.IntervalFast)
345+
}

0 commit comments

Comments
 (0)