Skip to content

Commit f737667

Browse files
authored
Make request_snapshot more safer (#499)
close #498 Signed-off-by: Wenbo Zhang <ethercflow@gmail.com>
1 parent 5ce52b4 commit f737667

6 files changed

Lines changed: 62 additions & 55 deletions

File tree

harness/tests/integration_cases/test_raft.rs

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4860,11 +4860,7 @@ fn test_follower_request_snapshot() {
48604860
let prev_snapshot_idx = s.get_metadata().index;
48614861
let request_idx = nt.peers[&1].raft_log.committed;
48624862
assert!(prev_snapshot_idx < request_idx);
4863-
nt.peers
4864-
.get_mut(&2)
4865-
.unwrap()
4866-
.request_snapshot(request_idx)
4867-
.unwrap();
4863+
nt.peers.get_mut(&2).unwrap().request_snapshot().unwrap();
48684864

48694865
// Send the request snapshot message.
48704866
let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
@@ -4909,11 +4905,7 @@ fn test_request_snapshot_unavailable() {
49094905
let prev_snapshot_idx = s.get_metadata().index;
49104906
let request_idx = nt.peers[&1].raft_log.committed;
49114907
assert!(prev_snapshot_idx < request_idx);
4912-
nt.peers
4913-
.get_mut(&2)
4914-
.unwrap()
4915-
.request_snapshot(request_idx)
4916-
.unwrap();
4908+
nt.peers.get_mut(&2).unwrap().request_snapshot().unwrap();
49174909

49184910
// Send the request snapshot message.
49194911
let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
@@ -4966,12 +4958,7 @@ fn test_request_snapshot_matched_change() {
49664958
nt.peers.get_mut(&2).unwrap().raft_log.committed -= 1;
49674959

49684960
// Request the latest snapshot.
4969-
let request_idx = nt.peers[&2].raft_log.committed;
4970-
nt.peers
4971-
.get_mut(&2)
4972-
.unwrap()
4973-
.request_snapshot(request_idx)
4974-
.unwrap();
4961+
nt.peers.get_mut(&2).unwrap().request_snapshot().unwrap();
49754962
let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
49764963
// The request snapshot is ignored because it is considered as out of order.
49774964
nt.peers.get_mut(&1).unwrap().step(req_snap).unwrap();
@@ -5015,12 +5002,7 @@ fn test_request_snapshot_none_replicate() {
50155002
.state = ProgressState::Probe;
50165003

50175004
// Request the latest snapshot.
5018-
let request_idx = nt.peers[&2].raft_log.committed;
5019-
nt.peers
5020-
.get_mut(&2)
5021-
.unwrap()
5022-
.request_snapshot(request_idx)
5023-
.unwrap();
5005+
nt.peers.get_mut(&2).unwrap().request_snapshot().unwrap();
50245006
let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
50255007
nt.peers.get_mut(&1).unwrap().step(req_snap).unwrap();
50265008
assert!(nt.peers[&1].prs().get(2).unwrap().pending_request_snapshot != 0);
@@ -5042,12 +5024,7 @@ fn test_request_snapshot_step_down() {
50425024

50435025
// Recover and request the latest snapshot.
50445026
nt.recover();
5045-
let request_idx = nt.peers[&2].raft_log.committed;
5046-
nt.peers
5047-
.get_mut(&2)
5048-
.unwrap()
5049-
.request_snapshot(request_idx)
5050-
.unwrap();
5027+
nt.peers.get_mut(&2).unwrap().request_snapshot().unwrap();
50515028
nt.send(vec![new_message(3, 3, MessageType::MsgBeat, 0)]);
50525029
assert!(
50535030
nt.peers[&2].pending_request_snapshot == INVALID_INDEX,
@@ -5061,12 +5038,7 @@ fn test_request_snapshot_step_down() {
50615038
fn test_request_snapshot_on_role_change() {
50625039
let (mut nt, _) = prepare_request_snapshot();
50635040

5064-
let request_idx = nt.peers[&2].raft_log.committed;
5065-
nt.peers
5066-
.get_mut(&2)
5067-
.unwrap()
5068-
.request_snapshot(request_idx)
5069-
.unwrap();
5041+
nt.peers.get_mut(&2).unwrap().request_snapshot().unwrap();
50705042

50715043
// Becoming follower does not reset pending_request_snapshot.
50725044
let (term, id) = (nt.peers[&1].term, nt.peers[&1].id);
@@ -5086,6 +5058,28 @@ fn test_request_snapshot_on_role_change() {
50865058
);
50875059
}
50885060

5061+
// Abort request snapshot if term change.
5062+
#[test]
5063+
fn test_request_snapshot_after_term_change() {
5064+
let (mut nt, _) = prepare_request_snapshot();
5065+
5066+
nt.peers.get_mut(&2).unwrap().request_snapshot().unwrap();
5067+
5068+
assert!(
5069+
nt.peers[&2].pending_request_snapshot != INVALID_INDEX,
5070+
"{}",
5071+
nt.peers[&2].pending_request_snapshot
5072+
);
5073+
5074+
let term = nt.peers[&1].term;
5075+
nt.peers.get_mut(&2).unwrap().reset(term + 1);
5076+
assert!(
5077+
nt.peers[&2].pending_request_snapshot == INVALID_INDEX,
5078+
"{}",
5079+
nt.peers[&2].pending_request_snapshot
5080+
);
5081+
}
5082+
50895083
/// Tests group commit.
50905084
///
50915085
/// 1. Logs should be replicated to at least different groups before committed;

harness/tests/integration_cases/test_raft_snap.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,16 @@ fn test_request_snapshot() {
160160

161161
// Raft can not step request snapshot if there is no leader.
162162
assert_eq!(
163-
sm.raft
164-
.as_mut()
165-
.unwrap()
166-
.request_snapshot(INVALID_INDEX + 1)
167-
.unwrap_err(),
163+
sm.raft.as_mut().unwrap().request_snapshot().unwrap_err(),
164+
Error::RequestSnapshotDropped
165+
);
166+
167+
let term = sm.term;
168+
sm.become_follower(term + 1, 2);
169+
170+
// Raft can not step request snapshot if last raft log's term mismatch current term.
171+
assert_eq!(
172+
sm.raft.as_mut().unwrap().request_snapshot().unwrap_err(),
168173
Error::RequestSnapshotDropped
169174
);
170175

@@ -173,11 +178,7 @@ fn test_request_snapshot() {
173178

174179
// Raft can not step request snapshot if itself is a leader.
175180
assert_eq!(
176-
sm.raft
177-
.as_mut()
178-
.unwrap()
179-
.request_snapshot(INVALID_INDEX + 1)
180-
.unwrap_err(),
181+
sm.raft.as_mut().unwrap().request_snapshot().unwrap_err(),
181182
Error::RequestSnapshotDropped
182183
);
183184

src/raft.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2407,7 +2407,7 @@ impl<T: Storage> Raft<T> {
24072407
}
24082408

24092409
/// Request a snapshot from a leader.
2410-
pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
2410+
pub fn request_snapshot(&mut self) -> Result<()> {
24112411
if self.state == StateRole::Leader {
24122412
info!(
24132413
self.logger,
@@ -2416,7 +2416,7 @@ impl<T: Storage> Raft<T> {
24162416
} else if self.leader_id == INVALID_ID {
24172417
info!(
24182418
self.logger,
2419-
"drop request snapshot because of no leader";
2419+
"no leader; dropping request snapshot";
24202420
"term" => self.term,
24212421
);
24222422
} else if self.snap().is_some() {
@@ -2430,9 +2430,19 @@ impl<T: Storage> Raft<T> {
24302430
"there is a pending snapshot; dropping request snapshot";
24312431
);
24322432
} else {
2433-
self.pending_request_snapshot = request_index;
2434-
self.send_request_snapshot();
2435-
return Ok(());
2433+
let request_index = self.raft_log.last_index();
2434+
let request_index_term = self.raft_log.term(request_index).unwrap();
2435+
if self.term == request_index_term {
2436+
self.pending_request_snapshot = request_index;
2437+
self.send_request_snapshot();
2438+
return Ok(());
2439+
}
2440+
info! {
2441+
self.logger,
2442+
"mismatched term; dropping request snapshot";
2443+
"term" => self.term,
2444+
"last_term" => request_index_term,
2445+
};
24362446
}
24372447
Err(Error::RequestSnapshotDropped)
24382448
}
@@ -2837,6 +2847,7 @@ impl<T: Storage> Raft<T> {
28372847
m.reject_hint = self.raft_log.last_index();
28382848
m.to = self.leader_id;
28392849
m.request_snapshot = self.pending_request_snapshot;
2850+
m.log_term = self.raft_log.term(m.reject_hint).unwrap();
28402851
self.r.send(m, &mut self.msgs);
28412852
}
28422853

src/raft_log.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -828,12 +828,12 @@ mod test {
828828
for i in 1..=unstable_index {
829829
storage
830830
.wl()
831-
.append(&[new_entry(i as u64, i as u64)])
831+
.append(&[new_entry(i, i)])
832832
.expect("append failed");
833833
}
834834
let mut raft_log = RaftLog::new(storage, default_logger());
835835
for i in unstable_index..last_index {
836-
raft_log.append(&[new_entry(i as u64 + 1, i as u64 + 1)]);
836+
raft_log.append(&[new_entry(i + 1, i + 1)]);
837837
}
838838
assert!(
839839
raft_log.maybe_commit(last_index, last_term),

src/raw_node.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -743,9 +743,10 @@ impl<T: Storage> RawNode<T> {
743743
}
744744

745745
/// Request a snapshot from a leader.
746-
/// The snapshot's index must be greater or equal to the request_index.
747-
pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
748-
self.raft.request_snapshot(request_index)
746+
/// The snapshot's index must be greater or equal to the request_index (last_index) or
747+
/// the leader's term must be greater than the request term (last_index's term).
748+
pub fn request_snapshot(&mut self) -> Result<()> {
749+
self.raft.request_snapshot()
749750
}
750751

751752
/// TransferLeader tries to transfer leadership to the given transferee.

src/storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ mod test {
538538
}
539539

540540
fn size_of<T: PbMessage>(m: &T) -> u32 {
541-
m.compute_size() as u32
541+
m.compute_size()
542542
}
543543

544544
fn new_snapshot(index: u64, term: u64, voters: Vec<u64>) -> Snapshot {

0 commit comments

Comments
 (0)