Skip to content

Commit 5744f78

Browse files
authored
Task Runner Refactor (#5)
* Node: Trait / workflow changes * fix warning and fmt * improved task docs * improved docs * NodeConfig -> TagConfig * updated docs * badge on main branch * remove badge * deprecate register_node * task benchmarks * only running ci on main branch interactions * importing run_with_retries locally * moving test imports to the correct place
1 parent 2c4d1b9 commit 5744f78

28 files changed

Lines changed: 2970 additions & 563 deletions

.github/workflows/ci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ name: CI
22

33
on:
44
push:
5+
branches: [ "main" ]
56
pull_request:
7+
branches: [ "main" ]
68

79
env:
810
CARGO_TERM_COLOR: always

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "cano"
33
version = "0.4.0"
44
homepage = "https://github.com/nassor/cano"
55
edition = "2024"
6-
description = "Async AI & Data Workflows in Rust"
6+
description = "Simple & Fast Async Workflows in Rust - Build data processing pipelines with Tasks and Nodes"
77
license = "MIT"
88
repository = "https://github.com/nassor/cano"
99
readme = "README.md"
@@ -85,3 +85,7 @@ harness = false
8585
[[bench]]
8686
name = "store_performance"
8787
harness = false
88+
89+
[[bench]]
90+
name = "task_performance"
91+
harness = false

README.md

Lines changed: 112 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@
1010

1111
Cano is an async workflow engine for Rust that manages complex processing through composable workflows. It can be used for data processing, AI inference workflows, and background jobs. Cano provides a simple, fast and type-safe API for defining workflows with retry strategies, scheduling capabilities, and shared state management.
1212

13-
The engine is built on three core concepts: **Nodes** to encapsulate business logic, **Workflows** to manage state transitions, and **Schedulers** to run workflows on a schedule.
13+
The engine is built on three core concepts: processing units (**Tasks** and **Nodes**) to encapsulate business logic, **Workflows** to manage state transitions, and **Schedulers** to run workflows on a schedule.
1414

1515
*The Node API is inspired by the [PocketFlow](https://github.com/The-Pocket/PocketFlow) project, adapted for Rust's async ecosystem.*
1616

1717
## Features
1818

19-
- **Node-based API**: Single `Node` trait for implementing processing logic
19+
- **Task & Node APIs**: The `Task` trait for simple processing logic, or the `Node` trait for a structured three-phase lifecycle
2020
- **State Machines**: Type-safe enum-driven state transitions with compile-time checking
21-
- **Retry Strategies**: None, fixed delays, and exponential backoff with jitter
21+
- **Retry Strategies**: None, fixed delays, and exponential backoff with jitter (for both Tasks and Nodes)
2222
- **Flexible Storage**: Built-in `MemoryStore` or custom struct types for data sharing
2323
- **Workflow Scheduling**: Built-in scheduler with intervals, cron schedules, and manual triggers
2424
- **Concurrent Execution**: Execute multiple workflow instances in parallel with timeout strategies
@@ -45,9 +45,29 @@ use cano::prelude::*;
4545
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
4646
enum WorkflowState {
4747
Start,
48+
Process,
4849
Complete,
4950
}
5051

52+
// Simple Task implementation - single run method
53+
struct SimpleTask;
54+
55+
#[async_trait]
56+
impl Task<WorkflowState> for SimpleTask {
57+
fn config(&self) -> TaskConfig {
58+
// Configure retry behavior for resilience
59+
TaskConfig::new().with_exponential_retry(2)
60+
}
61+
62+
async fn run(&self, store: &MemoryStore) -> Result<WorkflowState, CanoError> {
63+
let input: String = store.get("input").unwrap_or_default();
64+
println!("Processing: {input}");
65+
store.put("result", "task_processed".to_string())?;
66+
Ok(WorkflowState::Process)
67+
}
68+
}
69+
70+
// Structured Node implementation - three-phase lifecycle
5171
struct ProcessorNode;
5272

5373
#[async_trait]
@@ -56,31 +76,32 @@ impl Node<WorkflowState> for ProcessorNode {
5676
type ExecResult = bool;
5777

5878
async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
59-
let input: String = store.get("input").unwrap_or_default();
79+
let input: String = store.get("result").unwrap_or_default();
6080
Ok(input)
6181
}
6282

6383
async fn exec(&self, prep_res: Self::PrepResult) -> Self::ExecResult {
64-
println!("Processing: {prep_res}");
84+
println!("Node processing: {prep_res}");
6585
true // Success
6686
}
6787

6888
async fn post(&self, store: &MemoryStore, exec_res: Self::ExecResult)
6989
-> Result<WorkflowState, CanoError> {
7090
if exec_res {
71-
store.put("result", "processed".to_string())?;
91+
store.put("final_result", "node_processed".to_string())?;
7292
Ok(WorkflowState::Complete)
7393
} else {
74-
Ok(WorkflowState::Start) // Retry
94+
Ok(WorkflowState::Process) // Retry
7595
}
7696
}
7797
}
7898

7999
#[tokio::main]
80100
async fn main() -> Result<(), CanoError> {
81-
// Create workflow
101+
// Create workflow - can mix Tasks and Nodes
82102
let mut workflow = Workflow::new(WorkflowState::Start);
83-
workflow.register_node(WorkflowState::Start, ProcessorNode)
103+
workflow.register(WorkflowState::Start, SimpleTask) // Task
104+
.register(WorkflowState::Process, ProcessorNode) // Node
84105
.add_exit_state(WorkflowState::Complete);
85106

86107
// Create store and run workflow
@@ -96,9 +117,42 @@ async fn main() -> Result<(), CanoError> {
96117

97118
## Core Concepts
98119

99-
### 1. Nodes - Processing Units
120+
### 1. Tasks & Nodes - Processing Units
121+
122+
Cano provides two approaches for implementing processing logic:
123+
124+
#### Tasks - Simple & Flexible
125+
126+
A `Task` provides a simplified interface with a single `run` method. Use tasks when you want simplicity and don't need structured phases; retry behavior can still be configured through `TaskConfig`:
127+
128+
```rust
129+
struct DataProcessor;
130+
131+
#[async_trait]
132+
impl Task<String> for DataProcessor {
133+
fn config(&self) -> TaskConfig {
134+
// Configure retry behavior (optional)
135+
TaskConfig::new().with_fixed_retry(3, Duration::from_secs(1))
136+
}
137+
138+
async fn run(&self, store: &MemoryStore) -> Result<String, CanoError> {
139+
// Load data
140+
let input: String = store.get("input")?;
141+
142+
// Process data
143+
let result = format!("processed: {input}");
144+
145+
// Store result and determine next state
146+
store.put("output", result)?;
147+
Ok("complete".to_string())
148+
}
149+
}
150+
```
151+
152+
#### Nodes - Structured & Resilient
153+
154+
A `Node` implements a structured three-phase lifecycle with built-in retry capabilities. Every Node can also be used as a Task; Nodes add the structured prep/exec/post phases on top:
100155

101-
A `Node` implements the processing logic for your workflow. Each node follows a three-phase lifecycle:
102156
1. **Prep**: Load data, validate inputs, setup resources
103157
2. **Exec**: Core processing logic (with automatic retry support)
104158
3. **Post**: Store results, cleanup, determine next action
@@ -133,21 +187,47 @@ impl Node<String> for EmailProcessor {
133187
}
134188
```
135189

190+
#### Compatibility & When to Use Which
191+
192+
- **Every Node automatically implements Task** - you can use any Node wherever Tasks are accepted
193+
- **Use Task for**: Simple processing, quick prototypes, one-off operations
194+
- **Use Node for**: Production workloads, complex processing, when you need a structured three-phase lifecycle
195+
136196
#### Retry Strategies
137197

138-
Configure retry behavior using `NodeConfig`:
198+
Both Tasks and Nodes support retry strategies. Configure retry behavior using `TaskConfig`:
139199

140200
```rust
201+
// Task with retry configuration
202+
impl Task<WorkflowState> for ReliableTask {
203+
fn config(&self) -> TaskConfig {
204+
// No retries (fail fast)
205+
TaskConfig::minimal()
206+
207+
// Fixed retries: 3 attempts with 2 second delays
208+
// TaskConfig::new().with_fixed_retry(3, Duration::from_secs(2))
209+
210+
// Exponential backoff: 5 retries with increasing delays
211+
// TaskConfig::new().with_exponential_retry(5)
212+
}
213+
214+
async fn run(&self, store: &MemoryStore) -> Result<WorkflowState, CanoError> {
215+
// Your task logic here...
216+
Ok(WorkflowState::Complete)
217+
}
218+
}
219+
220+
// Node with retry configuration
141221
impl Node<WorkflowState> for ReliableNode {
142-
fn config(&self) -> NodeConfig {
222+
fn config(&self) -> TaskConfig {
143223
// No retries (fail fast)
144-
NodeConfig::minimal()
224+
TaskConfig::minimal()
145225

146226
// Fixed retries: 3 attempts with 2 second delays
147-
// NodeConfig::new().with_fixed_retry(3, Duration::from_secs(2))
227+
// TaskConfig::new().with_fixed_retry(3, Duration::from_secs(2))
148228

149229
// Exponential backoff: 5 retries with increasing delays
150-
// NodeConfig::new().with_exponential_retry(5)
230+
// TaskConfig::new().with_exponential_retry(5)
151231
}
152232
// ... rest of implementation
153233
}
@@ -209,7 +289,7 @@ impl Node<ProcessingState, RequestCtx> for MetricsNode {
209289

210290
### 3. Workflows - State Management
211291

212-
Build workflows with state machine semantics:
292+
Build workflows with state machine semantics. Workflows can register both Tasks and Nodes using the unified `register` method:
213293

214294
```rust
215295
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -221,8 +301,8 @@ enum WorkflowState {
221301
}
222302

223303
let mut workflow = Workflow::new(WorkflowState::Validate);
224-
workflow.register_node(WorkflowState::Validate, validator)
225-
.register_node(WorkflowState::Process, processor)
304+
workflow.register(WorkflowState::Validate, validator_task) // Task
305+
.register(WorkflowState::Process, processor_node) // Node
226306
.add_exit_states(vec![WorkflowState::Complete, WorkflowState::Error]);
227307

228308
let result = workflow.orchestrate(&store).await?;
@@ -343,15 +423,15 @@ impl Node<OrderState> for QualityCheckNode {
343423
// Build the complete workflow
344424
let mut workflow = Workflow::new(OrderState::Start);
345425
workflow
346-
.register_node(OrderState::Start, DataLoaderNode)
347-
.register_node(OrderState::Validate, ValidationNode)
348-
.register_node(OrderState::Sanitize, SanitizeNode)
349-
.register_node(OrderState::Process, ProcessNode)
350-
.register_node(OrderState::QualityCheck, QualityCheckNode)
351-
.register_node(OrderState::Enrich, EnrichNode)
352-
.register_node(OrderState::BasicProcess, CompleteNode)
353-
.register_node(OrderState::Retry, ProcessNode)
354-
.register_node(OrderState::Error, CleanupNode)
426+
.register(OrderState::Start, DataLoaderNode)
427+
.register(OrderState::Validate, ValidationNode)
428+
.register(OrderState::Sanitize, SanitizeNode)
429+
.register(OrderState::Process, ProcessNode)
430+
.register(OrderState::QualityCheck, QualityCheckNode)
431+
.register(OrderState::Enrich, EnrichNode)
432+
.register(OrderState::BasicProcess, CompleteNode)
433+
.register(OrderState::Retry, ProcessNode)
434+
.register(OrderState::Error, CleanupNode)
355435
.add_exit_states(vec![OrderState::Complete, OrderState::Failed]);
356436

357437
let result = workflow.orchestrate(&store).await?;
@@ -366,7 +446,7 @@ use cano::prelude::*;
366446

367447
// Create a concurrent workflow with the same API as regular workflows
368448
let mut concurrent_workflow = ConcurrentWorkflow::new(ProcessingState::Start);
369-
concurrent_workflow.register_node(ProcessingState::Start, processing_node);
449+
concurrent_workflow.register(ProcessingState::Start, processing_node);
370450
concurrent_workflow.add_exit_state(ProcessingState::Complete);
371451

372452
// Execute with different wait strategies
@@ -434,11 +514,11 @@ async fn main() -> CanoResult<()> {
434514

435515
// Create regular workflows with consistent API
436516
let mut workflow1 = Workflow::new(MyState::Start);
437-
workflow1.register_node(MyState::Start, MyTask);
517+
workflow1.register(MyState::Start, MyTask);
438518
workflow1.add_exit_state(MyState::Complete);
439519

440520
let mut workflow2 = Workflow::new(MyState::Start);
441-
workflow2.register_node(MyState::Start, MyTask);
521+
workflow2.register(MyState::Start, MyTask);
442522
workflow2.add_exit_state(MyState::Complete);
443523

444524
// Schedule regular workflows
@@ -449,7 +529,7 @@ async fn main() -> CanoResult<()> {
449529

450530
// Create concurrent workflow with identical API to regular workflows
451531
let mut concurrent_workflow = ConcurrentWorkflow::new(MyState::Start);
452-
concurrent_workflow.register_node(MyState::Start, MyTask);
532+
concurrent_workflow.register(MyState::Start, MyTask);
453533
concurrent_workflow.add_exit_state(MyState::Complete);
454534

455535
// Schedule concurrent workflows (multiple instances in parallel)

0 commit comments

Comments
 (0)