@@ -718,6 +718,33 @@ static bool raft_restore(raft_t r, int previndex, raft_entry_t *e) {
718
718
return true;
719
719
}
720
720
721
+ static bool raft_appendable (raft_t r , int previndex , int prevterm ) {
722
+ int low , high ;
723
+
724
+ raft_log_t * l = & r -> log ;
725
+
726
+ low = RAFT_LOG_FIRST_INDEX (r );
727
+ if (low == 0 ) low = -1 ; // allow appending at the start
728
+ high = RAFT_LOG_LAST_INDEX (r );
729
+
730
+ if (!inrange (low , previndex , high ))
731
+ {
732
+ debug (
733
+ "previndex %d is outside log range %d-%d\n" ,
734
+ previndex , low , high
735
+ );
736
+ return false;
737
+ }
738
+
739
+ if (previndex != -1 ) {
740
+ raft_entry_t * pe = & RAFT_LOG (r , previndex );
741
+ if (pe -> term != prevterm ) {
742
+ debug ("log term %d != prevterm %d\n" , pe -> term , prevterm );
743
+ return false;
744
+ }
745
+ }
746
+ }
747
+
721
748
static bool raft_append (raft_t r , int previndex , int prevterm , raft_entry_t * e ) {
722
749
assert (e -> bytes == e -> update .len );
723
750
assert (!e -> snapshot );
@@ -730,16 +757,9 @@ static bool raft_append(raft_t r, int previndex, int prevterm, raft_entry_t *e)
730
757
l , previndex , prevterm ,
731
758
e -> term
732
759
);
733
- if (previndex != -1 ) {
734
- if (previndex < l -> first ) {
735
- debug ("previndex < first\n" );
736
- return false;
737
- }
738
- }
739
- if (previndex > RAFT_LOG_LAST_INDEX (r )) {
740
- debug ("previndex(%d) > last(%d)\n" , previndex , RAFT_LOG_LAST_INDEX (r ));
741
- return false;
742
- }
760
+
761
+ if (!raft_appendable (r , previndex , prevterm )) return false;
762
+
743
763
if (previndex == RAFT_LOG_LAST_INDEX (r )) {
744
764
debug ("previndex == last\n" );
745
765
// appending to the end
@@ -755,14 +775,6 @@ static bool raft_append(raft_t r, int previndex, int prevterm, raft_entry_t *e)
755
775
}
756
776
}
757
777
758
- if (previndex != -1 ) {
759
- raft_entry_t * pe = & RAFT_LOG (r , previndex );
760
- if (pe -> term != prevterm ) {
761
- debug ("log term %d != prevterm %d\n" , pe -> term , prevterm );
762
- return false;
763
- }
764
- }
765
-
766
778
int index = previndex + 1 ;
767
779
raft_entry_t * slot = & RAFT_LOG (r , index );
768
780
@@ -797,14 +809,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
797
809
raft_entry_t * e = & r -> log .newentry ;
798
810
raft_update_t * u = & e -> update ;
799
811
800
- reply .progress .entries = RAFT_LOG_LAST_INDEX (r ) + 1 ;
801
- reply .progress .bytes = e -> bytes ;
802
-
803
- if (m -> previndex > RAFT_LOG_LAST_INDEX (r ))
804
- {
805
- debug ("got an update with previndex=%d > lastindex=%d\n" , m -> previndex , RAFT_LOG_LAST_INDEX (r ));
806
- goto finish ;
807
- }
812
+ if (!m -> snapshot && !raft_appendable (r , m -> previndex , m -> prevterm )) goto finish ;
808
813
809
814
if (reply .progress .entries > 0 ) {
810
815
reply .term = RAFT_LOG (r , reply .progress .entries - 1 ).term ;
@@ -838,6 +843,13 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
838
843
}
839
844
840
845
if (!m -> empty ) {
846
+ if ((m -> offset > 0 ) && (e -> term < m -> term )) {
847
+ shout ("a chunk of newer version of entry received, resetting progress to avoid corruption\n" );
848
+ e -> term = m -> term ;
849
+ e -> bytes = 0 ;
850
+ goto finish ;
851
+ }
852
+
841
853
if (m -> offset > e -> bytes ) {
842
854
shout ("unexpectedly large offset %d for a chunk, ignoring to avoid gaps\n" , m -> offset );
843
855
goto finish ;
@@ -870,18 +882,18 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
870
882
// just a heartbeat
871
883
}
872
884
873
- reply .progress .entries = RAFT_LOG_LAST_INDEX (r ) + 1 ;
874
- reply .progress .bytes = e -> bytes ;
875
- if (reply .progress .entries > 0 ) {
876
- reply .term = RAFT_LOG (r , reply .progress .entries - 1 ).term ;
885
+ if (RAFT_LOG_LAST_INDEX (r ) >= 0 ) {
886
+ reply .term = RAFT_LOG (r , RAFT_LOG_LAST_INDEX (r )).term ;
877
887
} else {
878
888
reply .term = -1 ;
879
889
}
880
890
reply .applied = r -> log .applied ;
881
891
882
892
reply .success = true;
883
893
finish :
884
- assert ((reply .progress .entries == m -> previndex + 1 ) || (reply .progress .bytes == 0 ));
894
+ reply .progress .entries = RAFT_LOG_LAST_INDEX (r ) + 1 ;
895
+ reply .progress .bytes = e -> bytes ;
896
+
885
897
raft_send (r , sender , & reply , sizeof (reply ));
886
898
}
887
899
0 commit comments