Skip to content

Commit 8a26520

Browse files
committed
correct completed/failed counts
1 parent addbf85 commit 8a26520

1 file changed

Lines changed: 142 additions & 4 deletions

File tree

src/workflow.rs

Lines changed: 142 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -609,9 +609,15 @@ where
609609
}
610610
}
611611
Err(_) => {
612-
// Timeout occurred - we need to handle the tasks that were still running
613-
// For simplicity, we'll count them as cancelled
614-
status.cancelled = workflow_count;
612+
// Timeout occurred - check which workflows are still in default state (Cancelled)
613+
// and count them, while preserving any that may have completed
614+
for result in &results {
615+
match result {
616+
WorkflowResult::Success(_) => status.completed += 1,
617+
WorkflowResult::Failed(_) => status.failed += 1,
618+
WorkflowResult::Cancelled => status.cancelled += 1,
619+
}
620+
}
615621
}
616622
}
617623
}
@@ -669,7 +675,14 @@ where
669675
}
670676
Err(_) => {
671677
// Duration timeout occurred before quota was reached
672-
status.cancelled = workflow_count;
678+
// Check which workflows actually completed and only mark the rest as cancelled
679+
for result in &results {
680+
match result {
681+
WorkflowResult::Success(_) => status.completed += 1,
682+
WorkflowResult::Failed(_) => status.failed += 1,
683+
WorkflowResult::Cancelled => status.cancelled += 1,
684+
}
685+
}
673686
}
674687
}
675688
}
@@ -1682,4 +1695,129 @@ mod tests {
16821695
format!("{cancelled:?}"),
16831696
];
16841697
}
1698+
1699+
#[tokio::test]
1700+
async fn test_timeout_preserves_completed_workflows() {
1701+
// This test verifies that when a timeout occurs, the system correctly
1702+
// counts workflows that completed before the timeout rather than
1703+
// marking all workflows as cancelled
1704+
1705+
// Create a node that completes instantly
1706+
#[derive(Clone)]
1707+
struct InstantNode;
1708+
1709+
#[async_trait]
1710+
impl Node<TestState> for InstantNode {
1711+
type PrepResult = ();
1712+
type ExecResult = ();
1713+
1714+
async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
1715+
Ok(())
1716+
}
1717+
1718+
async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {
1719+
()
1720+
}
1721+
1722+
async fn post(
1723+
&self,
1724+
_store: &MemoryStore,
1725+
_exec_res: Self::ExecResult,
1726+
) -> Result<TestState, CanoError> {
1727+
Ok(TestState::Complete)
1728+
}
1729+
}
1730+
1731+
// Create a node that takes longer than our timeout
1732+
#[derive(Clone)]
1733+
struct SlowNode;
1734+
1735+
#[async_trait]
1736+
impl Node<TestState> for SlowNode {
1737+
type PrepResult = ();
1738+
type ExecResult = ();
1739+
1740+
async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
1741+
// Sleep longer than our timeout
1742+
tokio::time::sleep(Duration::from_millis(200)).await;
1743+
Ok(())
1744+
}
1745+
1746+
async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {
1747+
()
1748+
}
1749+
1750+
async fn post(
1751+
&self,
1752+
_store: &MemoryStore,
1753+
_exec_res: Self::ExecResult,
1754+
) -> Result<TestState, CanoError> {
1755+
Ok(TestState::Complete)
1756+
}
1757+
}
1758+
1759+
// Test with a mix of fast and slow workflows
1760+
let mut concurrent_workflow = ConcurrentWorkflow::new(TestState::Start);
1761+
concurrent_workflow.add_exit_state(TestState::Complete);
1762+
1763+
// Register both types of nodes for the same state
1764+
// (we'll create separate workflows for testing)
1765+
1766+
// First test: all fast workflows with short timeout - should all complete
1767+
concurrent_workflow.register_node(TestState::Start, InstantNode);
1768+
1769+
let stores = vec![MemoryStore::new(), MemoryStore::new()];
1770+
let (_results, status) = concurrent_workflow
1771+
.execute_concurrent(
1772+
stores,
1773+
WaitStrategy::WaitDuration(Duration::from_millis(50)),
1774+
)
1775+
.await
1776+
.unwrap();
1777+
1778+
// All should complete successfully since they're instant
1779+
assert_eq!(status.total_workflows, 2);
1780+
assert_eq!(status.completed, 2);
1781+
assert_eq!(status.failed, 0);
1782+
assert_eq!(status.cancelled, 0);
1783+
1784+
// Now test with slow workflows and short timeout
1785+
let mut slow_workflow = ConcurrentWorkflow::new(TestState::Start);
1786+
slow_workflow.add_exit_state(TestState::Complete);
1787+
slow_workflow.register_node(TestState::Start, SlowNode);
1788+
1789+
let stores = vec![MemoryStore::new(), MemoryStore::new()];
1790+
let (results, status) = slow_workflow
1791+
.execute_concurrent(
1792+
stores,
1793+
WaitStrategy::WaitDuration(Duration::from_millis(50)),
1794+
)
1795+
.await
1796+
.unwrap();
1797+
1798+
// These should timeout and be cancelled
1799+
assert_eq!(status.total_workflows, 2);
1800+
// With our fix, we should properly count what actually happened
1801+
// Since they timeout before completion, they should all be cancelled
1802+
assert_eq!(status.completed, 0);
1803+
assert_eq!(status.failed, 0);
1804+
assert_eq!(status.cancelled, 2);
1805+
1806+
// Verify that results match the status counts
1807+
let mut success_count = 0;
1808+
let mut failed_count = 0;
1809+
let mut cancelled_count = 0;
1810+
1811+
for result in &results {
1812+
match result {
1813+
WorkflowResult::Success(_) => success_count += 1,
1814+
WorkflowResult::Failed(_) => failed_count += 1,
1815+
WorkflowResult::Cancelled => cancelled_count += 1,
1816+
}
1817+
}
1818+
1819+
assert_eq!(success_count, status.completed);
1820+
assert_eq!(failed_count, status.failed);
1821+
assert_eq!(cancelled_count, status.cancelled);
1822+
}
16851823
}

0 commit comments

Comments
 (0)