Skip to content

Commit cb82cd6

Browse files
committed
Merge branch 'master' into persistence-2-consensus
2 parents 30d043a + 386ff7b commit cb82cd6

File tree

4 files changed

+347
-224
lines changed

4 files changed

+347
-224
lines changed

src/raft/consensus.rs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,18 +1028,29 @@ impl Raft for RaftImpl {
10281028
let next_term = term + 1;
10291029
state.reset_follower_timer(self.state.clone(), next_term);
10301030

1031-
// Make sure we have the previous log index sent. Note that COMPACTED
1032-
// can happen whenever we have no entries (e.g.,initially or just after
1033-
// a snapshot install).
1034-
let previous = &request.previous.ok_or(RaftError::missing("previous"))?;
1035-
let next_index = state.store.next_index();
1036-
if previous.index >= next_index {
1037-
// Let the leader know that this entry is too far in the future, so
1038-
// it can try again from an earlier index.
1031+
// Make sure we have the previous log index sent.
1032+
//
1033+
// This can be false in two cases:
1034+
// 1) The leader's "previous" is too far in the future, i.e., the index is larger
1035+
// than the next index we expect to append to our log.
1036+
// 2) The leader's "previous" is too far in the past, i.e., the entry has been
1037+
// compacted into our snapshot, and is no longer present in our log.
1038+
//
1039+
// Right now, we only handle case (1) below.
1040+
let leader_previous = &request.previous.ok_or(RaftError::missing("previous"))?;
1041+
let expected_next_index = state.store.next_index();
1042+
1043+
// The Raft paper says to return false if we can't check the exact (term, index)
1044+
// match of the leader's "previous", but we can't do that for case (2).
1045+
let conflict = state.store.conflict(leader_previous);
1046+
1047+
// Either way, we inform the leader that we were unable to append, and the leader will
1048+
// try again with an earlier "previous", which will hopefully match.
1049+
if conflict || leader_previous.index >= expected_next_index {
10391050
return Ok(Response::new(AppendResponse {
10401051
term,
10411052
success: false,
1042-
next: next_index,
1053+
next: expected_next_index,
10431054
}));
10441055
}
10451056

@@ -1408,11 +1419,11 @@ mod tests {
14081419
let append_request = AppendRequest {
14091420
term: 12,
14101421
leader: Some(leader.clone()),
1411-
previous: Some(entry_id(-1, -1)),
1422+
previous: Some(entry_id(0, 0)),
14121423
entries: vec![
1413-
entry(entry_id(8, 0), Vec::new()),
14141424
entry(entry_id(8, 1), Vec::new()),
14151425
entry(entry_id(8, 2), Vec::new()),
1426+
entry(entry_id(8, 3), Vec::new()),
14161427
],
14171428
committed: 0,
14181429
};
@@ -1432,16 +1443,16 @@ mod tests {
14321443
let state = raft_state.lock().await;
14331444
assert_eq!(state.current_leader(), Some(leader.clone()));
14341445
assert_eq!(state.term(), 12);
1435-
assert!(!state.store.is_index_compacted(0)); // Not compacted
1436-
assert_eq!(state.store.next_index(), 3);
1446+
assert!(!state.store.is_index_compacted(1)); // Not compacted
1447+
assert_eq!(state.store.next_index(), 4);
14371448
}
14381449

14391450
// Run a compaction, should have no effect
14401451
{
14411452
let mut state = raft_state.lock().await;
14421453
state.store.try_compact().await;
1443-
assert!(!state.store.is_index_compacted(0)); // Not compacted
1444-
assert_eq!(state.store.next_index(), 3);
1454+
assert!(!state.store.is_index_compacted(1)); // Not compacted
1455+
assert_eq!(state.store.next_index(), 4);
14451456
}
14461457

14471458
// Now send an append request with a payload large enough to trigger compaction.
@@ -1450,14 +1461,14 @@ mod tests {
14501461
let append_request_2 = AppendRequest {
14511462
term: 12,
14521463
leader: Some(leader.clone()),
1453-
previous: Some(entry_id(8, 2)),
1464+
previous: Some(entry_id(8, 3)),
14541465
entries: vec![entry(
1455-
entry_id(8, 3),
1466+
entry_id(8, 4),
14561467
vec![0; 2 * compaction_bytes as usize],
14571468
)],
14581469
// This tells the follower that the entries are committed (only committed
14591470
// entries are eligible for compaction).
1460-
committed: 3,
1471+
committed: 4,
14611472
};
14621473

14631474
let append_response_2 = client
@@ -1471,16 +1482,16 @@ mod tests {
14711482
assert!(append_response_2.success);
14721483
{
14731484
let state = raft_state.lock().await;
1474-
assert!(!state.store.is_index_compacted(0)); // Not compacted
1475-
assert_eq!(state.store.next_index(), 4); // New entry incorporated
1485+
assert!(!state.store.is_index_compacted(1)); // Not compacted
1486+
assert_eq!(state.store.next_index(), 5); // New entry incorporated
14761487
}
14771488

14781489
// Run a compaction, this one should actually compact things now.
14791490
{
14801491
let mut state = raft_state.lock().await;
14811492
state.store.try_compact().await;
1482-
assert!(state.store.is_index_compacted(0)); // Compacted
1483-
assert_eq!(state.store.get_latest_snapshot().last.index, 3)
1493+
assert!(state.store.is_index_compacted(1)); // Compacted
1494+
assert_eq!(state.store.get_latest_snapshot().last.index, 4)
14841495
}
14851496
}
14861497

src/raft/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ pub enum RaftError {
3030
#[error("Internal error: {0}")]
3131
Internal(String),
3232

33+
#[error("Non-contiguous log entries supplied. Expected index {expected}, got {actual}")]
34+
NonContiguousLog { expected: i64, actual: i64 },
35+
3336
#[error("Stale term state")]
3437
StaleState,
3538
}

0 commit comments

Comments
 (0)