Skip to content
Closed
10 changes: 5 additions & 5 deletions src/bin/cql-stress-scylla-bench/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(crate) struct ScyllaBenchArgs {
pub password: String,
pub mode: Mode,
pub latency_type: LatencyType,
pub max_retries_per_op: u64,
pub max_consecutive_errors_per_op: u64,
pub concurrency: u64,
pub maximum_rate: u64,

Expand Down Expand Up @@ -174,9 +174,9 @@ where
let max_errors_at_row = flag.u64_var(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is a bit confusing, I think it could just be max_consecutive_errors_per_op, no?.

"error-at-row-limit",
0,
"the maximum number of attempts allowed for a single operation. \
"the maximum number of consecutive errors allowed. \
After exceeding it, the workflow will terminate with an error. \
Set to 0 if you want to have unlimited retries",
Set to 0 if you want to disable this limit",
);
let concurrency = flag.u64_var("concurrency", 16, "number of used tasks");
let maximum_rate = flag.u64_var(
Expand Down Expand Up @@ -330,7 +330,7 @@ where
// Zero means unlimited tries,
// and #tries == #retries + 1,
// therefore just subtract with wraparound and treat u64::MAX as infinity
let max_retries_per_op = max_errors_at_row.get().wrapping_sub(1);
let max_consecutive_errors_per_op = max_errors_at_row.get().wrapping_sub(1);
Comment on lines 337 to +340
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this sub(1) correct?

I think that the maximum number of consecutive errors should be equal to the value of error-at-row-limit, not error-at-row-limit - 1.

Another comment says:

/// The maximum number of consecutive errors allowed before giving up.
pub max_consecutive_errors_per_op: u64,

So AFAIU, the definition of max_consecutive_errors_per_op is the same as that of error-at-row-limit, not one less.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what the comment about retries is trying to say, previously it made some sense, as the variable used to specifiy the number of retries, but now it would be good to update it so that it explains the new variable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test test_retrying() sets max_consecutive_errors_per_op = 0, and then runs an operation which fails the first time it's executed. It's expected that the run will fail.

With sub(1) this would correspond to error-at-row-limit = 1. That would mean that error-at-row-limit = 1 doesn't allow any errors, this sounds wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the above comments. From what I understood, the error-at-row-limit defines the number of errors that can occur before failing. In other words, it defines the number of retries - in the worst case scenario we should try to perform the operation error-at-row-limit + 1 times (first try + error-at-row-limit retries).

I think we should either change it to Option<NonZeroU64> (set to None when error-at-row-limit is 0) and remove the wrapping_sub or change the >= to > in WorkerSession::end_operation here:

Err(err) if self.consecutive_errors >= self.context.max_consecutive_errors_per_op => {
    Err(err)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be also nice to add tests for edge cases such as error-at-row-limit = 1. In this case, the following scenario would be OK:

1st try: ERROR
2nd try: SUCCESS

, but this should fail:

1st try: ERROR
2nd try: ERROR
... there is no 3rd try, benchmarks terminates

As of now, I believe that the first scenario wouldn't pass since we use wrapping_sub(1) and we make use of >= comparison.


let hdr_latency_resolution = match hdr_latency_units.get().as_str() {
"ns" => 1,
Expand Down Expand Up @@ -376,7 +376,7 @@ where
mode,
concurrency,
latency_type,
max_retries_per_op,
max_consecutive_errors_per_op,
maximum_rate,
test_duration: test_duration.get(),
partition_count,
Expand Down
2 changes: 1 addition & 1 deletion src/bin/cql-stress-scylla-bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async fn prepare(args: Arc<ScyllaBenchArgs>, stats: Arc<ShardedStats>) -> Result
concurrency: args.concurrency,
rate_limit_per_second,
operation_factory,
max_retries_per_op: args.max_retries_per_op,
max_consecutive_errors_per_op: args.max_consecutive_errors_per_op,
})
}

Expand Down
5 changes: 2 additions & 3 deletions src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ pub struct Configuration {
/// during the stress.
pub operation_factory: Arc<dyn OperationFactory>,

/// The maximum number of attempts an operation should be retried
/// before giving up.
pub max_retries_per_op: u64,
/// The maximum number of consecutive errors allowed before giving up.
pub max_consecutive_errors_per_op: u64,
}

/// Contains all necessary context needed to execute an Operation.
Expand Down
54 changes: 24 additions & 30 deletions src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct WorkerContext {
operation_counter: AtomicU64,

rate_limiter: Option<RateLimiter>,
max_retries_per_op: u64,
max_consecutive_errors_per_op: u64,
}

impl WorkerContext {
Expand All @@ -63,7 +63,7 @@ impl WorkerContext {
rate_limiter: config
.rate_limit_per_second
.map(|rate| RateLimiter::new(now, rate)),
max_retries_per_op: config.max_retries_per_op,
max_consecutive_errors_per_op: config.max_consecutive_errors_per_op,
}
}

Expand All @@ -89,7 +89,7 @@ impl WorkerContext {
pub struct WorkerSession {
context: Arc<WorkerContext>,
op_id: u64,
trial_idx: u64,
consecutive_errors: u64,
}

// Not the most beautiful interface, but it works - unlike async callbacks,
Expand All @@ -99,16 +99,13 @@ impl WorkerSession {
Self {
context,
op_id: 0,
trial_idx: 0,
consecutive_errors: 0,
}
}

// Should be called before starting an operation.
pub async fn start_operation(&mut self) -> Option<OperationContext> {
if self.trial_idx == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this removed? The op_id is now incremented even when current operation failed and should be retried (using the same operation_id).

let next_op_id = self.context.issue_operation_id()?;
self.op_id = next_op_id;
}
self.op_id = self.context.issue_operation_id()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to increase op_id on failure?

AFAIU previously the op_id was an unique identifier for each operation, so if I wanted to insert values 0..100 I could just insert op_id. Then in case where inserting 13 fails, I know that the executor will try to insert 13 again, not 14.


let scheduled_start_time = if let Some(rate_limiter) = &self.context.rate_limiter {
let start_time = rate_limiter.issue_next_start_time();
Expand All @@ -130,13 +127,15 @@ impl WorkerSession {
pub fn end_operation(&mut self, result: Result<ControlFlow<()>>) -> Result<ControlFlow<()>> {
match result {
Ok(flow) => {
self.trial_idx = 0;
self.consecutive_errors = 0;
Ok(flow)
}
Err(err) if self.trial_idx >= self.context.max_retries_per_op => Err(err),
Err(err) if self.consecutive_errors >= self.context.max_consecutive_errors_per_op => {
Err(err)
}
Err(err) if self.context.should_stop() => Err(err),
Err(_) => {
self.trial_idx += 1;
self.consecutive_errors += 1;
Ok(ControlFlow::Continue(()))
}
}
Expand Down Expand Up @@ -256,10 +255,8 @@ async fn do_run(config: Configuration, stop_receiver: oneshot::Receiver<()>) ->

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::sync::Mutex;

use tokio::sync::Semaphore;
use tokio::time::Instant;
Expand Down Expand Up @@ -313,7 +310,7 @@ mod tests {
concurrency: 10,
rate_limit_per_second: None,
operation_factory: Arc::new(FnOperationFactory(f)),
max_retries_per_op: 0,
max_consecutive_errors_per_op: 0,
}
}

Expand Down Expand Up @@ -430,30 +427,22 @@ mod tests {
fut.await.unwrap_err();
}

struct AlternatingSuccessFailOp {
tried_ops: Mutex<HashSet<u64>>,
}
struct AlternatingSuccessFailOp;

make_runnable!(AlternatingSuccessFailOp);
impl AlternatingSuccessFailOp {
fn new() -> Self {
AlternatingSuccessFailOp {
tried_ops: Mutex::new(HashSet::new()),
}
AlternatingSuccessFailOp
}

async fn execute(&mut self, ctx: &OperationContext) -> Result<ControlFlow<()>> {
if ctx.operation_id >= 100 {
return Ok(ControlFlow::Break(()));
}

let mut lock = self.tried_ops.lock().unwrap();
let was_missing = lock.insert(ctx.operation_id);
if was_missing {
// First visit, so fail
Ok(ControlFlow::Break(()))
} else if ctx.operation_id % 2 == 0 {
// Fail on even numbers
Err(anyhow::anyhow!("oops"))
} else {
// Already tried and failed - return success this time
// Suceeed on odd numbers
Ok(ControlFlow::Continue(()))
}
}
Expand All @@ -466,7 +455,12 @@ mod tests {
fut.await.unwrap_err(); // Expect error as there were no retries

let mut cfg = make_test_cfg(AlternatingSuccessFailOp::new);
cfg.max_retries_per_op = 1;
// We can't use higher concurrency because we want to have alternating
// failures and successes. New operation IDs are issued for each operation,
// even after a failure, so we don't have a way to associate some context
// after a failed operation.
cfg.concurrency = 1;
cfg.max_consecutive_errors_per_op = 1;
let (_, fut) = run(cfg);
fut.await.unwrap(); // Expect success as each op was retried
}
Expand All @@ -491,7 +485,7 @@ mod tests {
let sem_clone = Arc::clone(&sem);

let mut cfg = make_test_cfg(move || AlwaysFailsOp(Some(sem_clone.clone())));
cfg.max_retries_per_op = u64::MAX;
cfg.max_consecutive_errors_per_op = u64::MAX;
let concurrency = cfg.concurrency as u32;

let (ctrl, fut) = run(cfg);
Expand Down