Skip to content

Commit 5a75f8b

Browse files
authored
Timeout variant and per-attempt timeout in TaskConfig (#13)
- Added a `Timeout` variant to `CanoError`, returned when a bounded operation exceeds its deadline.
- Introduced `attempt_timeout` in `TaskConfig` to wrap each attempt in `tokio::time::timeout`.
- Updated documentation to reflect the new timeout features and their integration with retries.
- Improved panic safety in workflows by catching panics and returning `CanoError::TaskExecution`.
- Added tests for timeout behavior and bulkhead concurrency limits.
1 parent a4a2ab5 commit 5a75f8b

11 files changed

Lines changed: 470 additions & 18 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@ resolver = "3"
33
members = ["cano", "cano-macros"]
44

55
[workspace.package]
6-
version = "0.10.2"
6+
version = "0.10.3"
77
edition = "2024"
88
rust-version = "1.95.0"
99
license = "MIT OR Apache-2.0"

cano/src/error.rs

Lines changed: 44 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@
2929
//! | `Workflow` | Workflow orchestration fails | Verify node registration and routing |
3030
//! | `Configuration` | Invalid node/workflow config | Check parameters and settings |
3131
//! | `RetryExhausted` | All retries exhausted | Increase retries or fix root cause |
32+
//! | `Timeout` | Per-attempt timeout reached | Increase `attempt_timeout` or speed up the task |
3233
//! | `Generic` | General errors | Check the specific error message |
3334
//!
3435
//! ## Using Errors in Your Nodes
@@ -178,6 +179,16 @@ pub enum CanoError {
178179
/// all configured retry attempts have been used up.
179180
RetryExhausted(String),
180181

182+
/// A bounded operation exceeded its deadline.
183+
///
184+
/// Emitted by per-attempt task timeouts (see
185+
/// [`crate::task::TaskConfig::with_attempt_timeout`]). The retry loop treats
186+
/// this as a recoverable failure: each attempt that times out is retried
187+
/// (subject to the configured `RetryMode`), and if all attempts time out
188+
/// the loop wraps the final timeout error in
189+
/// [`CanoError::RetryExhausted`].
190+
Timeout(String),
191+
181192
/// General-purpose error for other scenarios
182193
///
183194
/// Use this for errors that don't fit the other categories.
@@ -230,6 +241,11 @@ impl CanoError {
230241
CanoError::RetryExhausted(msg.into())
231242
}
232243

244+
/// Create a new timeout error
245+
pub fn timeout<S: Into<String>>(msg: S) -> Self {
246+
CanoError::Timeout(msg.into())
247+
}
248+
233249
/// Create a new generic error
234250
pub fn generic<S: Into<String>>(msg: S) -> Self {
235251
CanoError::Generic(msg.into())
@@ -260,6 +276,7 @@ impl CanoError {
260276
CanoError::Workflow(msg) => msg,
261277
CanoError::Configuration(msg) => msg,
262278
CanoError::RetryExhausted(msg) => msg,
279+
CanoError::Timeout(msg) => msg,
263280
CanoError::Generic(msg) => msg,
264281
CanoError::ResourceNotFound(msg) => msg,
265282
CanoError::ResourceTypeMismatch(msg) => msg,
@@ -277,6 +294,7 @@ impl CanoError {
277294
CanoError::Workflow(_) => "workflow",
278295
CanoError::Configuration(_) => "configuration",
279296
CanoError::RetryExhausted(_) => "retry_exhausted",
297+
CanoError::Timeout(_) => "timeout",
280298
CanoError::Generic(_) => "generic",
281299
CanoError::ResourceNotFound(_) => "resource_not_found",
282300
CanoError::ResourceTypeMismatch(_) => "resource_type_mismatch",
@@ -295,6 +313,7 @@ impl std::fmt::Display for CanoError {
295313
CanoError::Workflow(msg) => write!(f, "Workflow error: {msg}"),
296314
CanoError::Configuration(msg) => write!(f, "Configuration error: {msg}"),
297315
CanoError::RetryExhausted(msg) => write!(f, "Retry exhausted: {msg}"),
316+
CanoError::Timeout(msg) => write!(f, "Timeout error: {msg}"),
298317
CanoError::Generic(msg) => write!(f, "Error: {msg}"),
299318
CanoError::ResourceNotFound(msg) => write!(f, "Resource not found: {msg}"),
300319
CanoError::ResourceTypeMismatch(msg) => write!(f, "Resource type mismatch: {msg}"),
@@ -315,6 +334,7 @@ impl PartialEq for CanoError {
315334
(CanoError::Workflow(a), CanoError::Workflow(b)) => a == b,
316335
(CanoError::Configuration(a), CanoError::Configuration(b)) => a == b,
317336
(CanoError::RetryExhausted(a), CanoError::RetryExhausted(b)) => a == b,
337+
(CanoError::Timeout(a), CanoError::Timeout(b)) => a == b,
318338
(CanoError::Generic(a), CanoError::Generic(b)) => a == b,
319339
(CanoError::ResourceNotFound(a), CanoError::ResourceNotFound(b)) => a == b,
320340
(CanoError::ResourceTypeMismatch(a), CanoError::ResourceTypeMismatch(b)) => a == b,
@@ -519,6 +539,30 @@ mod tests {
519539
assert_eq!(e, f);
520540
}
521541

542+
#[test]
543+
fn test_timeout_constructor_and_category() {
544+
let err = CanoError::timeout("attempt deadline reached");
545+
assert_eq!(err.message(), "attempt deadline reached");
546+
assert_eq!(err.category(), "timeout");
547+
assert_eq!(format!("{err}"), "Timeout error: attempt deadline reached");
548+
}
549+
550+
#[test]
551+
fn test_timeout_partial_eq() {
552+
let a = CanoError::timeout("d");
553+
let b = CanoError::timeout("d");
554+
assert_eq!(a, b);
555+
556+
let c = CanoError::timeout("x");
557+
let d = CanoError::timeout("y");
558+
assert_ne!(c, d);
559+
560+
// Distinct from same-message variants
561+
let timeout = CanoError::timeout("k");
562+
let workflow = CanoError::workflow("k");
563+
assert_ne!(timeout, workflow);
564+
}
565+
522566
#[test]
523567
fn test_resource_not_found_distinct_from_resource_type_mismatch() {
524568
let not_found = CanoError::resource_not_found("key");

cano/src/task.rs

Lines changed: 103 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -259,6 +259,12 @@ impl Default for RetryMode {
259259
pub struct TaskConfig {
260260
/// Retry strategy for failed executions
261261
pub retry_mode: RetryMode,
262+
/// Optional per-attempt timeout. When set, each attempt inside
263+
/// [`run_with_retries`] is wrapped with [`tokio::time::timeout`]; an
264+
/// expired attempt produces a [`CanoError::Timeout`] and the retry loop
265+
/// continues per `retry_mode`. `None` (the default) preserves the
266+
/// previous behavior of letting attempts run unbounded.
267+
pub attempt_timeout: Option<Duration>,
262268
}
263269

264270
impl TaskConfig {
@@ -273,6 +279,7 @@ impl TaskConfig {
273279
pub fn minimal() -> Self {
274280
Self {
275281
retry_mode: RetryMode::None,
282+
attempt_timeout: None,
276283
}
277284
}
278285

@@ -291,6 +298,12 @@ impl TaskConfig {
291298
pub fn with_exponential_retry(self, max_retries: usize) -> Self {
292299
self.with_retry(RetryMode::exponential(max_retries))
293300
}
301+
302+
/// Apply a per-attempt timeout. Each retry attempt gets a fresh deadline.
303+
pub fn with_attempt_timeout(mut self, timeout: Duration) -> Self {
304+
self.attempt_timeout = Some(timeout);
305+
self
306+
}
294307
}
295308

296309
/// Default implementation for retry logic that can be used by any task
@@ -326,7 +339,19 @@ where
326339
#[cfg(feature = "tracing")]
327340
debug!(attempt = attempt + 1, "Executing task attempt");
328341

329-
match run_fn().await {
342+
let attempt_outcome = match config.attempt_timeout {
343+
Some(d) => match tokio::time::timeout(d, run_fn()).await {
344+
Ok(inner) => inner,
345+
Err(_) => Err(CanoError::timeout(format!(
346+
"Task attempt {} exceeded attempt_timeout of {:?}",
347+
attempt + 1,
348+
d
349+
))),
350+
},
351+
None => run_fn().await,
352+
};
353+
354+
match attempt_outcome {
330355
Ok(result) => {
331356
#[cfg(feature = "tracing")]
332357
info!(attempt = attempt + 1, "Task execution successful");
@@ -1169,6 +1194,83 @@ mod tests {
11691194
assert_eq!(bare_called.load(Ordering::SeqCst), 0);
11701195
}
11711196

1197+
#[tokio::test]
1198+
async fn test_attempt_timeout_triggers_retry() {
1199+
use std::sync::atomic::{AtomicUsize, Ordering};
1200+
1201+
let config = TaskConfig::new()
1202+
.with_fixed_retry(2, Duration::from_millis(1))
1203+
.with_attempt_timeout(Duration::from_millis(20));
1204+
let counter = Arc::new(AtomicUsize::new(0));
1205+
let counter_clone = Arc::clone(&counter);
1206+
1207+
let result = run_with_retries::<TaskResult<String>, _, _>(&config, || {
1208+
let counter = Arc::clone(&counter_clone);
1209+
async move {
1210+
counter.fetch_add(1, Ordering::SeqCst);
1211+
tokio::time::sleep(Duration::from_millis(200)).await;
1212+
Ok::<TaskResult<String>, CanoError>(TaskResult::Single("never".to_string()))
1213+
}
1214+
})
1215+
.await;
1216+
1217+
assert!(result.is_err());
1218+
let err = result.unwrap_err();
1219+
assert!(
1220+
matches!(err, CanoError::RetryExhausted(_)),
1221+
"expected RetryExhausted, got: {err}"
1222+
);
1223+
let msg = err.to_string();
1224+
assert!(
1225+
msg.contains("Timeout error") || msg.contains("attempt_timeout"),
1226+
"expected timeout context in error, got: {msg}"
1227+
);
1228+
// 1 initial + 2 retries — every attempt times out before the sleep returns.
1229+
assert_eq!(counter.load(Ordering::SeqCst), 3);
1230+
}
1231+
1232+
#[tokio::test]
1233+
async fn test_attempt_timeout_none_unchanged() {
1234+
let config = TaskConfig::new();
1235+
assert!(config.attempt_timeout.is_none());
1236+
1237+
let result = run_with_retries::<TaskResult<String>, _, _>(&config, || async {
1238+
tokio::time::sleep(Duration::from_millis(5)).await;
1239+
Ok::<TaskResult<String>, CanoError>(TaskResult::Single("ok".to_string()))
1240+
})
1241+
.await
1242+
.unwrap();
1243+
assert_eq!(result, TaskResult::Single("ok".to_string()));
1244+
}
1245+
1246+
#[tokio::test]
1247+
async fn test_attempt_timeout_resets_per_attempt() {
1248+
use std::sync::atomic::{AtomicUsize, Ordering};
1249+
1250+
// First attempt sleeps long enough to trip the timeout; second attempt
1251+
// returns immediately. Verifies each attempt gets a fresh deadline.
1252+
let config = TaskConfig::new()
1253+
.with_fixed_retry(1, Duration::from_millis(1))
1254+
.with_attempt_timeout(Duration::from_millis(30));
1255+
let counter = Arc::new(AtomicUsize::new(0));
1256+
let counter_clone = Arc::clone(&counter);
1257+
1258+
let result = run_with_retries::<TaskResult<String>, _, _>(&config, || {
1259+
let counter = Arc::clone(&counter_clone);
1260+
async move {
1261+
let n = counter.fetch_add(1, Ordering::SeqCst);
1262+
if n == 0 {
1263+
tokio::time::sleep(Duration::from_millis(200)).await;
1264+
}
1265+
Ok::<TaskResult<String>, CanoError>(TaskResult::Single("ok".to_string()))
1266+
}
1267+
})
1268+
.await
1269+
.unwrap();
1270+
assert_eq!(result, TaskResult::Single("ok".to_string()));
1271+
assert_eq!(counter.load(Ordering::SeqCst), 2);
1272+
}
1273+
11721274
#[tokio::test]
11731275
async fn test_retry_exhausted_error_type() {
11741276
use std::sync::atomic::{AtomicUsize, Ordering};

0 commit comments

Comments
 (0)