Skip to content

Commit f7e7fed

Browse files
committed
Simplify the log contains concept to be just a boolean
1 parent da42026 commit f7e7fed

File tree

3 files changed

+226
-86
lines changed

3 files changed

+226
-86
lines changed

src/raft/consensus.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
1717

1818
use diagnostics::ServerDiagnostics;
1919
use raft::StateMachine;
20-
use raft::log::ContainsResult;
2120
use raft::raft_common_proto::EntryId;
2221
use raft::raft_service_proto::{AppendRequest, AppendResponse, VoteRequest, VoteResponse};
2322
use raft::raft_service_proto::{CommitRequest, CommitResponse, StepDownRequest, StepDownResponse};
@@ -1006,18 +1005,29 @@ impl Raft for RaftImpl {
10061005
let next_term = term + 1;
10071006
state.reset_follower_timer(self.state.clone(), next_term);
10081007

1009-
// Make sure we have the previous log index sent. Note that COMPACTED
1010-
// can happen whenever we have no entries (e.g.,initially or just after
1011-
// a snapshot install).
1012-
let previous = &request.previous.ok_or(RaftError::missing("previous"))?;
1013-
let next_index = state.store.next_index();
1014-
if state.store.log_contains(previous) == ContainsResult::ABSENT {
1015-
// Let the leader know that this entry is too far in the future, so
1016-
// it can try again from with earlier index.
1008+
// Make sure we have the previous log index sent.
1009+
//
1010+
// This can be false in two cases:
1011+
// 1) The leader's "previous" is too far in the future, i.e., the index is larger
1012+
// than the next index we expect to append to our log.
1013+
// 2) The leader's "previous" is too far in the past, i.e., the entry has been
1014+
// compacted into our snapshot, and is no longer present in our log.
1015+
//
1016+
// Right now, we only handle case (1) below.
1017+
let leader_previous = &request.previous.ok_or(RaftError::missing("previous"))?;
1018+
let expected_next_index = state.store.next_index();
1019+
1020+
// The Raft paper says to return false if we can't check the exact (term, index)
1021+
// match of the leader's "previous", but we can't do that for case (2).
1022+
let conflict = state.store.conflict(leader_previous);
1023+
1024+
// Either way, we inform the leader that we were unable to append, and the leader will
1025+
// try again with an earlier "previous", which will hopefully match.
1026+
if conflict || leader_previous.index >= expected_next_index {
10171027
return Ok(Response::new(AppendResponse {
10181028
term,
10191029
success: false,
1020-
next: next_index,
1030+
next: expected_next_index,
10211031
}));
10221032
}
10231033

src/raft/log.rs

Lines changed: 127 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,6 @@ pub struct LogSlice {
1515
previous_id: EntryId,
1616
}
1717

18-
// The possible outcomes of asking a log slice whether an entry id is
19-
// contained inside the slice.
20-
#[derive(Debug, Clone, PartialEq)]
21-
pub enum ContainsResult {
22-
// Indicates that the beginning of the slice has advanced past an entry.
23-
COMPACTED,
24-
25-
// Indicates that an entry is present in the log slice.
26-
PRESENT,
27-
28-
// Indicates that an entry with this index has yet to be added.
29-
ABSENT,
30-
}
31-
3218
impl LogSlice {
3319
// Returns a new instance with the given entries. The "previous_id" parameter
3420
// holds the id of the last element in the log *not present* in this slice,
@@ -120,21 +106,20 @@ impl LogSlice {
120106
other_last.index >= this_last.index
121107
}
122108

123-
// Returns true if the supplied entry is present in this slice.
124-
pub fn contains(&self, query: &EntryId) -> ContainsResult {
109+
// Returns true if the supplied entry id is present in this slice. Both the term and
110+
// the index need to match for the result to be true.
111+
fn contains(&self, query: &EntryId) -> bool {
125112
if query.index <= self.previous_id.index {
126-
assert!(query.term <= self.previous_id.term);
127-
return ContainsResult::COMPACTED;
113+
return false;
128114
}
129115

130116
let idx = self.local_index(query.index);
131117
if idx >= self.entries.len() {
132-
return ContainsResult::ABSENT;
118+
return false;
133119
}
134120

135-
let &entry_id = &self.entries[idx as usize].id.as_ref().expect("id");
136-
assert!(entry_id == query);
137-
return ContainsResult::PRESENT;
121+
let &entry_id = &self.entries[idx].id.as_ref().expect("id");
122+
entry_id == query
138123
}
139124

140125
// Returns true if the supplied index lies before the range of entries present
@@ -153,7 +138,7 @@ impl LogSlice {
153138
return Some(entry.clone());
154139
}
155140
}
156-
return None;
141+
None
157142
}
158143

159144
// Adds the supplied entries to the end of the slice. Any conflicting
@@ -216,6 +201,25 @@ impl LogSlice {
216201
result
217202
}
218203

204+
// Returns true if the supplied entry ID represents a conflict, i.e., if we
205+
// have an entry with the supplied index, and a different term.
206+
pub fn conflict(&self, id: &EntryId) -> bool {
207+
let idx = id.index;
208+
209+
// Before our slice, no conflict to report.
210+
if idx < self.previous_id.index {
211+
return false;
212+
}
213+
214+
// After the last entry we have, also no conflict to report.
215+
if idx >= self.next_index() {
216+
return false;
217+
}
218+
219+
// Remaining case is that it's within our range. Resolve and check the term.
220+
self.id_at(idx).term != id.term
221+
}
222+
219223
// Returns the entry id for the entry at the supplied index. Must only be
220224
// called if the index is known to this slice.
221225
//
@@ -247,14 +251,22 @@ impl LogSlice {
247251
// Removes all entries up to and including the supplied id. Once this
248252
// returns, this instance starts immediately after the supplied id.
249253
pub fn prune_until(&mut self, entry_id: &EntryId) {
250-
assert_eq!(self.contains(entry_id), ContainsResult::PRESENT);
251-
252-
// We need to add 1 because the "drain()" call below is exclusive, but we want this
253-
// drain to be inclusive of the supplied entry.
254-
let local_idx = self.local_index(entry_id.index) + 1;
254+
assert!(
255+
entry_id.index >= self.previous_id.index,
256+
"Cannot prune into the past"
257+
);
255258

256-
for entry in self.entries.drain(0..local_idx) {
257-
self.size_bytes -= size_bytes(&entry);
259+
if self.contains(entry_id) {
260+
// We need to add 1 because the "drain()" call below is exclusive, but we want this
261+
// drain to be inclusive of the supplied entry.
262+
let local_idx = self.local_index(entry_id.index) + 1;
263+
for entry in self.entries.drain(0..local_idx) {
264+
self.size_bytes -= size_bytes(&entry);
265+
}
266+
} else {
267+
// Entry is not present, we just remove everything.
268+
self.entries.clear();
269+
self.size_bytes = 0;
258270
}
259271
self.previous_id = entry_id.clone();
260272
}
@@ -300,7 +312,7 @@ mod tests {
300312
#[test]
301313
fn test_initial() {
302314
let l = create_initial_slice();
303-
assert_eq!(l.contains(&entry_id(0, 0)), ContainsResult::COMPACTED);
315+
assert!(!l.contains(&entry_id(0, 0)));
304316

305317
let after = l.get_entries_after(&entry_id(0, 0));
306318
assert!(after.is_empty());
@@ -360,20 +372,50 @@ mod tests {
360372

361373
#[test]
362374
fn test_contains() {
363-
let l = create_default_slice();
375+
let l = create_default_slice(); // Contains entries with indices 1 through 6. Previous is (0,0).
376+
377+
// Case 1: Entry is equal to "previous".
378+
// The log slice does not "contain" the previous entry, but starts *after* it.
379+
assert!(!l.contains(&entry_id(0, 0)), "Should not contain previous_id");
380+
381+
// Case 2: Absent because index is too low (before previous_id).
382+
assert!(!l.contains(&entry_id(0, -5)), "Should not contain index too low");
383+
384+
// Case 3: Present and matching.
385+
assert!(l.contains(&entry_id(73, 3)), "Should contain existing entry (73, 3)");
386+
assert!(l.contains(&entry_id(73, 4)), "Should contain existing entry (73, 4)");
364387

365-
// Special sentinel case.
366-
assert_eq!(ContainsResult::COMPACTED, l.contains(&entry_id(0, 0)));
388+
// Case 4: Absent because index is too high (beyond next_index).
389+
let next = l.next_index(); // next_index is 7
390+
assert_eq!(7, next);
391+
assert!(!l.contains(&entry_id(95, next)), "Should not contain index too high (next_index)");
392+
assert!(!l.contains(&entry_id(12, next + 5)), "Should not contain index way too high");
393+
394+
// Case 5: Entry present, but term doesn't match. `contains` expects both term and index to match.
395+
// assert!(!l.contains(&entry_id(1, 1)), "Should NOT contain existing index with wrong term (71 vs 1)");
396+
}
397+
398+
#[test]
399+
fn test_conflict() {
400+
let l = create_default_slice(); // Contains entries with indices 1 through 6. Previous is (0,0).
367401

368-
// Check a few existing entries.
369-
assert_eq!(ContainsResult::PRESENT, l.contains(&entry_id(73, 3)));
370-
assert_eq!(ContainsResult::PRESENT, l.contains(&entry_id(73, 4)));
402+
// Case 1: Entry is equal to "previous_id.index".
403+
assert!(!l.conflict(&entry_id(0, 0)), "No conflict for previous_id.index");
371404

372-
// First index not present.
373-
let next = l.next_index();
405+
// Case 2: Absent because index is too low (before previous_id.index).
406+
assert!(!l.conflict(&entry_id(0, -5)), "No conflict for index too low");
407+
408+
// Case 3: Absent because index is too high (at or beyond next_index).
409+
let next = l.next_index(); // next_index is 7
374410
assert_eq!(7, next);
375-
assert_eq!(ContainsResult::ABSENT, l.contains(&entry_id(95, next)));
376-
assert_eq!(ContainsResult::ABSENT, l.contains(&entry_id(12, next)));
411+
assert!(!l.conflict(&entry_id(95, next)), "No conflict for index too high (next_index)");
412+
assert!(!l.conflict(&entry_id(12, next + 5)), "No conflict for index way too high");
413+
414+
// Case 4: No conflict, present and matching.
415+
assert!(!l.conflict(&entry_id(71, 1)), "No conflict, terms and index match");
416+
417+
// Case 5: Conflict, present with different term.
418+
assert!(l.conflict(&entry_id(70, 1)), "Conflict, index matches but term differs");
377419
}
378420

379421
#[test]
@@ -465,13 +507,52 @@ mod tests {
465507

466508
#[test]
467509
fn test_prune_until_all_entries() {
468-
let mut l = create_default_slice();
469-
assert_eq!(l.next_index(), 7);
470-
assert_eq!(l.size_bytes(), 6);
510+
{
511+
let mut l = create_default_slice();
512+
assert_eq!(l.next_index(), 7);
513+
assert_eq!(l.size_bytes(), 6);
514+
515+
l.prune_until(&entry_id(74, 5));
516+
assert_eq!(l.next_index(), 6);
517+
assert_eq!(l.size_bytes(), 0);
518+
}
471519

472-
l.prune_until(&entry_id(74, 6));
473-
assert_eq!(l.next_index(), 7);
520+
{
521+
let original_previous = entry_id(12, 189);
522+
let mut l2 = LogSlice::new_empty(original_previous.clone());
523+
l2.append(12 /* term */, Payload(Vec::new()));
524+
l2.append(13 /* term */, Payload(Vec::new()));
525+
assert_eq!(l2.next_index(), 192);
526+
527+
l2.prune_until(&original_previous);
528+
assert_eq!(l2.next_index(), 190);
529+
}
530+
}
531+
532+
// Tests that prune_until ends up emptying the slice if the requested
533+
// stopping entry is not present at all.
534+
#[test]
535+
fn test_prune_until_no_match() {
536+
let mut l = LogSlice::new_empty(EntryId {
537+
term: 22,
538+
index: 91,
539+
});
540+
assert_eq!(l.next_index(), 92);
541+
542+
l.append(26 /* term */, Payload(Vec::new()));
543+
l.append(26 /* term */, Payload(Vec::new()));
544+
l.append(26 /* term */, Payload(Vec::new()));
545+
assert_eq!(l.next_index(), 95);
546+
547+
// Prune until an entry with an index we know about, but the wrong term. Should
548+
// clear the slice entirely.
549+
l.prune_until(&EntryId {
550+
term: 16,
551+
index: 93,
552+
});
474553
assert_eq!(l.size_bytes(), 0);
554+
assert!(l.entries.is_empty());
555+
assert_eq!(l.next_index(), 94);
475556
}
476557

477558
#[test]

0 commit comments

Comments
 (0)