1 change: 1 addition & 0 deletions src/bin/cql-stress-scylla-bench/main.rs
@@ -146,6 +146,7 @@ async fn prepare(args: Arc<ScyllaBenchArgs>, stats: Arc<ShardedStats>) -> Result
rate_limit_per_second,
operation_factory,
max_consecutive_errors_per_op: args.max_consecutive_errors_per_op,
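// Note: u64::MAX effectively disables the new global limit, so the
// scylla-bench frontend keeps its previous behavior of never stopping
// on a total error count.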
max_errors_in_total: u64::MAX,
})
}

4 changes: 4 additions & 0 deletions src/configuration.rs
@@ -35,6 +35,10 @@ pub struct Configuration {

/// The maximum number of consecutive errors allowed before giving up.
pub max_consecutive_errors_per_op: u64,

/// The maximum global number of errors allowed during the test,
/// counted across all operations. After this number is exceeded,
/// the bench will be stopped.
pub max_errors_in_total: u64,
}
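
To make the two limits concrete, here is a minimal sketch of how a frontend might set them; base_config() is a hypothetical stand-in for however the rest of the Configuration is built, and only the two fields below come from this diff:

// Sketch only: base_config() is hypothetical, not part of this PR.
let mut cfg = base_config();
// Give up on a single operation after 5 consecutive failures...
cfg.max_consecutive_errors_per_op = 5;
// ...and abort the whole bench once more than 1000 errors have
// accumulated across all operations. u64::MAX disables this limit.
cfg.max_errors_in_total = 1000;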

/// Contains all context needed to execute an Operation.
100 changes: 98 additions & 2 deletions src/run.rs
@@ -50,6 +50,7 @@ const INVALID_OP_ID_THRESHOLD: u64 = 1u64 << 63u64;
// Represents shareable state and configuration of a worker.
struct WorkerContext {
operation_counter: AtomicU64,
retry_countdown: AtomicU64,

rate_limiter: Option<RateLimiter>,
max_consecutive_errors_per_op: u64,
@@ -59,6 +60,7 @@ impl WorkerContext {
pub fn new(config: &Configuration, now: Instant) -> Self {
Self {
operation_counter: AtomicU64::new(0),
retry_countdown: AtomicU64::new(config.max_errors_in_total),

rate_limiter: config
.rate_limit_per_second
@@ -84,6 +86,22 @@
let id = self.operation_counter.fetch_add(1, Ordering::Relaxed);
(id < INVALID_OP_ID_THRESHOLD).then_some(id)
}

// Decrements the global retry counter and returns ControlFlow::Break
// if the counter was already exhausted (i.e. its previous value was
// zero), to indicate that the task runner should stop.
// Decrementing past zero makes the counter wrap around. That's fine -
// the idea is that only one task runner should report the error about
// exceeding the retry count, and that error should cause the other
// tasks to be stopped.
fn decrement_global_retry_counter(&self) -> ControlFlow<()> {
let countdown = self.retry_countdown.fetch_sub(1, Ordering::Relaxed);
if countdown == 0 {
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
}
}
}
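
The wrap-around argument above leans on fetch_sub returning the counter's previous value: exactly one caller observes 0, and every later caller sees a large wrapped value. A minimal standalone sketch of that property (illustration only, not code from this PR):

use std::sync::atomic::{AtomicU64, Ordering};

fn wrap_around_demo() {
    let countdown = AtomicU64::new(2); // global budget of 2 errors
    let mut stop_signals = 0;
    for _ in 0..5 {
        // fetch_sub returns the value before the subtraction; once it
        // hits zero the counter wraps to u64::MAX, so no later caller
        // observes 0 again.
        if countdown.fetch_sub(1, Ordering::Relaxed) == 0 {
            stop_signals += 1;
        }
    }
    assert_eq!(stop_signals, 1); // only the third caller is told to stop
}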

pub struct WorkerSession {
@@ -133,6 +151,10 @@ impl WorkerSession {
Err(err) if self.consecutive_errors >= self.context.max_consecutive_errors_per_op => {
Err(err)
}
Err(err) if self.context.decrement_global_retry_counter() == ControlFlow::Break(()) => {
// We have exhausted our global number of allowed retries.
Err(err)
}
Err(err) if self.context.should_stop() => Err(err),
Err(_) => {
self.consecutive_errors += 1;
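
Note the arm order above: an error that already exceeds the per-operation limit returns immediately, without consuming global budget; the countdown is only decremented for errors that would otherwise be retried. A hypothetical restatement of the resulting decision (helper name and parameters invented for illustration):

// Hypothetical helper mirroring the guard order of the match above;
// countdown_exhausted stands for the Break result of decrementing
// the shared counter.
fn should_give_up(
    consecutive_errors: u64,
    max_consecutive_errors_per_op: u64,
    countdown_exhausted: bool,
    asked_to_stop: bool,
) -> bool {
    consecutive_errors >= max_consecutive_errors_per_op
        || countdown_exhausted
        || asked_to_stop
}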
@@ -255,7 +277,7 @@ async fn do_run(config: Configuration, stop_receiver: oneshot::Receiver<()>) ->

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;

use tokio::sync::Semaphore;
@@ -311,6 +333,7 @@ mod tests {
rate_limit_per_second: None,
operation_factory: Arc::new(FnOperationFactory(f)),
max_consecutive_errors_per_op: 0,
max_errors_in_total: 0,
}
}

Expand Down Expand Up @@ -450,7 +473,9 @@ mod tests {

#[tokio::test]
async fn test_retrying() {
let cfg = make_test_cfg(AlternatingSuccessFailOp::new);
let mut cfg = make_test_cfg(AlternatingSuccessFailOp::new);
cfg.max_consecutive_errors_per_op = 0;
cfg.max_errors_in_total = u64::MAX;
let (_, fut) = run(cfg);
fut.await.unwrap_err(); // Expect error as there were no retries

@@ -461,6 +486,7 @@
// after a failed operation.
cfg.concurrency = 1;
cfg.max_consecutive_errors_per_op = 1;
cfg.max_errors_in_total = u64::MAX;
let (_, fut) = run(cfg);
fut.await.unwrap(); // Expect success as each op was retried
}
@@ -486,6 +512,7 @@

let mut cfg = make_test_cfg(move || AlwaysFailsOp(Some(sem_clone.clone())));
cfg.max_consecutive_errors_per_op = u64::MAX;
cfg.max_errors_in_total = u64::MAX;
let concurrency = cfg.concurrency as u32;

let (ctrl, fut) = run(cfg);
@@ -497,4 +524,73 @@
ctrl.ask_to_stop();
fut.await.unwrap_err();
}

#[tokio::test]
async fn test_max_errors_in_total() {
struct Op {
failed: bool,
decremented_after_failure: bool,
shared_counter: Arc<AtomicI64>,
}

make_runnable!(Op);
impl Op {
fn new(shared_counter: Arc<AtomicI64>) -> Self {
Op {
failed: false,
decremented_after_failure: false,
shared_counter,
}
}

async fn execute(&mut self, _ctx: &OperationContext) -> Result<ControlFlow<()>> {
if !self.failed {
// Report my error, only once
self.failed = true;
return Err(anyhow::anyhow!("fail"));
}
if !self.decremented_after_failure {
// Decrement the shared counter, only once
self.decremented_after_failure = true;
self.shared_counter.fetch_sub(1, Ordering::Relaxed);
}
if self.shared_counter.load(Ordering::Relaxed) <= 0 {
// If we are here, every operation has executed at least once
// after reporting its error. The errors reported so far were
// not enough to stop the whole workload, so finish this
// operation with a success.
return Ok(ControlFlow::Break(()));
}
// Not all operations have reported their error and decremented
// the counter yet; keep spinning.
tokio::time::sleep(Duration::from_millis(10)).await; // Make sure we don't enter a spin loop
Ok(ControlFlow::Continue(()))
}
}

let test = |error_count: u64, retry_limit: u64, expect_stoppage: bool| async move {
let shared_counter = Arc::new(AtomicI64::new(error_count as i64));

let mut cfg = make_test_cfg(move || Op::new(shared_counter.clone()));
cfg.concurrency = error_count;
cfg.max_consecutive_errors_per_op = 1; // We need to allow the runner to retry individual failures
cfg.max_errors_in_total = retry_limit;
Contributor: I think it would be good to rename retry_limit to max_errors_allowed; it would be easier to understand.
let (_, fut) = run(cfg);
let res = fut.await;

if expect_stoppage {
assert!(res.is_err());
} else {
assert!(res.is_ok());
}
};

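// Stoppage only triggers once the countdown's previous value is 0,
// i.e. on the (limit + 1)-th error - hence 21 errors against a limit
// of 20 stop the run, while exactly 20 do not.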
test(10, 20, false).await;
test(19, 20, false).await;
test(20, 20, false).await;
test(21, 20, true).await;
test(30, 20, true).await;
}
}