diff --git a/src/bin/cql-stress-scylla-bench/args.rs b/src/bin/cql-stress-scylla-bench/args.rs
index ab151852..164623e3 100644
--- a/src/bin/cql-stress-scylla-bench/args.rs
+++ b/src/bin/cql-stress-scylla-bench/args.rs
@@ -42,7 +42,8 @@ pub(crate) struct ScyllaBenchArgs {
     pub password: String,
     pub mode: Mode,
     pub latency_type: LatencyType,
-    pub max_retries_per_op: u64,
+    pub max_consecutive_errors_per_op: u64,
+    pub max_errors_in_total: u64,
     pub concurrency: u64,
     pub maximum_rate: u64,
@@ -174,9 +175,15 @@ where
    let max_errors_at_row = flag.u64_var(
        "error-at-row-limit",
        0,
-        "the maximum number of attempts allowed for a single operation. \
+        "the maximum number of consecutive errors allowed. \
        After exceeding it, the workflow will terminate with an error. \
-        Set to 0 if you want to have unlimited retries",
+        Set to 0 if you want to disable this limit",
+    );
+    let max_errors = flag.u64_var(
+        "error-limit",
+        0,
+        "the number of total errors after which the workflow should stop and fail; \
+        set it to 0 (the default) to disable this limit",
    );
    let concurrency = flag.u64_var("concurrency", 16, "number of used tasks");
    let maximum_rate = flag.u64_var(
@@ -330,7 +337,10 @@ where
    // Zero means unlimited tries,
    // and #tries == #retries + 1,
    // therefore just subtract with wraparound and treat u64::MAX as infinity
-    let max_retries_per_op = max_errors_at_row.get().wrapping_sub(1);
+    let max_consecutive_errors_per_op = max_errors_at_row.get().wrapping_sub(1);
+
+    // Similar to above
+    let max_errors_in_total = max_errors.get().wrapping_sub(1);

    let hdr_latency_resolution = match hdr_latency_units.get().as_str() {
        "ns" => 1,
@@ -376,7 +386,8 @@ where
        mode,
        concurrency,
        latency_type,
-        max_retries_per_op,
+        max_consecutive_errors_per_op,
+        max_errors_in_total,
        maximum_rate,
        test_duration: test_duration.get(),
        partition_count,
@@ -418,6 +429,22 @@ impl ScyllaBenchArgs {
        println!("Mode:\t\t\t {}", show_mode(&self.mode));
        println!("Workload:\t\t {}", show_workload(&self.workload));
        println!("Timeout:\t\t {}", format_duration(self.timeout));
+        if self.max_consecutive_errors_per_op == u64::MAX {
+            println!("Max error number at row: unlimited");
+        } else {
+            println!(
+                "Max error number at row: {}",
+                self.max_consecutive_errors_per_op as u128 + 1,
+            );
+        }
+        if self.max_errors_in_total == u64::MAX {
+            println!("Max error number:\t unlimited");
+        } else {
+            println!(
+                "Max error number:\t {}",
+                self.max_errors_in_total as u128 + 1,
+            );
+        }
        println!(
            "Consistency level:\t {}",
            show_consistency_level(&self.consistency_level)
diff --git a/src/bin/cql-stress-scylla-bench/main.rs b/src/bin/cql-stress-scylla-bench/main.rs
index 2c13f1ec..381197b3 100644
--- a/src/bin/cql-stress-scylla-bench/main.rs
+++ b/src/bin/cql-stress-scylla-bench/main.rs
@@ -93,13 +93,19 @@ async fn main() -> Result<()> {
            combined_stats.combine(&partial_stats);
        }
        result = &mut run_finished => {
+            let errors = match &result {
+                Ok(_) => &[],
+                Err(err) => err.errors.as_slice(),
+            };
+            // Combine stats for the last time
+            let partial_stats = sharded_stats.get_combined_and_clear();
+            combined_stats.combine(&partial_stats);
+            printer.print_final(&combined_stats, errors, &mut std::io::stdout())?;
            if result.is_ok() {
-                // Combine stats for the last time
-                let partial_stats = sharded_stats.get_combined_and_clear();
-                combined_stats.combine(&partial_stats);
-                printer.print_final(&combined_stats, &mut std::io::stdout())?;
+                return Ok(());
+            } else {
+                return Err(anyhow::anyhow!("Benchmark failed"));
            }
-            return result.context("An error occurred during the benchmark");
        }
    }
}
@@ -145,7 +151,8 @@ async fn prepare(args: Arc<ScyllaBenchArgs>, stats: Arc<ShardedStats>) -> Result<Configuration> {
        concurrency: args.concurrency,
        rate_limit_per_second,
        operation_factory,
-        max_retries_per_op: args.max_retries_per_op as usize,
+        max_consecutive_errors_per_op: args.max_consecutive_errors_per_op,
+        max_errors_in_total: args.max_errors_in_total,
    })
}
diff --git a/src/bin/cql-stress-scylla-bench/stats.rs b/src/bin/cql-stress-scylla-bench/stats.rs
index bbd77c5d..6e08d181 100644
--- a/src/bin/cql-stress-scylla-bench/stats.rs
+++ b/src/bin/cql-stress-scylla-bench/stats.rs
@@ -252,7 +252,12 @@ impl StatsPrinter {
        Ok(())
    }

-    pub fn print_final(&self, stats: &Stats, out: &mut impl Write) -> Result<()> {
+    pub fn print_final(
+        &self,
+        stats: &Stats,
+        errors: &[anyhow::Error],
+        out: &mut impl Write,
+    ) -> Result<()> {
        let time = Instant::now() - self.start_time;
        writeln!(out)?;
        writeln!(out, "Results:")?;
@@ -274,7 +279,16 @@
            self.print_final_latency_histogram("c-o fixed latency", &ls.co_fixed, out)?;
        }

-        // TODO: "critical errors"
+        if !errors.is_empty() {
+            writeln!(
+                out,
+                "\nFollowing critical errors were caught during the run:"
+            )?;
+            for err in errors {
+                // The {:#} syntax makes sure that the error is printed in one line
+                writeln!(out, " {:#}", err)?;
+            }
+        }

        Ok(())
    }
diff --git a/src/configuration.rs b/src/configuration.rs
index e18ec689..139acfd6 100644
--- a/src/configuration.rs
+++ b/src/configuration.rs
@@ -33,9 +33,12 @@ pub struct Configuration {
    /// during the stress.
    pub operation_factory: Arc<dyn OperationFactory>,

-    /// The maximum number of attempts an operation should be retried
-    /// before giving up.
-    pub max_retries_per_op: usize,
+    /// The maximum number of consecutive errors allowed before giving up.
+    pub max_consecutive_errors_per_op: u64,
+
+    /// The maximum, global number of errors allowed during the test.
+    /// After exceeding this number, the bench will be stopped.
+    pub max_errors_in_total: u64,
}

/// Contains all necessary context needed to execute an Operation.
diff --git a/src/run.rs b/src/run.rs
index 7cf51d5e..14d46f20 100644
--- a/src/run.rs
+++ b/src/run.rs
@@ -50,20 +50,24 @@ const INVALID_OP_ID_THRESHOLD: u64 = 1u64 << 63u64;

// Represents shareable state and configuration of a worker.
struct WorkerContext {
    operation_counter: AtomicU64,
+    retry_countdown: AtomicU64,
    rate_limiter: Option<RateLimiter>,

-    max_retries_per_op: usize,
+    max_consecutive_errors_per_op: u64,
+    max_errors_in_total: u64, // For error reporting purposes only
}

impl WorkerContext {
    pub fn new(config: &Configuration, now: Instant) -> Self {
        Self {
            operation_counter: AtomicU64::new(0),
+            retry_countdown: AtomicU64::new(config.max_errors_in_total),
            rate_limiter: config
                .rate_limit_per_second
                .map(|rate| RateLimiter::new(now, rate)),
-            max_retries_per_op: config.max_retries_per_op,
+            max_consecutive_errors_per_op: config.max_consecutive_errors_per_op,
+            max_errors_in_total: config.max_errors_in_total,
        }
    }
@@ -84,12 +88,28 @@ impl WorkerContext {
        let id = self.operation_counter.fetch_add(1, Ordering::Relaxed);
        (id < INVALID_OP_ID_THRESHOLD).then_some(id)
    }
+
+    // Decrement the global retry counter. If the counter went down to zero,
+    // it returns ControlFlow::Break to indicate that the task runner
+    // should stop.
+    // If the retry counter is decremented after reaching zero, it will wrap
+    // around. That's fine - the idea is that only one task runner should report
+    // the error about exceeding the retry count, and that error should cause
+    // other tasks to be stopped.
+    fn decrement_global_retry_counter(&self) -> ControlFlow<()> {
+        let countdown = self.retry_countdown.fetch_sub(1, Ordering::Relaxed);
+        if countdown == 0 {
+            ControlFlow::Break(())
+        } else {
+            ControlFlow::Continue(())
+        }
+    }
}

pub struct WorkerSession {
    context: Arc<WorkerContext>,
    op_id: u64,
-    trial_idx: usize,
+    consecutive_errors: u64,
}

// Not the most beautiful interface, but it works - unlike async callbacks,
@@ -99,16 +119,13 @@ impl WorkerSession {
        Self {
            context,
            op_id: 0,
-            trial_idx: 0,
+            consecutive_errors: 0,
        }
    }

    // Should be called before starting an operation.
    pub async fn start_operation(&mut self) -> Option<Instant> {
-        if self.trial_idx == 0 {
-            let next_op_id = self.context.issue_operation_id()?;
-            self.op_id = next_op_id;
-        }
+        self.op_id = self.context.issue_operation_id()?;

        let scheduled_start_time = if let Some(rate_limiter) = &self.context.rate_limiter {
            let start_time = rate_limiter.issue_next_start_time();
@@ -130,13 +147,25 @@ pub fn end_operation(&mut self, result: Result<ControlFlow<()>>) -> Result<ControlFlow<()>> {
        match result {
            Ok(flow) => {
-                self.trial_idx = 0;
+                self.consecutive_errors = 0;
                Ok(flow)
            }
-            Err(err) if self.trial_idx >= self.context.max_retries_per_op => Err(err),
-            Err(err) if self.context.should_stop() => Err(err),
+            Err(err) if self.consecutive_errors >= self.context.max_consecutive_errors_per_op => {
+                Err(err.context(format!(
+                    "Maximum number of errors allowed per operation exceeded ({})",
+                    self.context.max_consecutive_errors_per_op as u128 + 1,
+                )))
+            }
+            Err(err) if self.context.decrement_global_retry_counter() == ControlFlow::Break(()) => {
+                // We have exhausted our global number of allowed retries.
+                Err(err.context(format!(
+                    "Maximum global number of total errors exceeded ({})",
+                    self.context.max_errors_in_total as u128 + 1,
+                )))
+            }
+            Err(_) if self.context.should_stop() => Ok(ControlFlow::Break(())),
            Err(_) => {
-                self.trial_idx += 1;
+                self.consecutive_errors += 1;
                Ok(ControlFlow::Continue(()))
            }
        }
@@ -174,13 +203,19 @@ impl RunController {
    }
}

+#[derive(Debug)]
+pub struct RunError {
+    /// All errors that occurred during the test.
+    pub errors: Vec<anyhow::Error>,
+}
+
/// Runs an operation multiple times in parallel, according to config.
///
/// Returns a pair (controller, future), where:
/// - `controller` is an object that can be used to control the state of the run,
/// - `future` is a future which can be waited on in order to obtain the result
///   of the run. It does not need to be polled in order for the run to progress.
-pub fn run(config: Configuration) -> (RunController, impl Future<Output = Result<()>>) {
+pub fn run(config: Configuration) -> (RunController, impl Future<Output = Result<(), RunError>>) {
    let (stop_sender, stop_receiver) = oneshot::channel();
    let (result_sender, result_receiver) = oneshot::channel();
@@ -201,14 +236,21 @@
    let result_fut = async move {
-        let result: Result<Result<()>, _> = result_receiver.await;
-        result.unwrap_or_else(|_| Err(anyhow::anyhow!("The run was aborted")))
+        let result: Result<Result<(), RunError>, _> = result_receiver.await;
+        result.unwrap_or_else(|_| {
+            Err(RunError {
+                errors: vec![anyhow::anyhow!("The run was aborted")],
+            })
+        })
    };

    (controller, result_fut)
}

-async fn do_run(config: Configuration, stop_receiver: oneshot::Receiver<()>) -> Result<()> {
+async fn do_run(
+    config: Configuration,
+    stop_receiver: oneshot::Receiver<()>,
+) -> Result<(), RunError> {
    let start_time = Instant::now();
    let ctx = Arc::new(WorkerContext::new(&config, start_time));
@@ -241,25 +283,26 @@
        handle
    };

-    let mut result: Result<()> = Ok(());
+    let mut errors = Vec::new();

-    // TODO: Collect all errors and report them
    while let Some(worker_result) = worker_handles.next().await {
        if let Err(err) = worker_result {
-            result = Err(err);
            ctx.ask_to_stop();
+            errors.push(err);
        }
    }

-    result
+    if errors.is_empty() {
+        Ok(())
+    } else {
+        Err(RunError { errors })
+    }
}

#[cfg(test)]
mod tests {
-    use std::collections::HashSet;
-    use std::sync::atomic::AtomicU64;
+    use std::sync::atomic::{AtomicI64, AtomicU64};
    use std::sync::Arc;
-    use std::sync::Mutex;

    use tokio::sync::Semaphore;
    use tokio::time::Instant;
@@ -313,7 +356,8 @@
        concurrency: 10,
        rate_limit_per_second: None,
        operation_factory: Arc::new(FnOperationFactory(f)),
-        max_retries_per_op: 0,
+        max_consecutive_errors_per_op: 0,
+        max_errors_in_total: 0,
    }
}
@@ -430,30 +474,22 @@
        fut.await.unwrap_err();
    }

-    struct AlternatingSuccessFailOp {
-        tried_ops: Mutex<HashSet<u64>>,
-    }
+    struct AlternatingSuccessFailOp;

    make_runnable!(AlternatingSuccessFailOp);
    impl AlternatingSuccessFailOp {
        fn new() -> Self {
-            AlternatingSuccessFailOp {
-                tried_ops: Mutex::new(HashSet::new()),
-            }
+            AlternatingSuccessFailOp
        }

        async fn execute(&mut self, ctx: &OperationContext) -> Result<ControlFlow<()>> {
            if ctx.operation_id >= 100 {
-                return Ok(ControlFlow::Break(()));
-            }
-
-            let mut lock = self.tried_ops.lock().unwrap();
-            let was_missing = lock.insert(ctx.operation_id);
-            if was_missing {
-                // First visit, so fail
+                Ok(ControlFlow::Break(()))
+            } else if ctx.operation_id % 2 == 0 {
+                // Fail on even numbers
                Err(anyhow::anyhow!("oops"))
            } else {
-                // Already tried and failed - return success this time
+                // Succeed on odd numbers
                Ok(ControlFlow::Continue(()))
            }
        }
    }
@@ -461,12 +497,20 @@
    #[tokio::test]
    async fn test_retrying() {
-        let cfg = make_test_cfg(AlternatingSuccessFailOp::new);
+        let mut cfg = make_test_cfg(AlternatingSuccessFailOp::new);
+        cfg.max_consecutive_errors_per_op = 0;
+        cfg.max_errors_in_total = u64::MAX;
        let (_, fut) = run(cfg);
        fut.await.unwrap_err(); // Expect error as there were no retries

        let mut cfg = make_test_cfg(AlternatingSuccessFailOp::new);
-        cfg.max_retries_per_op = 1;
+        // We can't use higher concurrency because we want to have alternating
+        // failures and successes. New operation IDs are issued for each operation,
+        // even after a failure, so we have no way to associate context with
+        // an operation after it fails.
+        cfg.concurrency = 1;
+        cfg.max_consecutive_errors_per_op = 1;
+        cfg.max_errors_in_total = u64::MAX;
        let (_, fut) = run(cfg);
        fut.await.unwrap(); // Expect success as each op was retried
    }
@@ -491,7 +535,8 @@
        let sem_clone = Arc::clone(&sem);
        let mut cfg = make_test_cfg(move || AlwaysFailsOp(Some(sem_clone.clone())));
-        cfg.max_retries_per_op = usize::MAX;
+        cfg.max_consecutive_errors_per_op = u64::MAX;
+        cfg.max_errors_in_total = u64::MAX;
        let concurrency = cfg.concurrency as u32;

        let (ctrl, fut) = run(cfg);
@@ -501,6 +546,104 @@
        // Ask to stop and make sure that the workload finishes
        ctrl.ask_to_stop();
-        fut.await.unwrap_err();
+        fut.await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_max_errors_in_total() {
+        struct Op {
+            failed: bool,
+            decremented_after_failure: bool,
+            shared_counter: Arc<AtomicI64>,
+        }
+
+        make_runnable!(Op);
+        impl Op {
+            fn new(shared_counter: Arc<AtomicI64>) -> Self {
+                Op {
+                    failed: false,
+                    decremented_after_failure: false,
+                    shared_counter,
+                }
+            }
+
+            async fn execute(&mut self, _ctx: &OperationContext) -> Result<ControlFlow<()>> {
+                if !self.failed {
+                    // Report my error, only once
+                    self.failed = true;
+                    return Err(anyhow::anyhow!("fail"));
+                }
+                if !self.decremented_after_failure {
+                    // Decrement the shared counter, only once
+                    self.decremented_after_failure = true;
+                    self.shared_counter.fetch_sub(1, Ordering::Relaxed);
+                }
+                if self.shared_counter.load(Ordering::Relaxed) <= 0 {
+                    // If we are here then this means that all operations
+                    // executed at least once after reporting an error.
+                    // This means that the errors that the operation returned
+                    // weren't enough to stop the whole workload, so stop
+                    // the operation with a success.
+                    return Ok(ControlFlow::Break(()));
+                }
+                // Not all operations have reported their error or decremented
+                // the counter yet, keep spinning.
+                tokio::time::sleep(Duration::from_millis(10)).await; // Make sure we don't enter a spin loop
+                Ok(ControlFlow::Continue(()))
+            }
+        }
+
+        let test = |error_count: u64, retry_limit: u64, expect_stoppage: bool| async move {
+            let shared_counter = Arc::new(AtomicI64::new(error_count as i64));
+
+            let mut cfg = make_test_cfg(move || Op::new(shared_counter.clone()));
+            cfg.concurrency = error_count;
+            cfg.max_consecutive_errors_per_op = 1; // We need to allow the runner to retry individual failures
+            cfg.max_errors_in_total = retry_limit;
+
+            let (_, fut) = run(cfg);
+            let res = fut.await;
+
+            if expect_stoppage {
+                assert!(res.is_err());
+            } else {
+                assert!(res.is_ok());
+            }
+        };
+
+        test(10, 20, false).await;
+        test(19, 20, false).await;
+        test(20, 20, false).await;
+        test(21, 20, true).await;
+        test(30, 20, true).await;
+    }
+
+    #[tokio::test]
+    #[ntest::timeout(1000)]
+    async fn test_stops_after_one_fails() {
+        struct Op(bool);
+
+        make_runnable!(Op);
+        impl Op {
+            async fn execute(&mut self, _ctx: &OperationContext) -> Result<ControlFlow<()>> {
+                // Yield so that we don't get stuck in a loop and block the executor thread
+                tokio::task::yield_now().await;
+                if self.0 {
+                    Ok(ControlFlow::Continue(()))
+                } else {
+                    Err(anyhow::anyhow!("error"))
+                }
+            }
+        }
+
+        let counter = AtomicU64::new(0);
+        let mut cfg = make_test_cfg(move || {
+            let id = counter.fetch_add(1, Ordering::Relaxed);
+            Op(id > 0) // Operation with id == 0 always fails, others always succeed
+        });
+        cfg.concurrency = 3;
+
+        let (_, fut) = run(cfg);
+        fut.await.unwrap_err(); // Error from one task should stop other tasks
    }
}
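
Note on the limit encoding used above: both flags treat 0 as "no limit", while internally the value minus one (computed with wrapping subtraction, so 0 becomes u64::MAX) serves both as the per-operation retry cap and as the starting value of the shared countdown. What follows is a minimal, self-contained sketch of that scheme and of the fetch_sub-based countdown from WorkerContext::decrement_global_retry_counter; the names decode_limit, Countdown, and on_error are illustrative, not identifiers from the patch.

    use std::ops::ControlFlow;
    use std::sync::atomic::{AtomicU64, Ordering};

    // CLI semantics: 0 => unlimited, N => stop upon the N-th error.
    // Storing N - 1 with wraparound maps 0 to u64::MAX, which the rest of
    // the code can treat as "effectively infinite".
    fn decode_limit(flag_value: u64) -> u64 {
        flag_value.wrapping_sub(1)
    }

    struct Countdown {
        remaining: AtomicU64,
    }

    impl Countdown {
        fn new(limit: u64) -> Self {
            Self {
                remaining: AtomicU64::new(limit),
            }
        }

        // Mirrors decrement_global_retry_counter: fetch_sub returns the
        // previous value, so exactly one caller observes 0 and gets Break;
        // later callers wrap around, which is harmless because the Break
        // already causes the whole run to stop.
        fn on_error(&self) -> ControlFlow<()> {
            if self.remaining.fetch_sub(1, Ordering::Relaxed) == 0 {
                ControlFlow::Break(())
            } else {
                ControlFlow::Continue(())
            }
        }
    }

    fn main() {
        assert_eq!(decode_limit(0), u64::MAX); // 0 on the CLI means unlimited

        let countdown = Countdown::new(decode_limit(3)); // e.g. "-error-limit 3"
        assert_eq!(countdown.on_error(), ControlFlow::Continue(())); // error #1
        assert_eq!(countdown.on_error(), ControlFlow::Continue(())); // error #2
        assert_eq!(countdown.on_error(), ControlFlow::Break(())); // error #3 hits the limit
    }

The wraparound trick keeps the user-facing semantics of the original scylla-bench flags (0 disables the limit) while letting the hot path avoid any special-casing: u64::MAX behaves as an unreachable sentinel for both the comparison in end_operation and the countdown shared between workers.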