Skip to content

Commit feda921

Browse files
committed
Merge branch 'master' into persistence-2-consensus
2 parents 30d043a + 386ff7b commit feda921

File tree

6 files changed

+401
-253
lines changed

6 files changed

+401
-253
lines changed

src/harness.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use futures::Future;
77
use futures::future::join_all;
88
use std::env;
99
use std::error::Error;
10+
use std::fs::remove_dir_all;
11+
use std::path::PathBuf;
1012
use std::pin::Pin;
1113
use std::sync::Arc;
1214
use std::time::Duration;
@@ -34,6 +36,7 @@ pub struct Harness {
3436
instances: Vec<Instance>,
3537
diagnostics: Arc<Mutex<Diagnostics>>,
3638
failures: Arc<Mutex<FailureOptions>>,
39+
persistence_dir: String,
3740
}
3841

3942
// Used to capture the intermediate state while building a harness. This is necessary
@@ -62,20 +65,22 @@ impl HarnessBuilder {
6265
let mut instances = Vec::new();
6366

6467
let raft_options = self.options;
68+
69+
// Persist at something like /tmp/concord/<cluster>
70+
let cluster_dir: PathBuf = env::temp_dir()
71+
.as_path()
72+
.join("concord")
73+
.join(&self.cluster_name);
74+
6575
for bound in self.bound {
6676
let (address, listener) = (bound.server, bound.listener);
6777

68-
// Use something like /tmp/concord/<cluster>/<server> for persistence
69-
let persistence_path = env::temp_dir()
70-
.as_path()
71-
.join("concord")
72-
.join(&self.cluster_name)
73-
.join(&address.name);
78+
// Use <cluster-dir>/<server-name> for persistence
79+
let instance_dir = cluster_dir.clone().join(&address.name);
7480

75-
let options = raft_options.clone().with_persistence(
76-
persistence_path.to_str().unwrap().as_ref(),
77-
wipe_persistence,
78-
);
81+
let options = raft_options
82+
.clone()
83+
.with_persistence(instance_dir.to_str().unwrap().as_ref(), wipe_persistence);
7984

8085
let (instance, future) = Instance::new(
8186
&address,
@@ -100,6 +105,7 @@ impl HarnessBuilder {
100105
instances,
101106
diagnostics: diag,
102107
failures,
108+
persistence_dir: cluster_dir.into_os_string().into_string().unwrap(),
103109
};
104110
Ok((harness, future))
105111
}
@@ -230,6 +236,13 @@ impl Harness {
230236
}
231237
}
232238

239+
// Removes the directory in which the cluster's state lives (and all children).
240+
pub async fn wipe_persistence(&self) {
241+
let dir = self.persistence_dir.clone();
242+
assert!(!dir.is_empty());
243+
remove_dir_all(dir).expect("remove_dir_all");
244+
}
245+
233246
// Repeatedly makes KV requests until the supplied key has a value. Returns the result.
234247
#[cfg(test)]
235248
pub async fn wait_for_key(&self, key: &[u8], timeout_duration: Duration) -> GetResponse {

src/integration_test.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::harness::Harness;
22
use crate::keyvalue::keyvalue_proto::PutRequest;
33
use crate::raft::Options;
44
use crate::raft::raft_common_proto::Server;
5+
use rand::{Rng, distributions::Alphanumeric};
56
use std::time::Duration;
67

78
const TIMEOUT: Duration = Duration::from_secs(3);
@@ -15,6 +16,7 @@ async fn test_start_and_elect_leader() {
1516

1617
harness.validate().await;
1718
harness.stop().await;
19+
harness.wipe_persistence().await;
1820
}
1921

2022
#[tokio::test]
@@ -28,6 +30,7 @@ async fn test_start_and_elect_leader_many_nodes() {
2830

2931
harness.validate().await;
3032
harness.stop().await;
33+
harness.wipe_persistence().await;
3134
}
3235

3336
#[tokio::test]
@@ -58,6 +61,7 @@ async fn test_disconnect_leader() {
5861

5962
harness.validate().await;
6063
harness.stop().await;
64+
harness.wipe_persistence().await;
6165
}
6266

6367
#[tokio::test]
@@ -72,6 +76,7 @@ async fn test_commit() {
7276

7377
harness.validate().await;
7478
harness.stop().await;
79+
harness.wipe_persistence().await;
7580
}
7681

7782
#[tokio::test]
@@ -100,6 +105,7 @@ async fn test_reconfigure_cluster() {
100105

101106
harness.validate().await;
102107
harness.stop().await;
108+
harness.wipe_persistence().await;
103109
}
104110

105111
#[tokio::test]
@@ -121,6 +127,10 @@ async fn test_keyvalue() {
121127

122128
assert_eq!(&entry.key, &k1);
123129
assert_eq!(&entry.value, &v1);
130+
131+
harness.validate().await;
132+
harness.stop().await;
133+
harness.wipe_persistence().await;
124134
}
125135

126136
#[tokio::test]
@@ -158,6 +168,7 @@ async fn test_snapshotting() {
158168

159169
harness.validate().await;
160170
harness.stop().await;
171+
harness.wipe_persistence().await;
161172
}
162173

163174
// Convenience method that returns a matcher for terms greater than a value.
@@ -169,8 +180,18 @@ async fn make_harness(nodes: &[&str]) -> Harness {
169180
make_harness_with_options(nodes, None).await
170181
}
171182

183+
fn make_name() -> String {
184+
let suffix: String = rand::thread_rng()
185+
.sample_iter(&Alphanumeric)
186+
.filter(|c| c.is_ascii_lowercase())
187+
.take(4)
188+
.map(char::from)
189+
.collect();
190+
format!("{}-{}", CLUSTER_NAME, suffix)
191+
}
192+
172193
async fn make_harness_with_options(nodes: &[&str], options: Option<Options>) -> Harness {
173-
let mut builder = Harness::builder(CLUSTER_NAME, nodes)
194+
let mut builder = Harness::builder(make_name().as_str(), nodes)
174195
.await
175196
.expect("builder");
176197

src/raft/consensus.rs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,18 +1028,29 @@ impl Raft for RaftImpl {
10281028
let next_term = term + 1;
10291029
state.reset_follower_timer(self.state.clone(), next_term);
10301030

1031-
// Make sure we have the previous log index sent. Note that COMPACTED
1032-
// can happen whenever we have no entries (e.g., initially or just after
1033-
// a snapshot install).
1034-
let previous = &request.previous.ok_or(RaftError::missing("previous"))?;
1035-
let next_index = state.store.next_index();
1036-
if previous.index >= next_index {
1037-
// Let the leader know that this entry is too far in the future, so
1038-
// it can try again from an earlier index.
1031+
// Make sure we have the previous log index sent.
1032+
//
1033+
// This can be false in two cases:
1034+
// 1) The leader's "previous" is too far in the future, i.e., the index is larger
1035+
// than the next index we expect to append to our log.
1036+
// 2) The leader's "previous" is too far in the past, i.e., the entry has been
1037+
// compacted into our snapshot, and is no longer present in our log.
1038+
//
1039+
// Right now, we only handle case (1) below.
1040+
let leader_previous = &request.previous.ok_or(RaftError::missing("previous"))?;
1041+
let expected_next_index = state.store.next_index();
1042+
1043+
// The Raft paper says to return false if we can't check the exact (term, index)
1044+
// match of the leader's "previous", but we can't do that for case (2).
1045+
let conflict = state.store.conflict(leader_previous);
1046+
1047+
// Either way, we inform the leader that we were unable to append, and the leader will
1048+
// try again with an earlier "previous", which will hopefully match.
1049+
if conflict || leader_previous.index >= expected_next_index {
10391050
return Ok(Response::new(AppendResponse {
10401051
term,
10411052
success: false,
1042-
next: next_index,
1053+
next: expected_next_index,
10431054
}));
10441055
}
10451056

@@ -1408,11 +1419,11 @@ mod tests {
14081419
let append_request = AppendRequest {
14091420
term: 12,
14101421
leader: Some(leader.clone()),
1411-
previous: Some(entry_id(-1, -1)),
1422+
previous: Some(entry_id(0, 0)),
14121423
entries: vec![
1413-
entry(entry_id(8, 0), Vec::new()),
14141424
entry(entry_id(8, 1), Vec::new()),
14151425
entry(entry_id(8, 2), Vec::new()),
1426+
entry(entry_id(8, 3), Vec::new()),
14161427
],
14171428
committed: 0,
14181429
};
@@ -1432,16 +1443,16 @@ mod tests {
14321443
let state = raft_state.lock().await;
14331444
assert_eq!(state.current_leader(), Some(leader.clone()));
14341445
assert_eq!(state.term(), 12);
1435-
assert!(!state.store.is_index_compacted(0)); // Not compacted
1436-
assert_eq!(state.store.next_index(), 3);
1446+
assert!(!state.store.is_index_compacted(1)); // Not compacted
1447+
assert_eq!(state.store.next_index(), 4);
14371448
}
14381449

14391450
// Run a compaction, should have no effect
14401451
{
14411452
let mut state = raft_state.lock().await;
14421453
state.store.try_compact().await;
1443-
assert!(!state.store.is_index_compacted(0)); // Not compacted
1444-
assert_eq!(state.store.next_index(), 3);
1454+
assert!(!state.store.is_index_compacted(1)); // Not compacted
1455+
assert_eq!(state.store.next_index(), 4);
14451456
}
14461457

14471458
// Now send an append request with a payload large enough to trigger compaction.
@@ -1450,14 +1461,14 @@ mod tests {
14501461
let append_request_2 = AppendRequest {
14511462
term: 12,
14521463
leader: Some(leader.clone()),
1453-
previous: Some(entry_id(8, 2)),
1464+
previous: Some(entry_id(8, 3)),
14541465
entries: vec![entry(
1455-
entry_id(8, 3),
1466+
entry_id(8, 4),
14561467
vec![0; 2 * compaction_bytes as usize],
14571468
)],
14581469
// This tells the follower that the entries are committed (only committed
14591470
// entries are eligible for compaction).
1460-
committed: 3,
1471+
committed: 4,
14611472
};
14621473

14631474
let append_response_2 = client
@@ -1471,16 +1482,16 @@ mod tests {
14711482
assert!(append_response_2.success);
14721483
{
14731484
let state = raft_state.lock().await;
1474-
assert!(!state.store.is_index_compacted(0)); // Not compacted
1475-
assert_eq!(state.store.next_index(), 4); // New entry incorporated
1485+
assert!(!state.store.is_index_compacted(1)); // Not compacted
1486+
assert_eq!(state.store.next_index(), 5); // New entry incorporated
14761487
}
14771488

14781489
// Run a compaction, this one should actually compact things now.
14791490
{
14801491
let mut state = raft_state.lock().await;
14811492
state.store.try_compact().await;
1482-
assert!(state.store.is_index_compacted(0)); // Compacted
1483-
assert_eq!(state.store.get_latest_snapshot().last.index, 3)
1493+
assert!(state.store.is_index_compacted(1)); // Compacted
1494+
assert_eq!(state.store.get_latest_snapshot().last.index, 4)
14841495
}
14851496
}
14861497

src/raft/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ pub enum RaftError {
3030
#[error("Internal error: {0}")]
3131
Internal(String),
3232

33+
#[error("Non-contiguous log entries supplied. Expected index {expected}, got {actual}")]
34+
NonContiguousLog { expected: i64, actual: i64 },
35+
3336
#[error("Stale term state")]
3437
StaleState,
3538
}

0 commit comments

Comments
 (0)