@@ -586,10 +586,12 @@ type querier struct {
 
 	workQ      *workQ[mKey]
 	heartbeats *heartbeats
-	updates    <-chan struct{}
+	updates    <-chan hbUpdate
 
 	mu      sync.Mutex
 	mappers map[mKey]*countedMapper
+	conns   map[*connIO]struct{}
+	healthy bool
 }
 
 type countedMapper struct {
@@ -604,7 +606,7 @@ func newQuerier(
 	self uuid.UUID, newConnections chan *connIO, numWorkers int,
 	firstHeartbeat chan<- struct{},
 ) *querier {
-	updates := make(chan struct{})
+	updates := make(chan hbUpdate)
 	q := &querier{
 		ctx:            ctx,
 		logger:         logger.Named("querier"),
@@ -614,7 +616,9 @@ func newQuerier(
 		workQ:          newWorkQ[mKey](ctx),
 		heartbeats:     newHeartbeats(ctx, logger, ps, store, self, updates, firstHeartbeat),
 		mappers:        make(map[mKey]*countedMapper),
+		conns:          make(map[*connIO]struct{}),
 		updates:        updates,
+		healthy:        true, // assume we start healthy
 	}
 	go q.subscribe()
 	go q.handleConnIO()
@@ -639,6 +643,15 @@ func (q *querier) handleConnIO() {
 func (q *querier) newConn(c *connIO) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
+	if !q.healthy {
+		err := c.updates.Close()
+		q.logger.Info(q.ctx, "closed incoming connection while unhealthy",
+			slog.Error(err),
+			slog.F("agent_id", c.agent),
+			slog.F("client_id", c.client),
+		)
+		return
+	}
 	mk := mKey{
 		agent: c.agent,
 		// if client is Nil, this is an agent connection, and it wants the mappings for all the clients of itself
@@ -661,13 +674,15 @@ func (q *querier) newConn(c *connIO) {
 		return
 	}
 	cm.count++
+	q.conns[c] = struct{}{}
 	go q.cleanupConn(c)
 }
 
 func (q *querier) cleanupConn(c *connIO) {
 	<-c.ctx.Done()
 	q.mu.Lock()
 	defer q.mu.Unlock()
+	delete(q.conns, c)
 	mk := mKey{
 		agent: c.agent,
 		// if client is Nil, this is an agent connection, and it wants the mappings for all the clients of itself
@@ -911,8 +926,18 @@ func (q *querier) handleUpdates() {
 		select {
 		case <-q.ctx.Done():
 			return
-		case <-q.updates:
-			q.updateAll()
+		case u := <-q.updates:
+			if u.filter == filterUpdateUpdated {
+				q.updateAll()
+			}
+			if u.health == healthUpdateUnhealthy {
+				q.unhealthyCloseAll()
+				continue
+			}
+			if u.health == healthUpdateHealthy {
+				q.setHealthy()
+				continue
+			}
 		}
 	}
 }
@@ -932,6 +957,30 @@ func (q *querier) updateAll() {
 	}
 }
 
+// unhealthyCloseAll marks the coordinator unhealthy and closes all connections. We do this so that clients and agents
+// are forced to reconnect to the coordinator, and will hopefully land on a healthy coordinator.
+func (q *querier) unhealthyCloseAll() {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	q.healthy = false
+	for c := range q.conns {
+		// close connections async so that we don't block the querier routine that responds to updates
+		go func(c *connIO) {
+			err := c.updates.Close()
+			if err != nil {
+				q.logger.Debug(q.ctx, "error closing conn while unhealthy", slog.Error(err))
+			}
+		}(c)
+		// NOTE: we don't need to remove the connection from the map, as that will happen async in q.cleanupConn()
+	}
+}
+
+func (q *querier) setHealthy() {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	q.healthy = true
+}
+
 func (q *querier) getAll(ctx context.Context) (map[uuid.UUID]database.TailnetAgent, map[uuid.UUID][]database.TailnetClient, error) {
 	agents, err := q.store.GetAllTailnetAgents(ctx)
 	if err != nil {
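The comment in unhealthyCloseAll is the key design point: an unhealthy coordinator does not try to limp along; it sheds every connection so peers reconnect and, ideally, land on a healthy replica. Closing each connection in its own goroutine matters because this runs on the querier goroutine that handles updates while holding q.mu, and a slow Close must not stall it. Below is a self-contained sketch of that pattern; the slowConn type and the timings are illustrative, not part of the PR.

package main

import (
	"fmt"
	"sync"
	"time"
)

// slowConn stands in for a connection whose Close may block for a while.
type slowConn struct{ id int }

func (c *slowConn) Close() error {
	time.Sleep(100 * time.Millisecond) // simulate a slow close
	return nil
}

func main() {
	var mu sync.Mutex
	conns := map[*slowConn]struct{}{}
	for i := 0; i < 2; i++ {
		conns[&slowConn{id: i}] = struct{}{}
	}

	start := time.Now()
	mu.Lock()
	for c := range conns {
		// Close asynchronously so the lock is never held across a slow Close.
		go func(c *slowConn) { _ = c.Close() }(c)
	}
	mu.Unlock()
	// The lock was held for microseconds, not the ~200ms the closes will take.
	fmt.Println("lock held for:", time.Since(start))
}
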
@@ -1078,6 +1127,28 @@ func (q *workQ[K]) done(key K) {
 	q.cond.Signal()
 }
 
+type filterUpdate int
+
+const (
+	filterUpdateNone filterUpdate = iota
+	filterUpdateUpdated
+)
+
+type healthUpdate int
+
+const (
+	healthUpdateNone healthUpdate = iota
+	healthUpdateHealthy
+	healthUpdateUnhealthy
+)
+
+// hbUpdate is an update sent from the heartbeats to the querier. Zero values of the fields mean no update of that
+// kind.
+type hbUpdate struct {
+	filter filterUpdate
+	health healthUpdate
+}
+
 // heartbeats sends heartbeats for this coordinator on a timer, and monitors heartbeats from other coordinators. If a
 // coordinator misses their heartbeat, we remove it from our map of "valid" coordinators, such that we will filter out
 // any mappings for it when filter() is called, and we send a signal on the update channel, which triggers all mappers
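The zero-value convention on hbUpdate means a sender only sets the field it cares about; the other field stays at its *None value and the querier ignores that dimension in handleUpdates. A minimal standalone sketch of the convention (the types are redeclared here only so the sketch compiles on its own; the real definitions are the ones added in the hunk above):

package main

import "fmt"

type filterUpdate int

const (
	filterUpdateNone filterUpdate = iota
	filterUpdateUpdated
)

type healthUpdate int

const (
	healthUpdateNone healthUpdate = iota
	healthUpdateHealthy
	healthUpdateUnhealthy
)

type hbUpdate struct {
	filter filterUpdate
	health healthUpdate
}

func main() {
	// A pure filter update: health stays at healthUpdateNone, so the querier
	// only re-runs the mappings and leaves its healthy flag alone.
	u := hbUpdate{filter: filterUpdateUpdated}
	fmt.Println(u.health == healthUpdateNone) // true

	// A pure health update: filter stays at filterUpdateNone.
	v := hbUpdate{health: healthUpdateUnhealthy}
	fmt.Println(v.filter == filterUpdateNone) // true
}
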
@@ -1089,8 +1160,9 @@ type heartbeats struct {
 	store  database.Store
 	self   uuid.UUID
 
-	update         chan<- struct{}
-	firstHeartbeat chan<- struct{}
+	update           chan<- hbUpdate
+	firstHeartbeat   chan<- struct{}
+	failedHeartbeats int
 
 	lock         sync.RWMutex
 	coordinators map[uuid.UUID]time.Time
@@ -1103,7 +1175,7 @@ type heartbeats struct {
 func newHeartbeats(
 	ctx context.Context, logger slog.Logger,
 	ps pubsub.Pubsub, store database.Store,
-	self uuid.UUID, update chan<- struct{},
+	self uuid.UUID, update chan<- hbUpdate,
 	firstHeartbeat chan<- struct{},
 ) *heartbeats {
 	h := &heartbeats{
@@ -1194,7 +1266,7 @@ func (h *heartbeats) recvBeat(id uuid.UUID) {
 		h.logger.Info(h.ctx, "heartbeats (re)started", slog.F("other_coordinator_id", id))
 		// send on a separate goroutine to avoid holding lock. Triggering update can be async
 		go func() {
-			_ = sendCtx(h.ctx, h.update, struct{}{})
+			_ = sendCtx(h.ctx, h.update, hbUpdate{filter: filterUpdateUpdated})
 		}()
 	}
 	h.coordinators[id] = time.Now()
@@ -1241,7 +1313,7 @@ func (h *heartbeats) checkExpiry() {
 	if expired {
 		// send on a separate goroutine to avoid holding lock. Triggering update can be async
 		go func() {
-			_ = sendCtx(h.ctx, h.update, struct{}{})
+			_ = sendCtx(h.ctx, h.update, hbUpdate{filter: filterUpdateUpdated})
 		}()
 	}
 	// we need to reset the timer for when the next oldest coordinator will expire, if any.
@@ -1269,11 +1341,20 @@ func (h *heartbeats) sendBeats() {
 func (h *heartbeats) sendBeat() {
 	_, err := h.store.UpsertTailnetCoordinator(h.ctx, h.self)
 	if err != nil {
-		// just log errors, heartbeats are rescheduled on a timer
 		h.logger.Error(h.ctx, "failed to send heartbeat", slog.Error(err))
+		h.failedHeartbeats++
+		if h.failedHeartbeats == 3 {
+			h.logger.Error(h.ctx, "coordinator failed 3 heartbeats and is unhealthy")
+			_ = sendCtx(h.ctx, h.update, hbUpdate{health: healthUpdateUnhealthy})
+		}
 		return
 	}
 	h.logger.Debug(h.ctx, "sent heartbeat")
+	if h.failedHeartbeats >= 3 {
+		h.logger.Info(h.ctx, "coordinator sent heartbeat and is healthy")
+		_ = sendCtx(h.ctx, h.update, hbUpdate{health: healthUpdateHealthy})
+	}
+	h.failedHeartbeats = 0
 }
 
 func (h *heartbeats) sendDelete() {
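The failedHeartbeats counter gives the health transitions a simple hysteresis: the coordinator reports unhealthy exactly when its third consecutive heartbeat upsert fails, and reports healthy again on the first successful heartbeat after that, at which point the counter resets. A standalone sketch of the same counting logic, with illustrative names that are not part of the PR:

package main

import "fmt"

// beatTracker mirrors the 3-strike logic from sendBeat in isolation.
type beatTracker struct {
	failed int
}

// record returns "unhealthy" when the third consecutive failure lands, and
// "healthy" on the first success after three or more failures; otherwise "".
func (t *beatTracker) record(ok bool) string {
	if !ok {
		t.failed++
		if t.failed == 3 {
			return "unhealthy"
		}
		return ""
	}
	transition := ""
	if t.failed >= 3 {
		transition = "healthy"
	}
	t.failed = 0
	return transition
}

func main() {
	t := &beatTracker{}
	for _, ok := range []bool{false, false, false, false, true, true} {
		if s := t.record(ok); s != "" {
			fmt.Println("transition:", s)
		}
	}
	// Prints "transition: unhealthy" after the third failure and
	// "transition: healthy" on the first success that follows.
}
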