Skip to content

Commit 30d043a

Browse files
committed
Merge branch 'master' into persistence-2-consensus
2 parents 7375302 + c90ebfa commit 30d043a

File tree

5 files changed

+46
-60
lines changed

5 files changed

+46
-60
lines changed

src/harness.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::raft::{
3030
// production deployment, these participants might be on different actual machines,
3131
// but the harness manages all of them in a single process for convenience.
3232
pub struct Harness {
33+
cluster_name: String,
3334
instances: Vec<Instance>,
3435
diagnostics: Arc<Mutex<Diagnostics>>,
3536
failures: Arc<Mutex<FailureOptions>>,
@@ -40,6 +41,7 @@ pub struct Harness {
4041
// addresses, and then start the underlying services (which require knowledge of the
4142
// ports for the other participants).
4243
pub struct HarnessBuilder {
44+
cluster_name: String,
4345
bound: Vec<BoundAddress>,
4446
failure: FailureOptions,
4547
options: Options,
@@ -50,7 +52,6 @@ impl HarnessBuilder {
5052
// of the serving process for all instances managed by the harness.
5153
pub async fn build(
5254
self,
53-
cluster_name: &str,
5455
wipe_persistence: bool,
5556
) -> Result<(Harness, Pin<Box<dyn Future<Output = ()> + Send>>), Box<dyn Error>> {
5657
let diag = Arc::new(Mutex::new(Diagnostics::new()));
@@ -68,7 +69,7 @@ impl HarnessBuilder {
6869
let persistence_path = env::temp_dir()
6970
.as_path()
7071
.join("concord")
71-
.join(&cluster_name)
72+
.join(&self.cluster_name)
7273
.join(&address.name);
7374

7475
let options = raft_options.clone().with_persistence(
@@ -95,6 +96,7 @@ impl HarnessBuilder {
9596
join_all(serving).await;
9697
});
9798
let harness = Harness {
99+
cluster_name: self.cluster_name,
98100
instances,
99101
diagnostics: diag,
100102
failures,
@@ -105,6 +107,7 @@ impl HarnessBuilder {
105107
// Consumes this instance and returns an instance with the failure options set.
106108
pub fn with_failure(self: Self, failure_options: FailureOptions) -> Self {
107109
Self {
110+
cluster_name: self.cluster_name,
108111
bound: self.bound,
109112
failure: failure_options,
110113
options: self.options,
@@ -114,6 +117,7 @@ impl HarnessBuilder {
114117
// Consumes this instance and returns an instance with the raft options set.
115118
pub fn with_options(self: Self, options: Options) -> Self {
116119
Self {
120+
cluster_name: self.cluster_name,
117121
bound: self.bound,
118122
failure: self.failure,
119123
options,
@@ -129,15 +133,19 @@ impl HarnessBuilder {
129133
impl Harness {
130134
// Creates a harness builder. This will immediately bind an incoming port for
131135
// each supplied instance name.
132-
pub async fn builder(names: Vec<String>) -> Result<HarnessBuilder, Box<dyn Error>> {
136+
pub async fn builder(
137+
cluster_name: &str,
138+
server_names: &[&str],
139+
) -> Result<HarnessBuilder, Box<dyn Error>> {
133140
let mut bound = Vec::new();
134-
for name in names {
141+
for &name in server_names {
135142
let listener = TcpListener::bind("[::1]:0").await?;
136143
let port = listener.local_addr()?.port();
137-
let server = server("::1", port as i32, name.as_str());
144+
let server = server("::1", port as i32, name);
138145
bound.push(BoundAddress { listener, server })
139146
}
140147
Ok(HarnessBuilder {
148+
cluster_name: cluster_name.to_string(),
141149
bound,
142150
failure: FailureOptions::no_failures(),
143151

@@ -147,6 +155,10 @@ impl Harness {
147155
})
148156
}
149157

158+
pub fn name(&self) -> &str {
159+
&self.cluster_name
160+
}
161+
150162
// Returns all the addresses managed by this harness. Note that this can include
151163
// Raft members that are not currently part of the cluster.
152164
pub fn addresses(&self) -> Vec<Server> {
@@ -178,13 +190,13 @@ impl Harness {
178190

179191
// Update the cluster membership to contain only the supplied members. The supplied
180192
// members must all be valid servers as defined at harness creation time.
181-
pub async fn update_members(&self, members: Vec<String>) -> Result<(), Status> {
193+
pub async fn update_members(&self, members: Vec<&str>) -> Result<(), Status> {
182194
let client = self.make_raft_client();
183195

184196
let addresses = self.addresses().clone();
185197
let mut new_members: Vec<Server> = Vec::new();
186198
for m in members {
187-
let server = addresses.iter().find(|&a| &a.name == &m);
199+
let server = addresses.iter().find(|&a| &a.name == m);
188200
match server {
189201
None => return Err(Status::invalid_argument(format!("unknown server: {}", &m))),
190202
Some(s) => new_members.push(s.clone()),
@@ -363,7 +375,7 @@ impl Instance {
363375
failure_options,
364376
)
365377
.await
366-
.map_err(|e| format!("Failed to create Raft for '{}': {}", address.name, e))?,
378+
.map_err(|e| format!("Failed to create raft impl for '{}': {}", address.name, e))?,
367379
);
368380
let raft_grpc = RaftServer::from_arc(raft.clone());
369381

src/integration_test.rs

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,11 @@ use std::time::Duration;
66

77
const TIMEOUT: Duration = Duration::from_secs(3);
88
const CLUSTER_NAME: &str = "test-cluster";
9-
10-
fn names() -> Vec<String> {
11-
vec!["A".to_string(), "B".to_string(), "C".to_string()]
12-
}
9+
const NAMES: [&str; 3] = ["A", "B", "C"];
1310

1411
#[tokio::test]
1512
async fn test_start_and_elect_leader() {
16-
let harness = make_harness(names()).await;
17-
13+
let harness = make_harness(&NAMES).await;
1814
harness.wait_for_leader(TIMEOUT, term_greater(0)).await;
1915

2016
harness.validate().await;
@@ -25,7 +21,8 @@ async fn test_start_and_elect_leader() {
2521
async fn test_start_and_elect_leader_many_nodes() {
2622
let n = 17;
2723
let owned: Vec<String> = (1..=n).map(|i| i.to_string()).collect();
28-
let harness = make_harness(owned).await;
24+
let names: Vec<&str> = owned.iter().map(|s| s.as_str()).collect();
25+
let harness = make_harness(&names).await;
2926

3027
harness.wait_for_leader(TIMEOUT, term_greater(0)).await;
3128

@@ -35,7 +32,7 @@ async fn test_start_and_elect_leader_many_nodes() {
3532

3633
#[tokio::test]
3734
async fn test_disconnect_leader() {
38-
let harness = make_harness(names()).await;
35+
let harness = make_harness(&NAMES).await;
3936

4037
// Wait for the initial leader and capture its term and server.
4138
let (term1, leader1) = harness.wait_for_leader(TIMEOUT, term_greater(0)).await;
@@ -65,7 +62,7 @@ async fn test_disconnect_leader() {
6562

6663
#[tokio::test]
6764
async fn test_commit() {
68-
let harness = make_harness(names()).await;
65+
let harness = make_harness(&NAMES).await;
6966
harness.wait_for_leader(TIMEOUT, term_greater(0)).await;
7067
let client = harness.make_raft_client();
7168

@@ -79,42 +76,35 @@ async fn test_commit() {
7976

8077
#[tokio::test]
8178
async fn test_reconfigure_cluster() {
82-
let names = vec![
83-
"A".to_string(),
84-
"B".to_string(),
85-
"C".to_string(),
86-
"D".to_string(),
87-
"E".to_string(),
88-
];
89-
let harness = make_harness(names.clone()).await;
79+
let names = vec!["A", "B", "C", "D", "E"];
80+
let harness = make_harness(&names).await;
9081

9182
let (t1, leader1) = harness.wait_for_leader(TIMEOUT, term_greater(0)).await;
9283

93-
let without_leader: Vec<String> = names
94-
.clone()
84+
let without_leader: Vec<&str> = names
9585
.iter()
96-
.filter(|&s| *s != leader1.name)
97-
.map(|s| s.clone())
86+
.copied()
87+
.filter(|s| *s != leader1.name)
9888
.collect();
9989
assert_eq!(without_leader.len(), 4);
10090

10191
// Change cluster to contain only 3 members, and not including the current leader.
102-
let new_members: Vec<String> = without_leader.iter().take(3).map(|s| s.clone()).collect();
92+
let new_members: Vec<&str> = without_leader.iter().copied().take(3).collect();
10393
let result = harness.update_members(new_members.clone()).await;
10494
assert!(result.is_ok());
10595

10696
// Wait for a new leader and verify.
10797
let (_, leader2) = harness.wait_for_leader(TIMEOUT, term_greater(t1)).await;
10898
assert_ne!(&leader2.name, &leader1.name);
109-
assert!(new_members.contains(&leader2.name));
99+
assert!(new_members.contains(&leader2.name.as_str()));
110100

111101
harness.validate().await;
112102
harness.stop().await;
113103
}
114104

115105
#[tokio::test]
116106
async fn test_keyvalue() {
117-
let harness = make_harness(names()).await;
107+
let harness = make_harness(&NAMES).await;
118108
let mut kv = harness.make_kv_client().await;
119109

120110
let k1 = "k1".as_bytes().to_vec();
@@ -137,7 +127,7 @@ async fn test_keyvalue() {
137127
async fn test_snapshotting() {
138128
let raft_options =
139129
Options::new_without_persistence_for_testing().with_compaction(5 * 1024 * 1024, 1000);
140-
let harness = make_harness_with_options(names(), Some(raft_options)).await;
130+
let harness = make_harness_with_options(&NAMES, Some(raft_options)).await;
141131

142132
// Disconnect a node that will later have to catch up.
143133
harness.failures().lock().await.disconnect("B");
@@ -175,22 +165,21 @@ fn term_greater(n: i64) -> Box<dyn Fn(&(i64, Server)) -> bool> {
175165
Box::new(move |(term, _)| *term > n)
176166
}
177167

178-
async fn make_harness(nodes: Vec<String>) -> Harness {
168+
async fn make_harness(nodes: &[&str]) -> Harness {
179169
make_harness_with_options(nodes, None).await
180170
}
181171

182-
async fn make_harness_with_options(nodes: Vec<String>, options: Option<Options>) -> Harness {
183-
let mut builder = Harness::builder(nodes).await.expect("builder");
172+
async fn make_harness_with_options(nodes: &[&str], options: Option<Options>) -> Harness {
173+
let mut builder = Harness::builder(CLUSTER_NAME, nodes)
174+
.await
175+
.expect("builder");
184176

185177
if let Some(opts) = options {
186178
builder = builder.with_options(opts)
187179
}
188180

189181
let wipe_persistence = true;
190-
let (harness, serving) = builder
191-
.build(CLUSTER_NAME, wipe_persistence)
192-
.await
193-
.expect("harness");
182+
let (harness, serving) = builder.build(wipe_persistence).await.expect("harness");
194183
harness.start().await;
195184
tokio::spawn(async { serving.await });
196185
harness

src/main.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -226,16 +226,6 @@ async fn run_reconfigure_loop(
226226
info!("Finished")
227227
}
228228

229-
fn names() -> Vec<String> {
230-
vec![
231-
"A".to_string(),
232-
"B".to_string(),
233-
"C".to_string(),
234-
"D".to_string(),
235-
"E".to_string(),
236-
]
237-
}
238-
239229
#[tokio::main]
240230
async fn main() -> Result<(), Box<dyn Error>> {
241231
// This allows configuring the filters using the RUST_LOG env variable.
@@ -249,14 +239,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
249239
.init();
250240
let arguments = Arc::new(Arguments::from_args());
251241

252-
let (harness, serving) = Harness::builder(names())
242+
let (harness, serving) = Harness::builder(CLUSTER_NAME, &["A", "B", "C", "D", "E"])
253243
.await
254244
.expect("builder")
255245
.with_failure(make_default_failure_options())
256-
.build(CLUSTER_NAME, arguments.wipe_persistence)
246+
.build(arguments.wipe_persistence)
257247
.await
258248
.expect("harness");
259249

250+
info!("Created cluster '{}'", harness.name());
251+
260252
let addresses = harness.addresses();
261253
let diagnostics = harness.diagnostics();
262254

src/raft/consensus.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,6 @@ impl RaftImpl {
132132
options: Options,
133133
failures: Arc<Mutex<FailureOptions>>,
134134
) -> RaftResult<RaftImpl> {
135-
let snapshot_bytes = state_machine.lock().await.create_snapshot();
136-
137135
// A config which gives us (once applied) all the initial members as voters.
138136
let initial_cluster_config = ClusterConfig {
139137
voters: all.clone().to_vec(),

src/raft/store.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ use crate::raft::diagnostics::ServerDiagnostics;
22
use crate::raft::error::RaftError::Initialization;
33
use crate::raft::error::RaftResult;
44
use crate::raft::log::LogSlice;
5-
use crate::raft::persistence::{
6-
Persistence, PersistenceError, PersistenceOptions, PersistentState,
7-
};
5+
use crate::raft::persistence::{Persistence, PersistenceOptions, PersistentState};
86
use crate::raft::raft_common_proto::entry::Data;
97
use crate::raft::raft_common_proto::entry::Data::Config;
108
use crate::raft::raft_common_proto::{ClusterConfig, Entry, EntryId, Server};
@@ -14,10 +12,7 @@ use bytes::Bytes;
1412
use futures::channel::oneshot::{Receiver, Sender, channel};
1513
use std::cmp::Ordering;
1614
use std::collections::BTreeSet;
17-
use std::error::Error;
18-
use std::fmt::{Display, Formatter, Write};
1915
use std::hash::{Hash, Hasher};
20-
use thiserror::Error;
2116
use tonic::{Code, Status};
2217
use tracing::{debug, info, warn};
2318

0 commit comments

Comments
 (0)