
Commit c7a6d62

fix: make PGCoordinator close connections when unhealthy (#9125)
Signed-off-by: Spike Curtis <spike@coder.com>
1 parent c217a0d commit c7a6d62
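
At a high level, this change makes the querier track its open connections and a health flag: after three consecutive failed heartbeats it marks itself unhealthy, closes every tracked connection, and rejects new ones until a heartbeat succeeds again, so clients and agents reconnect and (hopefully) land on a healthy coordinator. A minimal, self-contained sketch of that pattern, using simplified placeholder types (coordinator, conn) rather than the actual pgcoord types:

package main

import (
    "fmt"
    "sync"
)

// conn stands in for a client/agent connection; in pgcoord, connIO wraps a
// closable update stream.
type conn struct{ id int }

func (c *conn) Close() error {
    fmt.Printf("conn %d closed\n", c.id)
    return nil
}

// coordinator is a simplified stand-in for the querier: open connections and
// a health flag, guarded by one mutex.
type coordinator struct {
    mu      sync.Mutex
    conns   map[*conn]struct{}
    healthy bool
}

func newCoordinator() *coordinator {
    return &coordinator{conns: map[*conn]struct{}{}, healthy: true}
}

// newConn mirrors querier.newConn: refuse (and close) connections while unhealthy.
func (q *coordinator) newConn(c *conn) bool {
    q.mu.Lock()
    defer q.mu.Unlock()
    if !q.healthy {
        _ = c.Close()
        return false
    }
    q.conns[c] = struct{}{}
    return true
}

// unhealthyCloseAll mirrors querier.unhealthyCloseAll: flip the flag and close
// everything asynchronously so peers are forced to reconnect elsewhere.
func (q *coordinator) unhealthyCloseAll() {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.healthy = false
    for c := range q.conns {
        go func(c *conn) { _ = c.Close() }(c) // don't block the control loop
    }
}

func main() {
    q := newCoordinator()
    fmt.Println("accepted while healthy:", q.newConn(&conn{id: 1}))   // true
    q.unhealthyCloseAll()
    fmt.Println("accepted while unhealthy:", q.newConn(&conn{id: 2})) // false
}
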

File tree

3 files changed: +183 -28 lines changed


.golangci.yaml (+1)
@@ -211,6 +211,7 @@ issues:
 run:
   skip-dirs:
     - node_modules
+    - .git
   skip-files:
     - scripts/rules.go
   timeout: 10m

enterprise/tailnet/pgcoord.go (+91 -10)
@@ -586,10 +586,12 @@ type querier struct {
 
     workQ      *workQ[mKey]
     heartbeats *heartbeats
-    updates    <-chan struct{}
+    updates    <-chan hbUpdate
 
     mu      sync.Mutex
     mappers map[mKey]*countedMapper
+    conns   map[*connIO]struct{}
+    healthy bool
 }
 
 type countedMapper struct {
@@ -604,7 +606,7 @@ func newQuerier(
     self uuid.UUID, newConnections chan *connIO, numWorkers int,
     firstHeartbeat chan<- struct{},
 ) *querier {
-    updates := make(chan struct{})
+    updates := make(chan hbUpdate)
     q := &querier{
         ctx:    ctx,
         logger: logger.Named("querier"),
@@ -614,7 +616,9 @@ func newQuerier(
         workQ:      newWorkQ[mKey](ctx),
         heartbeats: newHeartbeats(ctx, logger, ps, store, self, updates, firstHeartbeat),
         mappers:    make(map[mKey]*countedMapper),
+        conns:      make(map[*connIO]struct{}),
         updates:    updates,
+        healthy:    true, // assume we start healthy
     }
     go q.subscribe()
     go q.handleConnIO()
@@ -639,6 +643,15 @@ func (q *querier) handleConnIO() {
 func (q *querier) newConn(c *connIO) {
     q.mu.Lock()
     defer q.mu.Unlock()
+    if !q.healthy {
+        err := c.updates.Close()
+        q.logger.Info(q.ctx, "closed incoming connection while unhealthy",
+            slog.Error(err),
+            slog.F("agent_id", c.agent),
+            slog.F("client_id", c.client),
+        )
+        return
+    }
     mk := mKey{
         agent: c.agent,
         // if client is Nil, this is an agent connection, and it wants the mappings for all the clients of itself
@@ -661,13 +674,15 @@ func (q *querier) newConn(c *connIO) {
         return
     }
     cm.count++
+    q.conns[c] = struct{}{}
     go q.cleanupConn(c)
 }
 
 func (q *querier) cleanupConn(c *connIO) {
     <-c.ctx.Done()
     q.mu.Lock()
     defer q.mu.Unlock()
+    delete(q.conns, c)
     mk := mKey{
         agent: c.agent,
         // if client is Nil, this is an agent connection, and it wants the mappings for all the clients of itself
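
The cleanup path shown here is why unhealthyCloseAll, added later in this diff, only has to close connections rather than unregister them: every accepted connection gets a goroutine that blocks on the connection's context and removes it from the tracking map once the connection dies. A generic sketch of that pattern with placeholder types (tracker, trackedConn), not the real connIO:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// tracker registers connections and removes each one automatically when that
// connection's context is canceled (e.g. because the connection was closed).
type tracker struct {
    mu    sync.Mutex
    conns map[*trackedConn]struct{}
}

type trackedConn struct {
    ctx    context.Context
    cancel context.CancelFunc
}

func (t *tracker) add(c *trackedConn) {
    t.mu.Lock()
    t.conns[c] = struct{}{}
    t.mu.Unlock()
    // One goroutine per connection: wait for it to die, then unregister it.
    go func() {
        <-c.ctx.Done()
        t.mu.Lock()
        delete(t.conns, c)
        t.mu.Unlock()
    }()
}

func main() {
    t := &tracker{conns: map[*trackedConn]struct{}{}}
    ctx, cancel := context.WithCancel(context.Background())
    c := &trackedConn{ctx: ctx, cancel: cancel}
    t.add(c)
    cancel()                          // simulate the connection closing
    time.Sleep(10 * time.Millisecond) // let the cleanup goroutine run
    t.mu.Lock()
    fmt.Println("tracked connections:", len(t.conns)) // 0
    t.mu.Unlock()
}
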
@@ -911,8 +926,18 @@ func (q *querier) handleUpdates() {
         select {
         case <-q.ctx.Done():
             return
-        case <-q.updates:
-            q.updateAll()
+        case u := <-q.updates:
+            if u.filter == filterUpdateUpdated {
+                q.updateAll()
+            }
+            if u.health == healthUpdateUnhealthy {
+                q.unhealthyCloseAll()
+                continue
+            }
+            if u.health == healthUpdateHealthy {
+                q.setHealthy()
+                continue
+            }
         }
     }
 }
@@ -932,6 +957,30 @@ func (q *querier) updateAll() {
     }
 }
 
+// unhealthyCloseAll marks the coordinator unhealthy and closes all connections. We do this so that clients and agents
+// are forced to reconnect to the coordinator, and will hopefully land on a healthy coordinator.
+func (q *querier) unhealthyCloseAll() {
+    q.mu.Lock()
+    defer q.mu.Unlock()
+    q.healthy = false
+    for c := range q.conns {
+        // close connections async so that we don't block the querier routine that responds to updates
+        go func(c *connIO) {
+            err := c.updates.Close()
+            if err != nil {
+                q.logger.Debug(q.ctx, "error closing conn while unhealthy", slog.Error(err))
+            }
+        }(c)
+        // NOTE: we don't need to remove the connection from the map, as that will happen async in q.cleanupConn()
+    }
+}
+
+func (q *querier) setHealthy() {
+    q.mu.Lock()
+    defer q.mu.Unlock()
+    q.healthy = true
+}
+
 func (q *querier) getAll(ctx context.Context) (map[uuid.UUID]database.TailnetAgent, map[uuid.UUID][]database.TailnetClient, error) {
     agents, err := q.store.GetAllTailnetAgents(ctx)
     if err != nil {
@@ -1078,6 +1127,28 @@ func (q *workQ[K]) done(key K) {
     q.cond.Signal()
 }
 
+type filterUpdate int
+
+const (
+    filterUpdateNone filterUpdate = iota
+    filterUpdateUpdated
+)
+
+type healthUpdate int
+
+const (
+    healthUpdateNone healthUpdate = iota
+    healthUpdateHealthy
+    healthUpdateUnhealthy
+)
+
+// hbUpdate is an update sent from the heartbeats to the querier. Zero values of the fields mean no update of that
+// kind.
+type hbUpdate struct {
+    filter filterUpdate
+    health healthUpdate
+}
+
 // heartbeats sends heartbeats for this coordinator on a timer, and monitors heartbeats from other coordinators. If a
 // coordinator misses their heartbeat, we remove it from our map of "valid" coordinators, such that we will filter out
 // any mappings for it when filter() is called, and we send a signal on the update channel, which triggers all mappers
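
The comment on hbUpdate states the key contract: a sender sets only the field it cares about, and the receiver treats the zero value of each field as "no update of that kind". A minimal, standalone illustration of that convention, reusing the type declarations from this hunk but none of the surrounding pgcoord machinery:

package main

import "fmt"

type filterUpdate int

const (
    filterUpdateNone filterUpdate = iota
    filterUpdateUpdated
)

type healthUpdate int

const (
    healthUpdateNone healthUpdate = iota
    healthUpdateHealthy
    healthUpdateUnhealthy
)

// hbUpdate carries zero, one, or both kinds of update; zero-valued fields mean
// "no update of that kind", so senders only set what changed.
type hbUpdate struct {
    filter filterUpdate
    health healthUpdate
}

func main() {
    updates := make(chan hbUpdate, 2)
    updates <- hbUpdate{filter: filterUpdateUpdated}   // mappings changed
    updates <- hbUpdate{health: healthUpdateUnhealthy} // health changed
    close(updates)

    for u := range updates {
        if u.filter == filterUpdateUpdated {
            fmt.Println("re-query mappings")
        }
        switch u.health {
        case healthUpdateUnhealthy:
            fmt.Println("close all connections")
        case healthUpdateHealthy:
            fmt.Println("accept connections again")
        }
    }
}
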
@@ -1089,8 +1160,9 @@ type heartbeats struct {
     store  database.Store
     self   uuid.UUID
 
-    update         chan<- struct{}
-    firstHeartbeat chan<- struct{}
+    update           chan<- hbUpdate
+    firstHeartbeat   chan<- struct{}
+    failedHeartbeats int
 
     lock         sync.RWMutex
     coordinators map[uuid.UUID]time.Time
@@ -1103,7 +1175,7 @@ type heartbeats struct {
 func newHeartbeats(
     ctx context.Context, logger slog.Logger,
     ps pubsub.Pubsub, store database.Store,
-    self uuid.UUID, update chan<- struct{},
+    self uuid.UUID, update chan<- hbUpdate,
     firstHeartbeat chan<- struct{},
 ) *heartbeats {
     h := &heartbeats{
@@ -1194,7 +1266,7 @@ func (h *heartbeats) recvBeat(id uuid.UUID) {
         h.logger.Info(h.ctx, "heartbeats (re)started", slog.F("other_coordinator_id", id))
         // send on a separate goroutine to avoid holding lock. Triggering update can be async
         go func() {
-            _ = sendCtx(h.ctx, h.update, struct{}{})
+            _ = sendCtx(h.ctx, h.update, hbUpdate{filter: filterUpdateUpdated})
         }()
     }
     h.coordinators[id] = time.Now()
@@ -1241,7 +1313,7 @@ func (h *heartbeats) checkExpiry() {
     if expired {
         // send on a separate goroutine to avoid holding lock. Triggering update can be async
         go func() {
-            _ = sendCtx(h.ctx, h.update, struct{}{})
+            _ = sendCtx(h.ctx, h.update, hbUpdate{filter: filterUpdateUpdated})
         }()
     }
     // we need to reset the timer for when the next oldest coordinator will expire, if any.
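
In both hunks above, the update is pushed from a fresh goroutine through sendCtx so the heartbeats lock is never held across a channel send. sendCtx itself is defined outside this diff; assuming it is the usual context-aware send helper, its shape is roughly the following sketch (the signature here is a guess, not the actual coder implementation):

package main

import (
    "context"
    "fmt"
)

// sendCtx sends v on dst unless ctx is canceled first, so a sender never
// blocks forever on a receiver that has gone away.
func sendCtx[T any](ctx context.Context, dst chan<- T, v T) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case dst <- v:
        return nil
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    updates := make(chan string, 1)
    fmt.Println(sendCtx(ctx, updates, "update")) // <nil>
    cancel()
    fmt.Println(sendCtx(ctx, make(chan string), "dropped")) // context canceled
}
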
@@ -1269,11 +1341,20 @@ func (h *heartbeats) sendBeats() {
 func (h *heartbeats) sendBeat() {
     _, err := h.store.UpsertTailnetCoordinator(h.ctx, h.self)
     if err != nil {
-        // just log errors, heartbeats are rescheduled on a timer
         h.logger.Error(h.ctx, "failed to send heartbeat", slog.Error(err))
+        h.failedHeartbeats++
+        if h.failedHeartbeats == 3 {
+            h.logger.Error(h.ctx, "coordinator failed 3 heartbeats and is unhealthy")
+            _ = sendCtx(h.ctx, h.update, hbUpdate{health: healthUpdateUnhealthy})
+        }
         return
     }
     h.logger.Debug(h.ctx, "sent heartbeat")
+    if h.failedHeartbeats >= 3 {
+        h.logger.Info(h.ctx, "coordinator sent heartbeat and is healthy")
+        _ = sendCtx(h.ctx, h.update, hbUpdate{health: healthUpdateHealthy})
+    }
+    h.failedHeartbeats = 0
 }
 
 func (h *heartbeats) sendDelete() {
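
The health transitions live entirely in sendBeat: three consecutive failed upserts flip the coordinator to unhealthy, and the first success after that flips it back, since the counter resets on every success and only uninterrupted failures accumulate. A standalone sketch of just that counting rule, decoupled from the database call (healthTracker is an illustrative name, not a pgcoord type):

package main

import "fmt"

// healthTracker reproduces sendBeat's counting rule: report unhealthy after
// 3 consecutive failures, report healthy again on the next success.
type healthTracker struct {
    failed int
}

// observe takes the result of one heartbeat attempt and returns a status
// change ("unhealthy", "healthy") or "" when nothing changed.
func (h *healthTracker) observe(ok bool) string {
    if !ok {
        h.failed++
        if h.failed == 3 {
            return "unhealthy"
        }
        return ""
    }
    recovered := h.failed >= 3
    h.failed = 0
    if recovered {
        return "healthy"
    }
    return ""
}

func main() {
    h := &healthTracker{}
    for _, ok := range []bool{false, false, false, false, true} {
        if s := h.observe(ok); s != "" {
            fmt.Println("status change:", s)
        }
    }
    // Prints "status change: unhealthy" after the third failure,
    // then "status change: healthy" on the next success.
}
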
