Skip to content

Commit f77065d

Browse files
committed
Implement leader stepdown when quorum lost.
1 parent 1ffbabe commit f77065d

File tree

1 file changed

+64
-28
lines changed

1 file changed

+64
-28
lines changed

src/raft.c

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ typedef struct raft_peer_t {
5050
char *host;
5151
int port;
5252
struct sockaddr_in addr;
53+
54+
int silent_ms; // how long was this peer silent
5355
} raft_peer_t;
5456

5557
typedef struct raft_data_t {
@@ -178,6 +180,7 @@ static void raft_peer_init(raft_peer_t *p) {
178180

179181
p->host = DEFAULT_LISTENHOST;
180182
p->port = DEFAULT_LISTENPORT;
183+
p->silent_ms = 0;
181184
}
182185

183186
static void raft_entry_init(raft_entry_t *e) {
@@ -549,6 +552,22 @@ static void raft_refresh_acked(raft_t r) {
549552
}
550553
}
551554

555+
static int raft_increase_silent_time(raft_t r, int ms) {
556+
int recent_peers = 1; // count myself as recent
557+
558+
for (int i = 0; i < r->config.peernum_max; i++) {
559+
if (!r->peers[i].up) continue;
560+
if (i == r->me) continue;
561+
562+
r->peers[i].silent_ms += ms;
563+
if (r->peers[i].silent_ms < r->config.election_ms_max) {
564+
recent_peers++;
565+
}
566+
}
567+
568+
return recent_peers;
569+
}
570+
552571
void raft_tick(raft_t r, int msec) {
553572
r->timer -= msec;
554573
if (r->timer < 0) {
@@ -578,6 +597,13 @@ void raft_tick(raft_t r, int msec) {
578597
raft_reset_timer(r);
579598
}
580599
raft_refresh_acked(r);
600+
601+
int recent_peers = raft_increase_silent_time(r, msec);
602+
if ((r->role == LEADER) && (recent_peers * 2 <= r->peernum)) {
603+
shout("lost quorum, demoting\n");
604+
r->leader = NOBODY;
605+
r->role = FOLLOWER;
606+
}
581607
}
582608

583609
static int raft_compact(raft_t raft) {
@@ -757,6 +783,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
757783
r->leader = sender;
758784
}
759785

786+
r->peers[sender].silent_ms = 0;
760787
raft_reset_timer(r);
761788

762789
if (m->acked > r->log.acked) {
@@ -768,38 +795,37 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
768795
p->acked.entries = r->log.acked;
769796
}
770797

771-
if (m->empty) {
772-
// just a heartbeat
773-
return;
774-
}
775-
776-
if (m->offset > e->bytes) {
777-
shout("unexpectedly large offset %d for a chunk, ignoring to avoid gaps\n", m->offset);
778-
goto finish;
779-
}
798+
if (!m->empty) {
799+
if (m->offset > e->bytes) {
800+
shout("unexpectedly large offset %d for a chunk, ignoring to avoid gaps\n", m->offset);
801+
goto finish;
802+
}
780803

781-
u->len = m->totallen;
782-
u->data = realloc(u->data, m->totallen);
804+
u->len = m->totallen;
805+
u->data = realloc(u->data, m->totallen);
783806

784-
memcpy(u->data + m->offset, m->data, m->len);
785-
e->term = m->term;
786-
e->bytes = m->offset + m->len;
787-
assert(e->bytes <= u->len);
807+
memcpy(u->data + m->offset, m->data, m->len);
808+
e->term = m->term;
809+
e->bytes = m->offset + m->len;
810+
assert(e->bytes <= u->len);
788811

789-
e->snapshot = m->snapshot;
812+
e->snapshot = m->snapshot;
790813

791-
if (e->bytes == u->len) {
792-
if (m->snapshot) {
793-
if (!raft_restore(r, m->previndex, e)) {
794-
shout("restore from snapshot failed\n");
795-
goto finish;
796-
}
797-
} else {
798-
if (!raft_append(r, m->previndex, m->prevterm, e)) {
799-
debug("log_append failed\n");
800-
goto finish;
814+
if (e->bytes == u->len) {
815+
if (m->snapshot) {
816+
if (!raft_restore(r, m->previndex, e)) {
817+
shout("restore from snapshot failed\n");
818+
goto finish;
819+
}
820+
} else {
821+
if (!raft_append(r, m->previndex, m->prevterm, e)) {
822+
debug("log_append failed\n");
823+
goto finish;
824+
}
801825
}
802826
}
827+
} else {
828+
// just a heartbeat
803829
}
804830

805831
reply.progress.entries = RAFT_LOG_LAST_INDEX(r) + 1;
@@ -839,6 +865,7 @@ static void raft_handle_done(raft_t r, raft_msg_done_t *m) {
839865
if (m->success) {
840866
debug("[from %d] ============= done\n", sender);
841867
peer->acked = m->progress;
868+
peer->silent_ms = 0;
842869
} else {
843870
debug("[from %d] ============= refused\n", sender);
844871
if (peer->acked.entries > 0) {
@@ -872,7 +899,7 @@ static void raft_handle_claim(raft_t r, raft_msg_claim_t *m) {
872899

873900
if (m->msg.term >= r->term) {
874901
if (r->role != FOLLOWER) {
875-
shout("demoting myself\n");
902+
shout("There is another candidate, demoting myself\n");
876903
}
877904
if (m->msg.term > r->term) {
878905
raft_set_term(r, m->term);
@@ -913,6 +940,14 @@ static void raft_reset_bytes_acked(raft_t r) {
913940
}
914941
}
915942

943+
static void raft_reset_silent_time(raft_t r, int id) {
944+
for (int i = 0; i < r->config.peernum_max; i++) {
945+
if ((i == id) || (id == NOBODY)) {
946+
r->peers[i].silent_ms = 0;
947+
}
948+
}
949+
}
950+
916951
static void raft_handle_vote(raft_t r, raft_msg_vote_t *m) {
917952
int sender = m->msg.from;
918953
raft_peer_t *peer = r->peers + sender;
@@ -931,14 +966,15 @@ static void raft_handle_vote(raft_t r, raft_msg_vote_t *m) {
931966
r->role = LEADER;
932967
r->leader = r->me;
933968
raft_reset_bytes_acked(r);
969+
raft_reset_silent_time(r, NOBODY);
934970
raft_reset_timer(r);
935971
}
936972
}
937973

938974
void raft_handle_message(raft_t r, raft_msg_t m) {
939975
if (m->term > r->term) {
940976
if (r->role != FOLLOWER) {
941-
shout("demoting myself\n");
977+
shout("I have an old term, demoting myself\n");
942978
}
943979
raft_set_term(r, m->term);
944980
r->role = FOLLOWER;

0 commit comments

Comments
 (0)