@@ -50,6 +50,8 @@ typedef struct raft_peer_t {
50
50
char * host ;
51
51
int port ;
52
52
struct sockaddr_in addr ;
53
+
54
+ int silent_ms ; // how long was this peer silent
53
55
} raft_peer_t ;
54
56
55
57
typedef struct raft_data_t {
@@ -178,6 +180,7 @@ static void raft_peer_init(raft_peer_t *p) {
178
180
179
181
p -> host = DEFAULT_LISTENHOST ;
180
182
p -> port = DEFAULT_LISTENPORT ;
183
+ p -> silent_ms = 0 ;
181
184
}
182
185
183
186
static void raft_entry_init (raft_entry_t * e ) {
@@ -549,6 +552,22 @@ static void raft_refresh_acked(raft_t r) {
549
552
}
550
553
}
551
554
555
+ static int raft_increase_silent_time (raft_t r , int ms ) {
556
+ int recent_peers = 1 ; // count myself as recent
557
+
558
+ for (int i = 0 ; i < r -> config .peernum_max ; i ++ ) {
559
+ if (!r -> peers [i ].up ) continue ;
560
+ if (i == r -> me ) continue ;
561
+
562
+ r -> peers [i ].silent_ms += ms ;
563
+ if (r -> peers [i ].silent_ms < r -> config .election_ms_max ) {
564
+ recent_peers ++ ;
565
+ }
566
+ }
567
+
568
+ return recent_peers ;
569
+ }
570
+
552
571
void raft_tick (raft_t r , int msec ) {
553
572
r -> timer -= msec ;
554
573
if (r -> timer < 0 ) {
@@ -578,6 +597,13 @@ void raft_tick(raft_t r, int msec) {
578
597
raft_reset_timer (r );
579
598
}
580
599
raft_refresh_acked (r );
600
+
601
+ int recent_peers = raft_increase_silent_time (r , msec );
602
+ if ((r -> role == LEADER ) && (recent_peers * 2 <= r -> peernum )) {
603
+ shout ("lost quorum, demoting\n" );
604
+ r -> leader = NOBODY ;
605
+ r -> role = FOLLOWER ;
606
+ }
581
607
}
582
608
583
609
static int raft_compact (raft_t raft ) {
@@ -757,6 +783,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
757
783
r -> leader = sender ;
758
784
}
759
785
786
+ r -> peers [sender ].silent_ms = 0 ;
760
787
raft_reset_timer (r );
761
788
762
789
if (m -> acked > r -> log .acked ) {
@@ -768,38 +795,37 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
768
795
p -> acked .entries = r -> log .acked ;
769
796
}
770
797
771
- if (m -> empty ) {
772
- // just a heartbeat
773
- return ;
774
- }
775
-
776
- if (m -> offset > e -> bytes ) {
777
- shout ("unexpectedly large offset %d for a chunk, ignoring to avoid gaps\n" , m -> offset );
778
- goto finish ;
779
- }
798
+ if (!m -> empty ) {
799
+ if (m -> offset > e -> bytes ) {
800
+ shout ("unexpectedly large offset %d for a chunk, ignoring to avoid gaps\n" , m -> offset );
801
+ goto finish ;
802
+ }
780
803
781
- u -> len = m -> totallen ;
782
- u -> data = realloc (u -> data , m -> totallen );
804
+ u -> len = m -> totallen ;
805
+ u -> data = realloc (u -> data , m -> totallen );
783
806
784
- memcpy (u -> data + m -> offset , m -> data , m -> len );
785
- e -> term = m -> term ;
786
- e -> bytes = m -> offset + m -> len ;
787
- assert (e -> bytes <= u -> len );
807
+ memcpy (u -> data + m -> offset , m -> data , m -> len );
808
+ e -> term = m -> term ;
809
+ e -> bytes = m -> offset + m -> len ;
810
+ assert (e -> bytes <= u -> len );
788
811
789
- e -> snapshot = m -> snapshot ;
812
+ e -> snapshot = m -> snapshot ;
790
813
791
- if (e -> bytes == u -> len ) {
792
- if (m -> snapshot ) {
793
- if (!raft_restore (r , m -> previndex , e )) {
794
- shout ("restore from snapshot failed\n" );
795
- goto finish ;
796
- }
797
- } else {
798
- if (!raft_append (r , m -> previndex , m -> prevterm , e )) {
799
- debug ("log_append failed\n" );
800
- goto finish ;
814
+ if (e -> bytes == u -> len ) {
815
+ if (m -> snapshot ) {
816
+ if (!raft_restore (r , m -> previndex , e )) {
817
+ shout ("restore from snapshot failed\n" );
818
+ goto finish ;
819
+ }
820
+ } else {
821
+ if (!raft_append (r , m -> previndex , m -> prevterm , e )) {
822
+ debug ("log_append failed\n" );
823
+ goto finish ;
824
+ }
801
825
}
802
826
}
827
+ } else {
828
+ // just a heartbeat
803
829
}
804
830
805
831
reply .progress .entries = RAFT_LOG_LAST_INDEX (r ) + 1 ;
@@ -839,6 +865,7 @@ static void raft_handle_done(raft_t r, raft_msg_done_t *m) {
839
865
if (m -> success ) {
840
866
debug ("[from %d] ============= done\n" , sender );
841
867
peer -> acked = m -> progress ;
868
+ peer -> silent_ms = 0 ;
842
869
} else {
843
870
debug ("[from %d] ============= refused\n" , sender );
844
871
if (peer -> acked .entries > 0 ) {
@@ -872,7 +899,7 @@ static void raft_handle_claim(raft_t r, raft_msg_claim_t *m) {
872
899
873
900
if (m -> msg .term >= r -> term ) {
874
901
if (r -> role != FOLLOWER ) {
875
- shout ("demoting myself\n" );
902
+ shout ("There is another candidate, demoting myself\n" );
876
903
}
877
904
if (m -> msg .term > r -> term ) {
878
905
raft_set_term (r , m -> term );
@@ -913,6 +940,14 @@ static void raft_reset_bytes_acked(raft_t r) {
913
940
}
914
941
}
915
942
943
+ static void raft_reset_silent_time (raft_t r , int id ) {
944
+ for (int i = 0 ; i < r -> config .peernum_max ; i ++ ) {
945
+ if ((i == id ) || (id == NOBODY )) {
946
+ r -> peers [i ].silent_ms = 0 ;
947
+ }
948
+ }
949
+ }
950
+
916
951
static void raft_handle_vote (raft_t r , raft_msg_vote_t * m ) {
917
952
int sender = m -> msg .from ;
918
953
raft_peer_t * peer = r -> peers + sender ;
@@ -931,14 +966,15 @@ static void raft_handle_vote(raft_t r, raft_msg_vote_t *m) {
931
966
r -> role = LEADER ;
932
967
r -> leader = r -> me ;
933
968
raft_reset_bytes_acked (r );
969
+ raft_reset_silent_time (r , NOBODY );
934
970
raft_reset_timer (r );
935
971
}
936
972
}
937
973
938
974
void raft_handle_message (raft_t r , raft_msg_t m ) {
939
975
if (m -> term > r -> term ) {
940
976
if (r -> role != FOLLOWER ) {
941
- shout ("demoting myself\n" );
977
+ shout ("I have an old term, demoting myself\n" );
942
978
}
943
979
raft_set_term (r , m -> term );
944
980
r -> role = FOLLOWER ;
0 commit comments