diff --git a/examples/grpc/Cargo.toml b/examples/grpc/Cargo.toml index 228627f9..0cba716a 100644 --- a/examples/grpc/Cargo.toml +++ b/examples/grpc/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "grpc" +name = "grpc-example" version = "0.1.0" edition = "2021" publish = false diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index fd9ca4f8..f40f7b4a 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -1,30 +1,24 @@ -use bytes::{Buf, Bytes}; +use std::fmt::Debug; use std::future::poll_fn; -use std::{ - fmt::Debug, - io::{self, Error, Result}, - net::SocketAddr, - pin::Pin, - sync::Arc, - task::{ready, Context, Poll}, -}; -use tokio::{ - io::{AsyncRead, AsyncWrite, ReadBuf}, - runtime::Handle, - sync::{mpsc, oneshot}, - time::sleep, -}; - -use crate::{ - envelope::{Envelope, Protocol, Segment, Syn}, - host::is_same, - host::SequencedSegment, - net::SocketPair, - world::World, - ToSocketAddrs, TRACING_TARGET, -}; +use std::io::{self, Error, Result}; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{ready, Context, Poll}; + +use bytes::{Buf, Bytes}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::runtime::Handle; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::sleep; use super::split_owned::{OwnedReadHalf, OwnedWriteHalf}; +use crate::envelope::{Envelope, Protocol, Segment, Syn}; +use crate::host::is_same; +use crate::host::SequencedSegment; +use crate::net::SocketPair; +use crate::world::World; +use crate::{ToSocketAddrs, TRACING_TARGET}; /// A simulated TCP stream between a local and a remote socket. /// diff --git a/src/sim.rs b/src/sim.rs index af184538..60b037e1 100644 --- a/src/sim.rs +++ b/src/sim.rs @@ -1,4 +1,3 @@ -use rand::seq::SliceRandom; use std::cell::RefCell; use std::future::Future; use std::net::IpAddr; @@ -7,6 +6,7 @@ use std::sync::Arc; use std::time::UNIX_EPOCH; use indexmap::IndexMap; +use rand::seq::SliceRandom; use tokio::time::Duration; use tracing::Level; @@ -442,7 +442,7 @@ impl<'a> Sim<'a> { #[cfg(test)] mod test { use rand::Rng; - use std::future; + use std::{future, io}; use std::{ net::{IpAddr, Ipv4Addr}, rc::Rc, @@ -685,7 +685,9 @@ mod test { sim.client("client", async move { // Peers are partitioned. TCP setup should fail. - let _ = TcpStream::connect("server:1234").await.unwrap_err(); + let err = TcpStream::connect("server:1234").await.unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::HostUnreachable); + assert_eq!(err.to_string(), "host unreachable"); Ok(()) }); @@ -733,10 +735,9 @@ mod test { async move { let udp_socket = UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?; - udp_socket - .send_to(&[42], format!("{}:1234", host_b)) - .await - .expect("sending packet should appear to work, even if partitioned"); + // If hosts are partitioned, this will return an unreachable + // error. + let _ = udp_socket.send_to(&[42], format!("{}:1234", host_b)).await; *a_did_receive.lock().unwrap() = Some(matches!( tokio::time::timeout( @@ -758,10 +759,9 @@ mod test { async move { let udp_socket = UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?; - udp_socket - .send_to(&[42], format!("{}:1234", host_a)) - .await - .expect("sending packet should work"); + // If hosts are partitioned, this will return an unreachable + // error. + let _ = udp_socket.send_to(&[42], format!("{}:1234", host_a)).await; *b_did_receive.lock().unwrap() = Some(matches!( tokio::time::timeout( diff --git a/src/top.rs b/src/top.rs index 40dbb023..98574a67 100644 --- a/src/top.rs +++ b/src/top.rs @@ -1,17 +1,18 @@ -use crate::envelope::{Envelope, Protocol}; -use crate::host::Host; -use crate::rt::Rt; -use crate::{config, TRACING_TARGET}; - -use indexmap::IndexMap; -use rand::{Rng, RngCore}; -use rand_distr::{Distribution, Exp}; use std::collections::VecDeque; use std::io::{Error, ErrorKind, Result}; use std::net::{IpAddr, SocketAddr}; use std::time::Duration; + +use indexmap::IndexMap; +use rand::{Rng, RngCore}; +use rand_distr::{Distribution, Exp}; use tokio::time::Instant; +use crate::envelope::{Envelope, Protocol}; +use crate::host::Host; +use crate::rt::Rt; +use crate::{config, TRACING_TARGET}; + /// Describes the network topology. pub(crate) struct Topology { config: config::Link, @@ -184,7 +185,10 @@ impl Topology { /// Register a link between two hosts pub(crate) fn register(&mut self, a: IpAddr, b: IpAddr) { let pair = Pair::new(a, b); - assert!(self.links.insert(pair, Link::new(self.rt.now())).is_none()); + assert!(self + .links + .insert(pair, Link::new(self.rt.now(), self.config.clone())) + .is_none()); } pub(crate) fn set_max_message_latency(&mut self, value: Duration) { @@ -227,14 +231,19 @@ impl Topology { dst: SocketAddr, message: Protocol, ) -> Result<()> { - if let Some(link) = self.links.get_mut(&Pair::new(src.ip(), dst.ip())) { + if let Some(link) = self + .links + .get_mut(&Pair::new(src.ip(), dst.ip())) + // Treat partitions as unlinked hosts + .and_then(|l| match l.get_state_for_message(src.ip(), dst.ip()) { + State::Healthy | State::Hold => Some(l), + State::ExplicitPartition | State::RandPartition => None, + }) + { link.enqueue_message(&self.config, rand, src, dst, message); Ok(()) } else { - Err(Error::new( - ErrorKind::ConnectionRefused, - "Connection refused", - )) + Err(Error::new(ErrorKind::HostUnreachable, "host unreachable")) } } @@ -307,11 +316,11 @@ enum DeliveryStatus { } impl Link { - fn new(now: Instant) -> Link { + fn new(now: Instant, config: config::Link) -> Link { Link { state_a_b: State::Healthy, state_b_a: State::Healthy, - config: config::Link::default(), + config, sent: VecDeque::new(), deliverable: IndexMap::new(), now, @@ -442,13 +451,14 @@ impl Link { } // Randomly break or repair this link. + // + // FIXME: pull this operation up to the topology level. A link shouldn't be + // responsible for determining its own random failure. fn rand_partition_or_repair(&mut self, global_config: &config::Link, rand: &mut dyn RngCore) { - let do_rand = self.rand_partition(global_config.message_loss(), rand); match (self.state_a_b, self.state_b_a) { (State::Healthy, _) | (_, State::Healthy) => { - if do_rand { - self.state_a_b = State::RandPartition; - self.state_b_a = State::RandPartition; + if self.rand_fail(global_config.message_loss(), rand) { + self.rand_partition(); } } (State::RandPartition, _) | (_, State::RandPartition) => { @@ -460,6 +470,20 @@ impl Link { } } + /// With some chance `config.fail_rate`, randomly partition this link. + fn rand_fail(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool { + let config = self.config.message_loss.as_ref().unwrap_or(global); + let fail_rate = config.fail_rate; + fail_rate > 0.0 && rand.gen_bool(fail_rate) + } + + /// With some chance `config.repair_rate`, randomly repair this link. + fn rand_repair(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool { + let config = self.config.message_loss.as_ref().unwrap_or(global); + let repair_rate = config.repair_rate; + repair_rate > 0.0 && rand.gen_bool(repair_rate) + } + fn hold(&mut self) { self.state_a_b = State::Hold; self.state_b_a = State::Hold; @@ -476,6 +500,11 @@ impl Link { } } + fn rand_partition(&mut self) { + self.state_a_b = State::RandPartition; + self.state_b_a = State::RandPartition; + } + fn explicit_partition(&mut self) { self.state_a_b = State::ExplicitPartition; self.state_b_a = State::ExplicitPartition; @@ -503,19 +532,6 @@ impl Link { self.state_b_a = State::Healthy; } - /// Should the link be randomly partitioned - fn rand_partition(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool { - let config = self.config.message_loss.as_ref().unwrap_or(global); - let fail_rate = config.fail_rate; - fail_rate > 0.0 && rand.gen_bool(fail_rate) - } - - fn rand_repair(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool { - let config = self.config.message_loss.as_ref().unwrap_or(global); - let repair_rate = config.repair_rate; - repair_rate > 0.0 && rand.gen_bool(repair_rate) - } - fn delay(&self, global: &config::Latency, rand: &mut dyn RngCore) -> Duration { let config = self.config.latency.as_ref().unwrap_or(global); diff --git a/tests/tcp.rs b/tests/tcp.rs index 8ce5aa60..f34907e3 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -1,22 +1,16 @@ -use std::{ - assert_eq, assert_ne, - io::{self, ErrorKind}, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, - rc::Rc, - time::Duration, -}; - use std::future; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - sync::{oneshot, Notify}, - time::timeout, -}; -use turmoil::{ - lookup, - net::{TcpListener, TcpStream}, - Builder, IpVersion, Result, -}; +use std::io; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::rc::Rc; +use std::time::Duration; +use std::{assert_eq, assert_ne}; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::{oneshot, Notify}; +use tokio::time::timeout; +use turmoil::lookup; +use turmoil::net::{TcpListener, TcpStream}; +use turmoil::{Builder, IpVersion, Result}; const PORT: u16 = 1738; @@ -50,10 +44,9 @@ fn network_partitions_during_connect() -> Result { sim.client("client", async { turmoil::partition("client", "server"); - assert_error_kind( - TcpStream::connect(("server", PORT)).await, - io::ErrorKind::ConnectionRefused, - ); + let err = TcpStream::connect(("server", PORT)).await.unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::HostUnreachable); + assert_eq!(err.to_string(), "host unreachable"); turmoil::repair("client", "server"); turmoil::hold("client", "server"); @@ -226,7 +219,9 @@ fn network_partition_once_connected() -> Result { assert!(timeout(Duration::from_secs(1), s.read_u8()).await.is_err()); - s.write_u8(1).await?; + let err = s.write_u8(1).await.unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::HostUnreachable); + assert_eq!(err.to_string(), "host unreachable"); Ok(()) }); @@ -236,9 +231,9 @@ fn network_partition_once_connected() -> Result { turmoil::partition("server", "client"); - s.write_u8(1).await?; - - assert!(timeout(Duration::from_secs(1), s.read_u8()).await.is_err()); + let err = s.write_u8(1).await.unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::HostUnreachable); + assert_eq!(err.to_string(), "host unreachable"); Ok(()) }); @@ -1273,8 +1268,8 @@ fn socket_to_nonexistent_node() -> Result { ); let err = sock.unwrap_err(); - assert_eq!(err.kind(), ErrorKind::ConnectionRefused); - assert_eq!(err.to_string(), "Connection refused"); + assert_eq!(err.kind(), io::ErrorKind::HostUnreachable); + assert_eq!(err.to_string(), "host unreachable"); Ok(()) }); diff --git a/tests/udp.rs b/tests/udp.rs index b4f5925b..9f2083a2 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -1,17 +1,13 @@ -use std::{ - io::{self, ErrorKind}, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, - rc::Rc, - sync::{atomic::AtomicUsize, atomic::Ordering}, - time::Duration, -}; +use std::io::{self, ErrorKind}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::rc::Rc; +use std::sync::{atomic::AtomicUsize, atomic::Ordering}; +use std::time::Duration; use tokio::{sync::oneshot, time::timeout}; -use turmoil::{ - lookup, - net::{self, UdpSocket}, - Builder, IpVersion, Result, -}; +use turmoil::lookup; +use turmoil::net::{self, UdpSocket}; +use turmoil::{Builder, IpVersion, Result}; const PORT: u16 = 1738; @@ -31,10 +27,8 @@ async fn bind_to_v6(port: u16) -> std::result::Result Result<()> { - sock.send_to(b"ping", (lookup("server"), 1738)).await?; - - Ok(()) +async fn send_ping(sock: &net::UdpSocket) -> io::Result { + sock.send_to(b"ping", (lookup("server"), 1738)).await } fn try_send_ping(sock: &net::UdpSocket) -> Result<()> { @@ -240,11 +234,9 @@ fn network_partition() -> Result { turmoil::partition("client", "server"); let sock = bind().await?; - send_ping(&sock).await?; - - assert!(timeout(Duration::from_secs(1), recv_pong(&sock)) - .await - .is_err()); + let err = send_ping(&sock).await.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::HostUnreachable); + assert_eq!(err.to_string(), "host unreachable"); Ok(()) }); @@ -569,8 +561,9 @@ fn loopback_localhost_public_v4() -> Result { let bind_addr = SocketAddr::new(bind_addr.ip(), 0); let socket = UdpSocket::bind(bind_addr).await?; - let res = socket.send_to(&expected, connect_addr).await; - assert_error_kind(res, io::ErrorKind::ConnectionRefused); + let err = socket.send_to(&expected, connect_addr).await.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::HostUnreachable); + assert_eq!(err.to_string(), "host unreachable"); Ok(()) }); @@ -620,8 +613,9 @@ fn loopback_localhost_public_v6() -> Result { let bind_addr = SocketAddr::new(bind_addr.ip(), 0); let socket = UdpSocket::bind(bind_addr).await?; - let res = socket.send_to(&expected, connect_addr).await; - assert_error_kind(res, io::ErrorKind::ConnectionRefused); + let err = socket.send_to(&expected, connect_addr).await.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::HostUnreachable); + assert_eq!(err.to_string(), "host unreachable"); Ok(()) }); @@ -729,8 +723,8 @@ fn socket_to_nonexistent_node() -> Result { ); let err = send.unwrap_err(); - assert_eq!(err.kind(), ErrorKind::ConnectionRefused); - assert_eq!(err.to_string(), "Connection refused"); + assert_eq!(err.kind(), ErrorKind::HostUnreachable); + assert_eq!(err.to_string(), "host unreachable"); Ok(()) });