Skip to content

Commit 8429c92

Browse files
committed
Add a basic cluster simulation example
1 parent ed131c6 commit 8429c92

File tree

3 files changed

+341
-0
lines changed

3 files changed

+341
-0
lines changed

examples/cluster/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "cluster"
3+
version = "0.1.0"
4+
edition = "2021"
5+
publish = false
6+
7+
[dependencies]
8+
turmoil = { path = "../.." }
9+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
10+
tracing = "0.1"
11+
tokio = "1"
12+
rand = "0.8"
13+
getrandom = "0.3"

examples/cluster/src/lib.rs

Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
use std::{
2+
cell::{RefCell, RefMut},
3+
collections::VecDeque,
4+
error::Error,
5+
future::Future,
6+
rc::Rc,
7+
time::Duration,
8+
};
9+
10+
use rand::{rngs::SmallRng, seq::SliceRandom, Rng, SeedableRng};
11+
use tokio::{
12+
io::{AsyncReadExt, AsyncWriteExt},
13+
task::JoinSet,
14+
};
15+
use turmoil::{
16+
net::{TcpListener, TcpStream},
17+
Sim,
18+
};
19+
20+
#[derive(Clone)]
21+
pub struct Cluster {
22+
shared: Rc<RefCell<State>>,
23+
rng: Rc<RefCell<SmallRng>>,
24+
}
25+
26+
pub struct State {
27+
machines: Vec<Machine>,
28+
events: VecDeque<Event>,
29+
}
30+
31+
pub enum Event {
32+
Crash(String),
33+
Spawn,
34+
}
35+
36+
#[derive(Clone, Debug)]
37+
pub struct Machine {
38+
pub name: String,
39+
pub failed: bool,
40+
}
41+
42+
impl Cluster {
43+
pub fn new(servers: usize, seed: Option<u64>) -> Self {
44+
let machines = (0..servers)
45+
.map(|i| format!("server_{}", i))
46+
.map(|name| Machine {
47+
name,
48+
failed: false,
49+
})
50+
.collect();
51+
52+
let seed = seed.unwrap_or_else(|| getrandom::u64().unwrap());
53+
54+
tracing::info!("starting turmoil sim with seed: {}", seed);
55+
56+
let rng = Rc::new(RefCell::new(SmallRng::seed_from_u64(seed)));
57+
58+
let state = State {
59+
machines,
60+
events: VecDeque::new(),
61+
};
62+
63+
Cluster {
64+
shared: Rc::new(RefCell::new(state)),
65+
rng,
66+
}
67+
}
68+
69+
pub fn setup(&self, sim: &mut Sim<'_>) {
70+
let me = self.shared.borrow();
71+
72+
for machine in me.machines.iter() {
73+
let machine = machine.clone();
74+
let name = machine.name.clone();
75+
sim.host(name.as_str(), move || async move {
76+
if let Err(e) = server("0.0.0.0:8080".into()).await {
77+
tracing::error!("server error: {:?}", e);
78+
}
79+
80+
Ok(())
81+
});
82+
}
83+
}
84+
85+
pub fn get_machines(&self) -> Vec<Machine> {
86+
self.shared.borrow().machines.clone()
87+
}
88+
89+
pub fn crash(&self, machine: String) {
90+
self.shared
91+
.borrow_mut()
92+
.events
93+
.push_back(Event::Crash(machine));
94+
}
95+
96+
pub fn rng(&self) -> RefMut<'_, SmallRng> {
97+
self.rng.borrow_mut()
98+
}
99+
100+
pub fn run(&mut self, sim: &mut Sim<'_>) -> Result<(), Box<dyn Error>> {
101+
loop {
102+
for event in self.shared.borrow_mut().events.drain(..) {
103+
match event {
104+
Event::Crash(machine) => sim.crash(machine),
105+
Event::Spawn => todo!(),
106+
}
107+
}
108+
109+
if sim.step()? {
110+
return Ok(());
111+
}
112+
}
113+
}
114+
}
115+
116+
pub fn echo_client_cycle(
117+
cluster: Cluster,
118+
clients: usize,
119+
duration: Duration,
120+
) -> impl Future<Output = Result<(), Box<dyn Error>>> {
121+
async move {
122+
let mut join = JoinSet::new();
123+
124+
for _client in 0..clients {
125+
let cluster = cluster.clone();
126+
127+
join.spawn_local(async move {
128+
let mut client = Client::new(cluster);
129+
130+
while turmoil::elapsed() < duration {
131+
if let Err(e) = client.request().await {
132+
client.reset();
133+
tracing::debug!("request error: {:?}", e)
134+
}
135+
}
136+
});
137+
}
138+
139+
let _ = join.join_all().await;
140+
141+
Ok(())
142+
}
143+
}
144+
145+
pub fn machine_attrition(
146+
cluster: Cluster,
147+
duration: Duration,
148+
machines_to_kill: usize,
149+
machines_to_leave: usize,
150+
) -> impl Future<Output = Result<(), Box<dyn Error>>> {
151+
async move {
152+
tracing::info!("starting attrition workload");
153+
let mut killed_machines = 0;
154+
155+
let mean_delay = duration / machines_to_kill as u32;
156+
157+
loop {
158+
if turmoil::elapsed() > duration {
159+
break;
160+
}
161+
162+
// let mut jitter = (Duration::from_secs(0)..mean_delay).gen_range(&mut rng);
163+
tokio::time::sleep(mean_delay).await;
164+
165+
let machines = cluster.get_machines();
166+
167+
let alive_machines = machines.iter().filter(|m| !m.failed).collect::<Vec<_>>();
168+
169+
if killed_machines < machines_to_kill && alive_machines.len() > machines_to_leave {
170+
// Kill random machine
171+
let machine_to_kill = alive_machines.choose(&mut *cluster.rng()).unwrap();
172+
173+
tracing::info!("assassinating: {}", machine_to_kill.name);
174+
175+
cluster.crash(machine_to_kill.name.clone());
176+
killed_machines += 1;
177+
}
178+
}
179+
180+
Ok(())
181+
}
182+
}
183+
184+
async fn server(addr: String) -> Result<(), Box<dyn Error>> {
185+
// Next up we create a TCP listener which will listen for incoming
186+
// connections. This TCP listener is bound to the address we determined
187+
// above and must be associated with an event loop.
188+
let listener = TcpListener::bind(&addr).await?;
189+
tracing::debug!("Listening on: {addr}");
190+
191+
loop {
192+
// Asynchronously wait for an inbound socket.
193+
let (mut socket, _) = listener.accept().await?;
194+
195+
tokio::spawn(async move {
196+
let mut buf = vec![0; 1024];
197+
198+
// In a loop, read data from the socket and write the data back.
199+
loop {
200+
let Ok(n) = socket.read(&mut buf).await else {
201+
tracing::error!("failed to read data from socket");
202+
return;
203+
};
204+
205+
if n == 0 {
206+
return;
207+
}
208+
209+
let Ok(_) = socket.write_all(&buf[0..n]).await else {
210+
tracing::error!("failed to write data to socket");
211+
return;
212+
};
213+
}
214+
});
215+
}
216+
}
217+
218+
struct Client {
219+
cluster: Cluster,
220+
conn: Option<TcpStream>,
221+
}
222+
223+
impl Client {
224+
fn new(cluster: Cluster) -> Self {
225+
Self {
226+
cluster,
227+
conn: None,
228+
}
229+
}
230+
231+
async fn connect(&mut self) -> Result<(), Box<dyn Error>> {
232+
let mut attempt = 0;
233+
let init_backoff = 50;
234+
235+
let machines = self.cluster.get_machines();
236+
237+
loop {
238+
let machine = machines.choose(&mut *self.cluster.rng()).unwrap();
239+
240+
tracing::debug!("connecting to {}", machine.name);
241+
242+
let addr = format!("{}:8080", machine.name);
243+
244+
match tokio::time::timeout(Duration::from_secs(1), TcpStream::connect(&addr)).await {
245+
Ok(Ok(stream)) => {
246+
self.conn = Some(stream);
247+
return Ok(());
248+
}
249+
Ok(Err(e)) => {
250+
tracing::debug!("connect error: {:?}", e);
251+
}
252+
Err(_) => {
253+
tracing::debug!("connect timeout");
254+
}
255+
};
256+
257+
attempt += 1;
258+
259+
let backoff = std::cmp::max(init_backoff * 2u64.pow(attempt), 3600);
260+
let jitter = self.cluster.rng().gen_range(0..backoff);
261+
262+
tokio::time::sleep(Duration::from_millis(jitter)).await;
263+
}
264+
}
265+
266+
async fn request(&mut self) -> Result<(), Box<dyn Error>> {
267+
if self.conn.is_none() {
268+
self.connect()
269+
.await
270+
.map_err(|e| format!("connect: {}", e))?;
271+
}
272+
273+
let stream = self.conn.as_mut().unwrap();
274+
275+
let msg = b"hello world";
276+
stream
277+
.write_all(msg)
278+
.await
279+
.map_err(|e| format!("write: {}", e))?;
280+
281+
let mut buf = vec![0; msg.len()];
282+
let _n = stream
283+
.read_exact(&mut buf)
284+
.await
285+
.map_err(|e| format!("read: {}", e))?;
286+
assert_eq!(buf, msg);
287+
288+
Ok(())
289+
}
290+
291+
fn reset(&mut self) {
292+
self.conn = None;
293+
}
294+
}

examples/cluster/tests/cluster.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use std::time::Duration;
2+
3+
use cluster::{echo_client_cycle, machine_attrition, Cluster};
4+
use tracing_subscriber::EnvFilter;
5+
6+
#[test]
7+
fn retries() {
8+
let _ = tracing_subscriber::fmt()
9+
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::from("info")))
10+
.with_test_writer()
11+
.try_init();
12+
13+
let test_duration = Duration::from_secs(10);
14+
15+
let mut cluster = Cluster::new(50, None);
16+
17+
let mut sim = turmoil::Builder::new()
18+
.simulation_duration(Duration::from_secs(360))
19+
.build();
20+
21+
cluster.setup(&mut sim);
22+
23+
sim.client(
24+
"echo_client_cycle",
25+
echo_client_cycle(cluster.clone(), 50, test_duration),
26+
);
27+
28+
sim.client(
29+
"attrition",
30+
machine_attrition(cluster.clone(), test_duration, 40, 10),
31+
);
32+
33+
cluster.run(&mut sim).unwrap();
34+
}

0 commit comments

Comments
 (0)