Skip to content

Commit 4f69a62

Browse files
committed
Fix a bug with large message getting corrupted.
1 parent 45143b6 commit 4f69a62

File tree

1 file changed

+43
-31
lines changed

1 file changed

+43
-31
lines changed

src/raft.c

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,33 @@ static bool raft_restore(raft_t r, int previndex, raft_entry_t *e) {
718718
return true;
719719
}
720720

721+
static bool raft_appendable(raft_t r, int previndex, int prevterm) {
722+
int low, high;
723+
724+
raft_log_t *l = &r->log;
725+
726+
low = RAFT_LOG_FIRST_INDEX(r);
727+
if (low == 0) low = -1; // allow appending at the start
728+
high = RAFT_LOG_LAST_INDEX(r);
729+
730+
if (!inrange(low, previndex, high))
731+
{
732+
debug(
733+
"previndex %d is outside log range %d-%d\n",
734+
previndex, low, high
735+
);
736+
return false;
737+
}
738+
739+
if (previndex != -1) {
740+
raft_entry_t *pe = &RAFT_LOG(r, previndex);
741+
if (pe->term != prevterm) {
742+
debug("log term %d != prevterm %d\n", pe->term, prevterm);
743+
return false;
744+
}
745+
}
746+
}
747+
721748
static bool raft_append(raft_t r, int previndex, int prevterm, raft_entry_t *e) {
722749
assert(e->bytes == e->update.len);
723750
assert(!e->snapshot);
@@ -730,16 +757,9 @@ static bool raft_append(raft_t r, int previndex, int prevterm, raft_entry_t *e)
730757
l, previndex, prevterm,
731758
e->term
732759
);
733-
if (previndex != -1) {
734-
if (previndex < l->first) {
735-
debug("previndex < first\n");
736-
return false;
737-
}
738-
}
739-
if (previndex > RAFT_LOG_LAST_INDEX(r)) {
740-
debug("previndex(%d) > last(%d)\n", previndex, RAFT_LOG_LAST_INDEX(r));
741-
return false;
742-
}
760+
761+
if (!raft_appendable(r, previndex, prevterm)) return false;
762+
743763
if (previndex == RAFT_LOG_LAST_INDEX(r)) {
744764
debug("previndex == last\n");
745765
// appending to the end
@@ -755,14 +775,6 @@ static bool raft_append(raft_t r, int previndex, int prevterm, raft_entry_t *e)
755775
}
756776
}
757777

758-
if (previndex != -1) {
759-
raft_entry_t *pe = &RAFT_LOG(r, previndex);
760-
if (pe->term != prevterm) {
761-
debug("log term %d != prevterm %d\n", pe->term, prevterm);
762-
return false;
763-
}
764-
}
765-
766778
int index = previndex + 1;
767779
raft_entry_t *slot = &RAFT_LOG(r, index);
768780

@@ -797,14 +809,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
797809
raft_entry_t *e = &r->log.newentry;
798810
raft_update_t *u = &e->update;
799811

800-
reply.progress.entries = RAFT_LOG_LAST_INDEX(r) + 1;
801-
reply.progress.bytes = e->bytes;
802-
803-
if (m->previndex > RAFT_LOG_LAST_INDEX(r))
804-
{
805-
debug("got an update with previndex=%d > lastindex=%d\n", m->previndex, RAFT_LOG_LAST_INDEX(r));
806-
goto finish;
807-
}
812+
if (!m->snapshot && !raft_appendable(r, m->previndex, m->prevterm)) goto finish;
808813

809814
if (reply.progress.entries > 0) {
810815
reply.term = RAFT_LOG(r, reply.progress.entries - 1).term;
@@ -838,6 +843,13 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
838843
}
839844

840845
if (!m->empty) {
846+
if ((m->offset > 0) && (e->term < m->term)) {
847+
shout("a chunk of newer version of entry received, resetting progress to avoid corruption\n");
848+
e->term = m->term;
849+
e->bytes = 0;
850+
goto finish;
851+
}
852+
841853
if (m->offset > e->bytes) {
842854
shout("unexpectedly large offset %d for a chunk, ignoring to avoid gaps\n", m->offset);
843855
goto finish;
@@ -870,18 +882,18 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
870882
// just a heartbeat
871883
}
872884

873-
reply.progress.entries = RAFT_LOG_LAST_INDEX(r) + 1;
874-
reply.progress.bytes = e->bytes;
875-
if (reply.progress.entries > 0) {
876-
reply.term = RAFT_LOG(r, reply.progress.entries - 1).term;
885+
if (RAFT_LOG_LAST_INDEX(r) >= 0) {
886+
reply.term = RAFT_LOG(r, RAFT_LOG_LAST_INDEX(r)).term;
877887
} else {
878888
reply.term = -1;
879889
}
880890
reply.applied = r->log.applied;
881891

882892
reply.success = true;
883893
finish:
884-
assert((reply.progress.entries == m->previndex + 1) || (reply.progress.bytes == 0));
894+
reply.progress.entries = RAFT_LOG_LAST_INDEX(r) + 1;
895+
reply.progress.bytes = e->bytes;
896+
885897
raft_send(r, sender, &reply, sizeof(reply));
886898
}
887899

0 commit comments

Comments
 (0)