Skip to content

Commit fff6c8c

Browse files
committed
fix: enhance error handling in Node and Task execution phases to preserve original error variants when retries are disabled
1 parent 60a4c04 commit fff6c8c

3 files changed

Lines changed: 99 additions & 2 deletions

File tree

src/node.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,9 @@ where
344344
if attempt >= max_attempts {
345345
#[cfg(feature = "tracing")]
346346
tracing::error!(attempt = attempt, error = ?e, "Node execution failed after maximum attempts");
347+
if max_attempts <= 1 {
348+
return Err(e);
349+
}
347350
return Err(CanoError::retry_exhausted(format!(
348351
"Node post phase failed after {} attempt(s): {}",
349352
attempt, e
@@ -371,6 +374,9 @@ where
371374
if attempt >= max_attempts {
372375
#[cfg(feature = "tracing")]
373376
tracing::error!(attempt = attempt, error = ?e, "Node execution failed after maximum attempts");
377+
if max_attempts <= 1 {
378+
return Err(e);
379+
}
374380
return Err(CanoError::retry_exhausted(format!(
375381
"Node prep phase failed after {} attempt(s): {}",
376382
attempt, e
@@ -1940,4 +1946,84 @@ mod tests {
19401946
"expected RetryExhausted after retry exhaustion, got: {err}"
19411947
);
19421948
}
1949+
1950+
#[tokio::test]
1951+
async fn test_node_no_retry_preserves_error_variant() {
1952+
use std::sync::atomic::{AtomicUsize, Ordering};
1953+
1954+
struct PrepFailNode {
1955+
attempt_counter: Arc<AtomicUsize>,
1956+
}
1957+
1958+
#[async_trait]
1959+
impl Node<TestAction> for PrepFailNode {
1960+
type PrepResult = ();
1961+
type ExecResult = ();
1962+
1963+
fn config(&self) -> TaskConfig {
1964+
TaskConfig::minimal()
1965+
}
1966+
1967+
async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
1968+
self.attempt_counter.fetch_add(1, Ordering::SeqCst);
1969+
Err(CanoError::preparation("prep boom"))
1970+
}
1971+
1972+
async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {}
1973+
1974+
async fn post(
1975+
&self,
1976+
_store: &MemoryStore,
1977+
_exec_res: Self::ExecResult,
1978+
) -> Result<TestAction, CanoError> {
1979+
Ok(TestAction::Complete)
1980+
}
1981+
}
1982+
1983+
let node = PrepFailNode {
1984+
attempt_counter: Arc::new(AtomicUsize::new(0)),
1985+
};
1986+
let store = MemoryStore::new();
1987+
let err = node.run(&store).await.unwrap_err();
1988+
1989+
assert_eq!(node.attempt_counter.load(Ordering::SeqCst), 1);
1990+
assert!(
1991+
matches!(err, CanoError::Preparation(_)),
1992+
"expected original Preparation variant when retries disabled, got: {err}"
1993+
);
1994+
assert!(err.to_string().contains("prep boom"));
1995+
1996+
struct PostFailNode;
1997+
1998+
#[async_trait]
1999+
impl Node<TestAction> for PostFailNode {
2000+
type PrepResult = ();
2001+
type ExecResult = ();
2002+
2003+
fn config(&self) -> TaskConfig {
2004+
TaskConfig::minimal()
2005+
}
2006+
2007+
async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
2008+
Ok(())
2009+
}
2010+
2011+
async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {}
2012+
2013+
async fn post(
2014+
&self,
2015+
_store: &MemoryStore,
2016+
_exec_res: Self::ExecResult,
2017+
) -> Result<TestAction, CanoError> {
2018+
Err(CanoError::node_execution("post boom"))
2019+
}
2020+
}
2021+
2022+
let err = PostFailNode.run(&store).await.unwrap_err();
2023+
assert!(
2024+
matches!(err, CanoError::NodeExecution(_)),
2025+
"expected original NodeExecution variant when retries disabled, got: {err}"
2026+
);
2027+
assert!(err.to_string().contains("post boom"));
2028+
}
19432029
}

src/store/memory.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -757,7 +757,10 @@ mod tests {
757757
let arc2: Arc<String> = store.get_shared("msg").unwrap();
758758

759759
assert_eq!(*arc1, "hello");
760-
assert_eq!(arc1, arc2);
760+
assert!(
761+
Arc::ptr_eq(&arc1, &arc2),
762+
"get_shared must return clones of the same Arc, not fresh allocations"
763+
);
761764
}
762765

763766
#[test]

src/task.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,9 @@ where
300300
}
301301

302302
if attempt >= max_attempts {
303+
if max_attempts <= 1 {
304+
return Err(e);
305+
}
303306
return Err(CanoError::retry_exhausted(format!(
304307
"Task failed after {} attempt(s): {}",
305308
attempt, e
@@ -1068,7 +1071,12 @@ mod tests {
10681071

10691072
assert!(result.is_err());
10701073
assert_eq!(counter.load(Ordering::SeqCst), 1);
1071-
assert!(result.unwrap_err().to_string().contains("immediate fail"));
1074+
let err = result.unwrap_err();
1075+
assert!(
1076+
matches!(err, CanoError::TaskExecution(_)),
1077+
"expected original TaskExecution variant when retries disabled, got: {err}"
1078+
);
1079+
assert!(err.to_string().contains("immediate fail"));
10721080
}
10731081

10741082
#[tokio::test]

0 commit comments

Comments
 (0)