@@ -19,7 +19,6 @@ typedef enum roles {
19
19
20
20
#define UDP_SAFE_SIZE 508
21
21
22
- // raft module does not care what you mean by action and argument
23
22
typedef struct raft_entry_t {
24
23
int term ;
25
24
bool snapshot ;
@@ -46,6 +45,7 @@ typedef struct raft_peer_t {
46
45
47
46
int seqno ; // the rpc sequence number
48
47
raft_progress_t acked ; // the number of entries:bytes acked by this peer
48
+ int applied ; // the number of entries applied by this peer
49
49
50
50
char * host ;
51
51
int port ;
@@ -111,6 +111,7 @@ typedef struct raft_msg_done_t {
111
111
raft_msg_data_t msg ;
112
112
int term ; // the term of the appended entry
113
113
raft_progress_t progress ; // the progress after appending
114
+ int applied ;
114
115
bool success ;
115
116
// the message is considered acked when the last chunk appends successfully
116
117
} raft_msg_done_t ;
@@ -177,6 +178,7 @@ static void raft_peer_init(raft_peer_t *p) {
177
178
p -> up = false;
178
179
p -> seqno = 0 ;
179
180
reset_progress (& p -> acked );
181
+ p -> applied = 0 ;
180
182
181
183
p -> host = DEFAULT_LISTENHOST ;
182
184
p -> port = DEFAULT_LISTENPORT ;
@@ -633,7 +635,7 @@ static int raft_compact(raft_t raft) {
633
635
return compacted ;
634
636
}
635
637
636
- bool raft_emit (raft_t r , raft_update_t update ) {
638
+ int raft_emit (raft_t r , raft_update_t update ) {
637
639
assert (r -> leader == r -> me );
638
640
assert (r -> role == LEADER );
639
641
@@ -646,11 +648,12 @@ bool raft_emit(raft_t r, raft_update_t update) {
646
648
"cannot emit new entries, the log is"
647
649
" full and cannot be compacted\n"
648
650
);
649
- return false ;
651
+ return -1 ;
650
652
}
651
653
}
652
654
653
- raft_entry_t * e = & RAFT_LOG (r , r -> log .first + r -> log .size );
655
+ int newindex = RAFT_LOG_LAST_INDEX (r ) + 1 ;
656
+ raft_entry_t * e = & RAFT_LOG (r , newindex );
654
657
e -> term = r -> term ;
655
658
assert (e -> update .len == 0 );
656
659
assert (e -> update .data == NULL );
@@ -661,7 +664,13 @@ bool raft_emit(raft_t r, raft_update_t update) {
661
664
662
665
raft_beat (r , NOBODY );
663
666
raft_reset_timer (r );
664
- return true;
667
+ return newindex ;
668
+ }
669
+
670
+ bool raft_applied (raft_t r , int id , int index ) {
671
+ raft_peer_t * p = r -> peers + id ;
672
+ if (!p -> up ) return false;
673
+ return p -> applied >= index ;
665
674
}
666
675
667
676
static bool raft_restore (raft_t r , int previndex , raft_entry_t * e ) {
@@ -836,6 +845,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
836
845
} else {
837
846
reply .term = -1 ;
838
847
}
848
+ reply .applied = r -> log .applied ;
839
849
840
850
reply .success = true;
841
851
finish :
@@ -863,6 +873,8 @@ static void raft_handle_done(raft_t r, raft_msg_done_t *m) {
863
873
return ;
864
874
}
865
875
876
+ peer -> applied = m -> applied ;
877
+
866
878
if (m -> success ) {
867
879
debug ("[from %d] ============= done\n" , sender );
868
880
peer -> acked = m -> progress ;
0 commit comments