Skip to content

Commit 7e779d5

Browse files
committed
fix: update workflow state transitions to remove Aggregate step and streamline process
1 parent 2036189 commit 7e779d5

1 file changed

Lines changed: 5 additions & 8 deletions

File tree

README.md

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,7 @@ graph TD
4646
T1 --> Join{Join All}
4747
T2 --> Join
4848
T3 --> Join
49-
Join --> Aggregate[Aggregate]
50-
Aggregate --> Complete([Complete])
49+
Join --> Complete([Complete])
5150
```
5251

5352
```rust
@@ -57,8 +56,6 @@ use std::time::Duration;
5756
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
5857
enum FlowState {
5958
Start,
60-
FetchData,
61-
Aggregate,
6259
Complete,
6360
}
6461

@@ -78,7 +75,7 @@ impl Task<FlowState> for FetchSourceTask {
7875
let key = format!("source_{}", self.source_id);
7976
store.put(&key, format!("data_from_{}", self.source_id))?;
8077

81-
Ok(TaskResult::Single(FlowState::Aggregate))
78+
Ok(TaskResult::Single(FlowState::Complete))
8279
}
8380
}
8481

@@ -94,15 +91,15 @@ async fn main() -> Result<(), CanoError> {
9491
];
9592

9693
// 2. Configure join strategy
97-
// Wait for ALL tasks to complete successfully before moving to Aggregate
94+
// Wait for ALL tasks to complete successfully before moving to Complete
9895
let join_config = JoinConfig::new(
9996
JoinStrategy::All,
100-
FlowState::Aggregate
97+
FlowState::Complete
10198
).with_timeout(Duration::from_secs(5));
10299

103100
// 3. Build Workflow
104101
let workflow = Workflow::new(store)
105-
// Start -> Split into parallel tasks
102+
// Start -> Split into parallel tasks -> Complete
106103
.register_split(
107104
FlowState::Start,
108105
sources,

0 commit comments

Comments
 (0)