fix: make PGCoordinator close connections when unhealthy #9125

Merged: 1 commit, Aug 17, 2023
1 change: 1 addition & 0 deletions .golangci.yaml
@@ -211,6 +211,7 @@ issues:
run:
skip-dirs:
- node_modules
- .git
Contributor Author

Unrelated, but I noticed that we're linting this dir, which can cause problems if someone names their branch e.g. tools.go.

skip-files:
- scripts/rules.go
timeout: 10m
101 changes: 91 additions & 10 deletions enterprise/tailnet/pgcoord.go
@@ -586,10 +586,12 @@ type querier struct {

workQ *workQ[mKey]
heartbeats *heartbeats
updates <-chan struct{}
updates <-chan hbUpdate

mu sync.Mutex
mappers map[mKey]*countedMapper
conns map[*connIO]struct{}
healthy bool
}

type countedMapper struct {
@@ -604,7 +606,7 @@ func newQuerier(
self uuid.UUID, newConnections chan *connIO, numWorkers int,
firstHeartbeat chan<- struct{},
) *querier {
updates := make(chan struct{})
updates := make(chan hbUpdate)
q := &querier{
ctx: ctx,
logger: logger.Named("querier"),
@@ -614,7 +616,9 @@
workQ: newWorkQ[mKey](ctx),
heartbeats: newHeartbeats(ctx, logger, ps, store, self, updates, firstHeartbeat),
mappers: make(map[mKey]*countedMapper),
conns: make(map[*connIO]struct{}),
updates: updates,
healthy: true, // assume we start healthy
}
go q.subscribe()
go q.handleConnIO()
@@ -639,6 +643,15 @@ func (q *querier) handleConnIO() {
func (q *querier) newConn(c *connIO) {
q.mu.Lock()
defer q.mu.Unlock()
if !q.healthy {
err := c.updates.Close()
q.logger.Info(q.ctx, "closed incoming connection while unhealthy",
slog.Error(err),
slog.F("agent_id", c.agent),
slog.F("client_id", c.client),
)
return
}
mk := mKey{
agent: c.agent,
// if client is Nil, this is an agent connection, and it wants the mappings for all the clients of itself
@@ -661,13 +674,15 @@
return
}
cm.count++
q.conns[c] = struct{}{}
go q.cleanupConn(c)
}

func (q *querier) cleanupConn(c *connIO) {
<-c.ctx.Done()
q.mu.Lock()
defer q.mu.Unlock()
delete(q.conns, c)
mk := mKey{
agent: c.agent,
// if client is Nil, this is an agent connection, and it wants the mappings for all the clients of itself
@@ -911,8 +926,18 @@ func (q *querier) handleUpdates() {
select {
case <-q.ctx.Done():
return
case <-q.updates:
q.updateAll()
case u := <-q.updates:
if u.filter == filterUpdateUpdated {
q.updateAll()
}
if u.health == healthUpdateUnhealthy {
q.unhealthyCloseAll()
continue
}
if u.health == healthUpdateHealthy {
q.setHealthy()
continue
}
}
}
}
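For readers skimming the dispatch above: the heartbeats goroutine (further down in this diff) sends three kinds of hbUpdate values, and handleUpdates reacts to each. A purely illustrative summary, not part of the PR (exampleUpdates is a hypothetical name):

// Illustrative only: the update shapes produced by the heartbeats code in this
// PR, matching the branches handled in handleUpdates above.
var exampleUpdates = []hbUpdate{
	{filter: filterUpdateUpdated},   // a coordinator appeared or expired: re-run updateAll
	{health: healthUpdateUnhealthy}, // 3 consecutive heartbeat failures: close all connections
	{health: healthUpdateHealthy},   // a heartbeat succeeded again: resume accepting connections
}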
@@ -932,6 +957,30 @@ func (q *querier) updateAll() {
}
}

// unhealthyCloseAll marks the coordinator unhealthy and closes all connections. We do this so that clients and agents
// are forced to reconnect to the coordinator, and will hopefully land on a healthy coordinator.
func (q *querier) unhealthyCloseAll() {
q.mu.Lock()
defer q.mu.Unlock()
q.healthy = false
for c := range q.conns {
// close connections async so that we don't block the querier routine that responds to updates
go func(c *connIO) {
err := c.updates.Close()
if err != nil {
q.logger.Debug(q.ctx, "error closing conn while unhealthy", slog.Error(err))
}
}(c)
// NOTE: we don't need to remove the connection from the map, as that will happen async in q.cleanupConn()
}
}

func (q *querier) setHealthy() {
q.mu.Lock()
defer q.mu.Unlock()
q.healthy = true
}

func (q *querier) getAll(ctx context.Context) (map[uuid.UUID]database.TailnetAgent, map[uuid.UUID][]database.TailnetClient, error) {
agents, err := q.store.GetAllTailnetAgents(ctx)
if err != nil {
@@ -1078,6 +1127,28 @@ func (q *workQ[K]) done(key K) {
q.cond.Signal()
}

type filterUpdate int

const (
filterUpdateNone filterUpdate = iota
filterUpdateUpdated
)

type healthUpdate int

const (
healthUpdateNone healthUpdate = iota
healthUpdateHealthy
healthUpdateUnhealthy
)

// hbUpdate is an update sent from the heartbeats to the querier. Zero values of the fields mean no update of that
// kind.
type hbUpdate struct {
filter filterUpdate
health healthUpdate
}
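The sendCtx helper that the heartbeats code below uses to deliver these updates is not shown in this diff. A minimal sketch of what such a helper plausibly looks like, assuming it simply races a channel send against context cancellation (an assumption, not the actual implementation in this repository):

// Sketch only (assumed implementation): send value on ch unless ctx is
// canceled first, returning the context error in that case.
func sendCtx[T any](ctx context.Context, ch chan<- T, value T) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case ch <- value:
		return nil
	}
}

This is also why callers can write _ = sendCtx(...): if the coordinator is shutting down, the update is simply dropped.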

// heartbeats sends heartbeats for this coordinator on a timer, and monitors heartbeats from other coordinators. If a
// coordinator misses their heartbeat, we remove it from our map of "valid" coordinators, such that we will filter out
// any mappings for it when filter() is called, and we send a signal on the update channel, which triggers all mappers
@@ -1089,8 +1160,9 @@ type heartbeats struct {
store database.Store
self uuid.UUID

update chan<- struct{}
firstHeartbeat chan<- struct{}
update chan<- hbUpdate
firstHeartbeat chan<- struct{}
failedHeartbeats int

lock sync.RWMutex
coordinators map[uuid.UUID]time.Time
@@ -1103,7 +1175,7 @@
func newHeartbeats(
ctx context.Context, logger slog.Logger,
ps pubsub.Pubsub, store database.Store,
self uuid.UUID, update chan<- struct{},
self uuid.UUID, update chan<- hbUpdate,
firstHeartbeat chan<- struct{},
) *heartbeats {
h := &heartbeats{
@@ -1194,7 +1266,7 @@ func (h *heartbeats) recvBeat(id uuid.UUID) {
h.logger.Info(h.ctx, "heartbeats (re)started", slog.F("other_coordinator_id", id))
// send on a separate goroutine to avoid holding lock. Triggering update can be async
go func() {
_ = sendCtx(h.ctx, h.update, struct{}{})
_ = sendCtx(h.ctx, h.update, hbUpdate{filter: filterUpdateUpdated})
}()
}
h.coordinators[id] = time.Now()
@@ -1241,7 +1313,7 @@ func (h *heartbeats) checkExpiry() {
if expired {
// send on a separate goroutine to avoid holding lock. Triggering update can be async
go func() {
_ = sendCtx(h.ctx, h.update, struct{}{})
_ = sendCtx(h.ctx, h.update, hbUpdate{filter: filterUpdateUpdated})
}()
}
// we need to reset the timer for when the next oldest coordinator will expire, if any.
@@ -1269,11 +1341,20 @@ func (h *heartbeats) sendBeats() {
func (h *heartbeats) sendBeat() {
_, err := h.store.UpsertTailnetCoordinator(h.ctx, h.self)
if err != nil {
// just log errors, heartbeats are rescheduled on a timer
h.logger.Error(h.ctx, "failed to send heartbeat", slog.Error(err))
h.failedHeartbeats++
if h.failedHeartbeats == 3 {
h.logger.Error(h.ctx, "coordinator failed 3 heartbeats and is unhealthy")
_ = sendCtx(h.ctx, h.update, hbUpdate{health: healthUpdateUnhealthy})
}
return
}
h.logger.Debug(h.ctx, "sent heartbeat")
if h.failedHeartbeats >= 3 {
h.logger.Info(h.ctx, "coordinator sent heartbeat and is healthy")
_ = sendCtx(h.ctx, h.update, hbUpdate{health: healthUpdateHealthy})
}
h.failedHeartbeats = 0
}
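The health transitions in sendBeat form a small state machine: count consecutive failures, announce unhealthy exactly when the count reaches 3, and announce healthy on the first success after that. A standalone restatement for illustration; healthTracker and observe are hypothetical names, not part of this PR:

// Sketch only: the transition rules sendBeat implements, isolated from the
// store call and logging.
type healthTracker struct {
	failed int // consecutive failed heartbeats
}

// observe records one heartbeat attempt and returns the transition to
// announce, or healthUpdateNone when nothing changes.
func (t *healthTracker) observe(beatFailed bool) healthUpdate {
	if beatFailed {
		t.failed++
		if t.failed == 3 {
			return healthUpdateUnhealthy // third consecutive failure: go unhealthy once
		}
		return healthUpdateNone
	}
	recovered := t.failed >= 3
	t.failed = 0
	if recovered {
		return healthUpdateHealthy // first success after being unhealthy
	}
	return healthUpdateNone
}

Because the unhealthy signal fires only when the counter is exactly 3, repeated failures do not re-send it, and because the counter resets on success, the healthy signal fires once per recovery.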

func (h *heartbeats) sendDelete() {