Skip to content

Commit 18eec6f

Browse files
committed
Add a method to check if a log entry has been applied on a particular node.
1 parent 252c0f6 commit 18eec6f

File tree

2 files changed

+23
-7
lines changed

2 files changed

+23
-7
lines changed

include/raft.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,12 @@ bool raft_peer_down(raft_t r, int id);
5555

5656
// --- Log Actions ---
5757

58-
// Emit an 'update'. Returns true if emitted successfully.
59-
bool raft_emit(raft_t r, raft_update_t update);
58+
// Emit an 'update'. Returns the log index if emitted successfully, or -1
59+
// otherwise.
60+
int raft_emit(raft_t r, raft_update_t update);
61+
62+
// Checks whether an entry at 'index' has been applied by the peer named 'id'.
63+
bool raft_applied(raft_t t, int id, int index);
6064

6165
// --- Control ---
6266

src/raft.c

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ typedef enum roles {
1919

2020
#define UDP_SAFE_SIZE 508
2121

22-
// raft module does not care what you mean by action and argument
2322
typedef struct raft_entry_t {
2423
int term;
2524
bool snapshot;
@@ -46,6 +45,7 @@ typedef struct raft_peer_t {
4645

4746
int seqno; // the rpc sequence number
4847
raft_progress_t acked; // the number of entries:bytes acked by this peer
48+
int applied; // the number of entries applied by this peer
4949

5050
char *host;
5151
int port;
@@ -111,6 +111,7 @@ typedef struct raft_msg_done_t {
111111
raft_msg_data_t msg;
112112
int term; // the term of the appended entry
113113
raft_progress_t progress; // the progress after appending
114+
int applied;
114115
bool success;
115116
// the message is considered acked when the last chunk appends successfully
116117
} raft_msg_done_t;
@@ -177,6 +178,7 @@ static void raft_peer_init(raft_peer_t *p) {
177178
p->up = false;
178179
p->seqno = 0;
179180
reset_progress(&p->acked);
181+
p->applied = 0;
180182

181183
p->host = DEFAULT_LISTENHOST;
182184
p->port = DEFAULT_LISTENPORT;
@@ -633,7 +635,7 @@ static int raft_compact(raft_t raft) {
633635
return compacted;
634636
}
635637

636-
bool raft_emit(raft_t r, raft_update_t update) {
638+
int raft_emit(raft_t r, raft_update_t update) {
637639
assert(r->leader == r->me);
638640
assert(r->role == LEADER);
639641

@@ -646,11 +648,12 @@ bool raft_emit(raft_t r, raft_update_t update) {
646648
"cannot emit new entries, the log is"
647649
" full and cannot be compacted\n"
648650
);
649-
return false;
651+
return -1;
650652
}
651653
}
652654

653-
raft_entry_t *e = &RAFT_LOG(r, r->log.first + r->log.size);
655+
int newindex = RAFT_LOG_LAST_INDEX(r) + 1;
656+
raft_entry_t *e = &RAFT_LOG(r, newindex);
654657
e->term = r->term;
655658
assert(e->update.len == 0);
656659
assert(e->update.data == NULL);
@@ -661,7 +664,13 @@ bool raft_emit(raft_t r, raft_update_t update) {
661664

662665
raft_beat(r, NOBODY);
663666
raft_reset_timer(r);
664-
return true;
667+
return newindex;
668+
}
669+
670+
bool raft_applied(raft_t r, int id, int index) {
671+
raft_peer_t *p = r->peers + id;
672+
if (!p->up) return false;
673+
return p->applied >= index;
665674
}
666675

667676
static bool raft_restore(raft_t r, int previndex, raft_entry_t *e) {
@@ -836,6 +845,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
836845
} else {
837846
reply.term = -1;
838847
}
848+
reply.applied = r->log.applied;
839849

840850
reply.success = true;
841851
finish:
@@ -863,6 +873,8 @@ static void raft_handle_done(raft_t r, raft_msg_done_t *m) {
863873
return;
864874
}
865875

876+
peer->applied = m->applied;
877+
866878
if (m->success) {
867879
debug("[from %d] ============= done\n", sender);
868880
peer->acked = m->progress;

0 commit comments

Comments
 (0)