|
| 1 | +//! Barriers allow tests to granularly observe and control the execution of |
| 2 | +//! source code by injecting observability and control hooks into source code. |
| 3 | +//! |
| 4 | +//! Barriers allow construction of complex tests which otherwise may rely on |
| 5 | +//! timing conditions (which are difficult to write, flaky, and hard to |
| 6 | +//! maintain) or monitoring and control of the network layer. |
| 7 | +//! |
| 8 | +//! Barriers are designed for Turmoil simulation tests. They allow test code |
| 9 | +//! to step the simulation until a barrier is triggered, and optionally suspend |
| 10 | +//! source code execution until the test is ready to proceed. |
| 11 | +//! |
| 12 | +//! # Architecture |
| 13 | +//! |
| 14 | +//! ```text |
| 15 | +//! ┌──────────────┐ ┌─────────────────────┐ |
| 16 | +//! │ Source Code │ │ Test Code │ |
| 17 | +//! │ │ │ │ |
| 18 | +//! │ ┌─────────┐ │ ┌──────────────────┐ │ │ |
| 19 | +//! │ │ Trigger ┼─┼─────►│ Barrier Repo │◄───┼── Barrier::build() │ |
| 20 | +//! │ └─────────┘ │ │ (Thread Local) │ │ │ |
| 21 | +//! │ ┌─────────┐ │ ┌───┼ ├────┼─► Barrier::wait() │ |
| 22 | +//! │ │ Resumer │◄┼──┘ └──────────────────┘ │ │ |
| 23 | +//! │ └─────────┘ │ │ │ |
| 24 | +//! │ │ │ │ |
| 25 | +//! └──────────────┘ └─────────────────────┘ |
| 26 | +//! ``` |
| 27 | +//! |
| 28 | +//! A barrier consists of two halves: a `Trigger` which defines the condition |
| 29 | +//! a barrier is waiting for, and a `Resumer` which controls when the source |
| 30 | +//! code in a barrier is released. Interesting points of source code may be |
| 31 | +//! annotated with triggers; these triggers will no-op if test code is not |
| 32 | +//! interested and are conditionally compiled out of non-test code. |
| 33 | +//! |
| 34 | +//! When test code creates a barrier, the condition and resumer is registered |
| 35 | +//! in the barrier repo. Most barriers are 'observe-only' and do not control |
| 36 | +//! execution (typically test code is simply driving simulation forward until |
| 37 | +//! a Barrier is triggered). However, test code may cause a future hitting a |
| 38 | +//! barrier to suspend until the test code resumes it. It can also cause the |
| 39 | +//! code to panic, if testing how panics are handled is desired. |
| 40 | +//! |
| 41 | +//! Triggers are type-safe Rust structs. Source code may define triggers as any |
| 42 | +//! type desired. Barrier conditions are defined as closures that match against |
| 43 | +//! a trigger. Reactions are built as an enum of well-defined actions; arbitrary |
| 44 | +//! reaction code is not allowed to curtail insane usage. |
| 45 | +//! |
| 46 | +//! Source code can use either [`trigger()`] (async, supports suspension) or |
| 47 | +//! [`trigger_noop()`] (sync, only for observation) depending on whether the |
| 48 | +//! execution flow needs to be potentially suspended by test code. |
| 49 | +//! |
| 50 | +//! Note: Each trigger event wakes at most one barrier and processes in order |
| 51 | +//! of registration. Avoid registering multiple barriers for the same triggers |
| 52 | +//! to avoid confusion. |
| 53 | +//! |
| 54 | +//! # Example |
| 55 | +//! |
| 56 | +//! ```ignore |
| 57 | +//! // In source code (conditionally compiled for simulation) |
| 58 | +//! async fn handle_prepare_ack(prepare_ack: PrepareAck) { |
| 59 | +//! // Processing... |
| 60 | +//! |
| 61 | +//! #[cfg(feature = "turmoil-barriers")] |
| 62 | +//! turmoil::barriers::trigger( |
| 63 | +//! MyBarriers::PrepareAckReceived(prepare_ack.tx_id) |
| 64 | +//! ).await; |
| 65 | +//! |
| 66 | +//! // Continue processing |
| 67 | +//! } |
| 68 | +//! |
| 69 | +//! // In test code: |
| 70 | +//! #[test] |
| 71 | +//! fn test_prepare_ack_handling() { |
| 72 | +//! let mut sim = turmoil::Builder::new().build(); |
| 73 | +//! |
| 74 | +//! // Register a barrier which will suspend when condition matches |
| 75 | +//! let mut barrier = Barrier::build( |
| 76 | +//! Reaction::Suspend, |
| 77 | +//! move |t: &MyBarriers| { |
| 78 | +//! matches!(t, MyBarriers::PrepareAckReceived(id) if *id == expected_tx_id) |
| 79 | +//! } |
| 80 | +//! ); |
| 81 | +//! |
| 82 | +//! sim.client("test", async move { |
| 83 | +//! // Trigger the function being tested |
| 84 | +//! handle_prepare_ack(PrepareAck { tx_id: expected_tx_id }).await; |
| 85 | +//! Ok(()) |
| 86 | +//! }); |
| 87 | +//! |
| 88 | +//! // Step simulation until barrier is triggered |
| 89 | +//! // Source code is now suspended at the trigger point |
| 90 | +//! let triggered = barrier.step_until_triggered(&mut sim).unwrap(); |
| 91 | +//! |
| 92 | +//! // When ready to continue source execution, drop triggered |
| 93 | +//! drop(triggered); |
| 94 | +//! |
| 95 | +//! sim.run().unwrap(); |
| 96 | +//! } |
| 97 | +//! ``` |
| 98 | +
|
| 99 | +use std::{any::Any, cell::RefCell, marker::PhantomData, ops::Deref}; |
| 100 | + |
| 101 | +use tokio::sync::{ |
| 102 | + mpsc::{self, UnboundedReceiver, UnboundedSender}, |
| 103 | + oneshot, |
| 104 | +}; |
| 105 | +use uuid::Uuid; |
| 106 | + |
| 107 | +thread_local! { |
| 108 | + static BARRIERS: BarrierRepo = BarrierRepo::new(); |
| 109 | +} |
| 110 | + |
| 111 | +struct BarrierRepo { |
| 112 | + barriers: RefCell<Vec<BarrierState>>, |
| 113 | +} |
| 114 | + |
| 115 | +impl BarrierRepo { |
| 116 | + fn new() -> Self { |
| 117 | + Self { |
| 118 | + barriers: RefCell::new(vec![]), |
| 119 | + } |
| 120 | + } |
| 121 | + |
| 122 | + fn insert(&self, barrier: BarrierState) { |
| 123 | + self.barriers.borrow_mut().push(barrier); |
| 124 | + } |
| 125 | + |
| 126 | + fn drop(&self, id: Uuid) { |
| 127 | + self.barriers.borrow_mut().retain(|t| t.id != id); |
| 128 | + } |
| 129 | + |
| 130 | + fn barrier<T: Any + Send>(&self, t: &T) -> Option<(Reaction, UnboundedSender<Waker>)> { |
| 131 | + let guard = self.barriers.borrow(); |
| 132 | + for barrier in guard.iter() { |
| 133 | + if (barrier.condition)(t) { |
| 134 | + return Some((barrier.reaction.clone(), barrier.to_test.clone())); |
| 135 | + } |
| 136 | + } |
| 137 | + None |
| 138 | + } |
| 139 | +} |
| 140 | + |
| 141 | +/// Trigger a barrier (if any registered) with the given value. |
| 142 | +/// |
| 143 | +/// Use this function when you need to give test code the ability to suspend |
| 144 | +/// source execution at trigger points. Supports both observation ([`Reaction::Noop`]) |
| 145 | +/// and suspension ([`Reaction::Suspend`]) of execution flow. |
| 146 | +/// |
| 147 | +/// If you only need to notify without suspension capability, use [`trigger_noop()`] |
| 148 | +/// instead. |
| 149 | +/// |
| 150 | +/// This function is a no-op if no barrier is registered for the given trigger type. |
| 151 | +pub async fn trigger<T: Any + Send>(t: T) { |
| 152 | + let Some((reaction, to_test)) = BARRIERS.with(|barriers| barriers.barrier(&t)) else { |
| 153 | + return; |
| 154 | + }; |
| 155 | + |
| 156 | + let (tx, rx) = oneshot::channel(); |
| 157 | + let waker = match reaction { |
| 158 | + Reaction::Noop => { |
| 159 | + tx.send(()).expect("Receiver is owned"); |
| 160 | + None |
| 161 | + } |
| 162 | + Reaction::Suspend => Some(tx), |
| 163 | + Reaction::Panic => panic!("Injected panic from barrier"), |
| 164 | + }; |
| 165 | + |
| 166 | + let _ = to_test.send((Box::new(t), waker)); |
| 167 | + let _ = rx.await; |
| 168 | +} |
| 169 | + |
| 170 | +/// Synchronously trigger a barrier with the given value. |
| 171 | +/// |
| 172 | +/// Use this function when you need to notify barriers about events without |
| 173 | +/// suspending execution. Only supports [`Reaction::Noop`] reactions and will |
| 174 | +/// panic if used with a barrier configured with [`Reaction::Suspend`]. |
| 175 | +/// |
| 176 | +/// For suspension capability, use [`trigger()`] instead. |
| 177 | +/// |
| 178 | +/// This function is a no-op if no barrier is registered for the given trigger type. |
| 179 | +pub fn trigger_noop<T: Any + Send>(t: T) { |
| 180 | + let Some((reaction, to_test)) = BARRIERS.with(|barriers| barriers.barrier(&t)) else { |
| 181 | + return; |
| 182 | + }; |
| 183 | + |
| 184 | + if let Reaction::Suspend = reaction { |
| 185 | + panic!( |
| 186 | + "trigger_noop() cannot be used with Reaction::Suspend barriers; use trigger() instead" |
| 187 | + ); |
| 188 | + } |
| 189 | + |
| 190 | + if let Reaction::Panic = reaction { |
| 191 | + panic!("Injected panic from barrier"); |
| 192 | + } |
| 193 | + |
| 194 | + let _ = to_test.send((Box::new(t), None)); |
| 195 | +} |
| 196 | + |
| 197 | +struct BarrierState { |
| 198 | + id: Uuid, |
| 199 | + condition: Box<Condition>, |
| 200 | + reaction: Reaction, |
| 201 | + to_test: UnboundedSender<Waker>, |
| 202 | +} |
| 203 | + |
| 204 | +type Condition = dyn Fn(&dyn Any) -> bool; |
| 205 | + |
| 206 | +/// A barrier that waits for source code to trigger a specific condition. |
| 207 | +/// |
| 208 | +/// Create barriers using [`Barrier::new()`] for observation-only barriers, |
| 209 | +/// or [`Barrier::build()`] to specify a custom reaction. |
| 210 | +/// |
| 211 | +/// # Example |
| 212 | +/// |
| 213 | +/// ```ignore |
| 214 | +/// // Wait for a specific event |
| 215 | +/// let mut barrier = Barrier::new(|t: &MyEvent| { |
| 216 | +/// matches!(t, MyEvent::SomethingHappened) |
| 217 | +/// }); |
| 218 | +/// |
| 219 | +/// // ... run simulation ... |
| 220 | +/// |
| 221 | +/// let triggered = barrier.wait().await.unwrap(); |
| 222 | +/// println!("Event occurred: {:?}", *triggered); |
| 223 | +/// ``` |
| 224 | +pub struct Barrier<T> { |
| 225 | + id: Uuid, |
| 226 | + from_src: UnboundedReceiver<Waker>, |
| 227 | + _t: PhantomData<T>, |
| 228 | +} |
| 229 | + |
| 230 | +impl<T: Any + Send> Barrier<T> { |
| 231 | + /// Create a new observation-only barrier that matches the given condition. |
| 232 | + /// |
| 233 | + /// This is equivalent to `Barrier::build(Reaction::Noop, condition)`. |
| 234 | + pub fn new(condition: impl Fn(&T) -> bool + 'static) -> Self { |
| 235 | + Self::build(Reaction::Noop, condition) |
| 236 | + } |
| 237 | + |
| 238 | + /// Create a barrier with a specific reaction and condition. |
| 239 | + /// |
| 240 | + /// # Arguments |
| 241 | + /// |
| 242 | + /// * `reaction` - What happens when the barrier is triggered |
| 243 | + /// * `condition` - A closure that returns true when the trigger matches |
| 244 | + pub fn build(reaction: Reaction, condition: impl Fn(&T) -> bool + 'static) -> Self { |
| 245 | + let condition = Box::new(move |t: &dyn Any| match t.downcast_ref::<T>() { |
| 246 | + Some(t) => condition(t), |
| 247 | + None => false, |
| 248 | + }); |
| 249 | + |
| 250 | + let (tx, rx) = mpsc::unbounded_channel(); |
| 251 | + let id = Uuid::new_v4(); |
| 252 | + let state = BarrierState { |
| 253 | + id, |
| 254 | + condition, |
| 255 | + reaction, |
| 256 | + to_test: tx, |
| 257 | + }; |
| 258 | + BARRIERS.with(|barriers| barriers.insert(state)); |
| 259 | + Self { |
| 260 | + id, |
| 261 | + from_src: rx, |
| 262 | + _t: PhantomData, |
| 263 | + } |
| 264 | + } |
| 265 | + |
| 266 | + /// Wait for the barrier to be triggered. |
| 267 | + /// |
| 268 | + /// Returns `Some(Triggered<T>)` when the barrier is triggered, or `None` |
| 269 | + /// if the barrier is dropped. |
| 270 | + /// |
| 271 | + /// For [`Reaction::Suspend`] barriers, the source code remains suspended |
| 272 | + /// until the returned [`Triggered`] handle is dropped. |
| 273 | + pub async fn wait(&mut self) -> Option<Triggered<T>> { |
| 274 | + let (data, release) = self.from_src.recv().await?; |
| 275 | + let data = *data.downcast::<T>().unwrap(); |
| 276 | + Some(Triggered { data, release }) |
| 277 | + } |
| 278 | +} |
| 279 | + |
| 280 | +impl<T> Drop for Barrier<T> { |
| 281 | + fn drop(&mut self) { |
| 282 | + BARRIERS.with(|barriers| barriers.drop(self.id)); |
| 283 | + } |
| 284 | +} |
| 285 | + |
| 286 | +/// A handle to a triggered barrier. |
| 287 | +/// |
| 288 | +/// This struct holds the trigger value and controls when suspended source code |
| 289 | +/// is released. For [`Reaction::Suspend`] barriers, dropping this handle will |
| 290 | +/// resume the suspended source code. |
| 291 | +/// |
| 292 | +/// Use [`Deref`] to access the trigger value. |
| 293 | +pub struct Triggered<T> { |
| 294 | + data: T, |
| 295 | + release: Option<oneshot::Sender<()>>, |
| 296 | +} |
| 297 | + |
| 298 | +impl<T> Deref for Triggered<T> { |
| 299 | + type Target = T; |
| 300 | + |
| 301 | + fn deref(&self) -> &Self::Target { |
| 302 | + &self.data |
| 303 | + } |
| 304 | +} |
| 305 | + |
| 306 | +impl<T> Drop for Triggered<T> { |
| 307 | + fn drop(&mut self) { |
| 308 | + if let Some(release) = self.release.take() { |
| 309 | + let _ = release.send(()); |
| 310 | + } |
| 311 | + } |
| 312 | +} |
| 313 | + |
| 314 | +/// The reaction when a barrier is triggered. |
| 315 | +#[derive(Debug, Clone)] |
| 316 | +pub enum Reaction { |
| 317 | + /// Observe only - source code continues immediately after trigger. |
| 318 | + Noop, |
| 319 | + /// Suspend source code execution until the [`Triggered`] handle is dropped. |
| 320 | + Suspend, |
| 321 | + /// Cause a panic at the trigger point (useful for testing panic handling). |
| 322 | + Panic, |
| 323 | +} |
| 324 | + |
| 325 | +type Waker = (Box<dyn Any + Send>, Option<oneshot::Sender<()>>); |
0 commit comments