diff --git a/enterprise/tailnet/pgcoord.go b/enterprise/tailnet/pgcoord.go
index aabb21eef6b28..aecdcde828d78 100644
--- a/enterprise/tailnet/pgcoord.go
+++ b/enterprise/tailnet/pgcoord.go
@@ -231,6 +231,17 @@ func (c *pgCoord) Coordinate(
 	logger := c.logger.With(slog.F("peer_id", id))
 	reqs := make(chan *proto.CoordinateRequest, agpl.RequestBufferSize)
 	resps := make(chan *proto.CoordinateResponse, agpl.ResponseBufferSize)
+	if !c.querier.isHealthy() {
+		// If the coordinator is unhealthy, we don't want to hook this Coordinate call up to the
+		// binder, as that can cause an unnecessary call to DeleteTailnetPeer when the connIO is
+		// closed. Instead, we just close the response channel and bail out.
+		// c.f. https://github.com/coder/coder/issues/12923
+		c.logger.Info(ctx, "closed incoming coordinate call while unhealthy",
+			slog.F("peer_id", id),
+		)
+		close(resps)
+		return reqs, resps
+	}
 	cIO := newConnIO(c.ctx, ctx, logger, c.bindings, c.tunnelerCh, reqs, resps, id, name, a)
 	err := agpl.SendCtx(c.ctx, c.newConnections, cIO)
 	if err != nil {
@@ -842,7 +853,12 @@ func (q *querier) newConn(c *connIO) {
 	defer q.mu.Unlock()
 	if !q.healthy {
 		err := c.Close()
-		q.logger.Info(q.ctx, "closed incoming connection while unhealthy",
+		// This can only happen during a narrow window where we were healthy
+		// when pgCoord checked before accepting the connection, but are now
+		// unhealthy by the time we get around to processing it. Seeing a small
+		// number of these logs is not worrying, but a large number probably
+		// indicates something is amiss.
+		q.logger.Warn(q.ctx, "closed incoming connection while unhealthy",
 			slog.Error(err),
 			slog.F("peer_id", c.UniqueID()),
 		)
@@ -865,6 +881,12 @@ func (q *querier) newConn(c *connIO) {
 	})
 }
 
+func (q *querier) isHealthy() bool {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	return q.healthy
+}
+
 func (q *querier) cleanupConn(c *connIO) {
 	logger := q.logger.With(slog.F("peer_id", c.UniqueID()))
 	q.mu.Lock()
diff --git a/enterprise/tailnet/pgcoord_internal_test.go b/enterprise/tailnet/pgcoord_internal_test.go
index d5b79d6225d2c..b1bfb371f0959 100644
--- a/enterprise/tailnet/pgcoord_internal_test.go
+++ b/enterprise/tailnet/pgcoord_internal_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/mock/gomock"
+	"golang.org/x/xerrors"
 	gProto "google.golang.org/protobuf/proto"
 
 	"cdr.dev/slog"
@@ -21,6 +22,8 @@ import (
 	"github.com/coder/coder/v2/coderd/database"
 	"github.com/coder/coder/v2/coderd/database/dbmock"
 	"github.com/coder/coder/v2/coderd/database/dbtestutil"
+	"github.com/coder/coder/v2/coderd/database/pubsub"
+	agpl "github.com/coder/coder/v2/tailnet"
 	"github.com/coder/coder/v2/tailnet/proto"
 	"github.com/coder/coder/v2/testutil"
 )
@@ -291,3 +294,51 @@ func TestGetDebug(t *testing.T) {
 	require.Equal(t, peerID, debug.Tunnels[0].SrcID)
 	require.Equal(t, dstID, debug.Tunnels[0].DstID)
 }
+
+// TestPGCoordinatorUnhealthy tests that when the coordinator fails to send heartbeats and is
+// unhealthy, it disconnects any peers and does not send any extraneous database queries.
+func TestPGCoordinatorUnhealthy(t *testing.T) {
+	t.Parallel()
+	ctx := testutil.Context(t, testutil.WaitShort)
+	logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
+
+	ctrl := gomock.NewController(t)
+	mStore := dbmock.NewMockStore(ctrl)
+	ps := pubsub.NewInMemory()
+
+	// after 3 failed heartbeats, the coordinator is unhealthy
+	mStore.EXPECT().
+		UpsertTailnetCoordinator(gomock.Any(), gomock.Any()).
+		MinTimes(3).
+		Return(database.TailnetCoordinator{}, xerrors.New("badness"))
+	mStore.EXPECT().
+		DeleteCoordinator(gomock.Any(), gomock.Any()).
+		Times(1).
+		Return(nil)
+	// But, in particular, we DO NOT want the coordinator to call DeleteTailnetPeer, as this is
+	// unnecessary and can spam the database. c.f. https://github.com/coder/coder/issues/12923
+
+	// these cleanup queries run, but we don't care about them for this test
+	mStore.EXPECT().CleanTailnetCoordinators(gomock.Any()).AnyTimes().Return(nil)
+	mStore.EXPECT().CleanTailnetLostPeers(gomock.Any()).AnyTimes().Return(nil)
+	mStore.EXPECT().CleanTailnetTunnels(gomock.Any()).AnyTimes().Return(nil)
+
+	coordinator, err := newPGCoordInternal(ctx, logger, ps, mStore)
+	require.NoError(t, err)
+
+	require.Eventually(t, func() bool {
+		return !coordinator.querier.isHealthy()
+	}, testutil.WaitShort, testutil.IntervalFast)
+
+	pID := uuid.UUID{5}
+	_, resps := coordinator.Coordinate(ctx, pID, "test", agpl.AgentCoordinateeAuth{ID: pID})
+	resp := testutil.RequireRecvCtx(ctx, t, resps)
+	require.Nil(t, resp, "channel should be closed")
+
+	// give the coordinator some time to process any pending work. We are
+	// testing here that a database call is absent, so we don't want to race to
+	// shut down the test.
+	time.Sleep(testutil.IntervalMedium)
+	_ = coordinator.Close()
+	require.Eventually(t, ctrl.Satisfied, testutil.WaitShort, testutil.IntervalFast)
+}
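
Caller-side note (not part of the patch): because Coordinate now closes resps before returning when the coordinator is unhealthy, a caller can distinguish "coordinator refused the call" from a normal response with an ordinary comma-ok receive; the new test relies on the same behavior by asserting that the received value is nil. A minimal sketch, with the package and helper name assumed rather than taken from the codebase:

	// Sketch only — awaitCoordinate is a hypothetical helper; the grounded behavior is
	// that pgCoord.Coordinate closes resps before returning when the coordinator is
	// unhealthy, so the receive reports ok == false immediately.
	package example

	import (
		"context"

		"github.com/coder/coder/v2/tailnet/proto"
	)

	// awaitCoordinate waits for the first response on the channel returned by
	// Coordinate. ok == false means the channel was closed without delivering a
	// response, which is what an unhealthy coordinator now produces right away.
	func awaitCoordinate(ctx context.Context, resps <-chan *proto.CoordinateResponse) (*proto.CoordinateResponse, bool) {
		select {
		case resp, ok := <-resps:
			return resp, ok
		case <-ctx.Done():
			return nil, false
		}
	}

Closing the channel, rather than adding a separate error return, lets the unhealthy case flow through the same receive loops callers already use for normal responses.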