Skip to content

Commit 7d22b08

Browse files
committed
improved docs
1 parent 6f30404 commit 7d22b08

1 file changed

Lines changed: 207 additions & 86 deletions

File tree

README.md

Lines changed: 207 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,71 @@ impl Node<String> for EmailProcessor {
140140
}
141141
```
142142

143+
#### Built-in Retry Logic
144+
145+
Every node includes configurable retry logic with multiple strategies:
146+
147+
**No Retries** - Fail fast for critical operations:
148+
149+
```rust
150+
impl Node<WorkflowState> for CriticalNode {
151+
fn config(&self) -> NodeConfig {
152+
NodeConfig::minimal() // No retries, fail immediately
153+
}
154+
// ... rest of implementation
155+
}
156+
```
157+
158+
**Fixed Retries** - Consistent delays between attempts:
159+
160+
```rust
161+
impl Node<WorkflowState> for ReliableNode {
162+
fn config(&self) -> NodeConfig {
163+
NodeConfig::new().with_fixed_retry(3, Duration::from_secs(2))
164+
// 3 retries with 2 second delays
165+
}
166+
// ... rest of implementation
167+
}
168+
```
169+
170+
**Exponential Backoff** - Smart retry with increasing delays:
171+
172+
```rust
173+
impl Node<WorkflowState> for ResilientNode {
174+
fn config(&self) -> NodeConfig {
175+
NodeConfig::new().with_exponential_retry(5)
176+
// 5 retries with exponential backoff (100ms, 200ms, 400ms, 800ms, 1.6s)
177+
}
178+
// ... rest of implementation
179+
}
180+
```
181+
182+
**Custom Exponential Backoff** - Full control over retry behavior:
183+
184+
```rust
185+
impl Node<WorkflowState> for CustomNode {
186+
fn config(&self) -> NodeConfig {
187+
NodeConfig::new().with_retry(
188+
RetryMode::exponential_custom(
189+
3, // max retries
190+
Duration::from_millis(50), // base delay
191+
3.0, // multiplier
192+
Duration::from_secs(10), // max delay cap
193+
0.2, // 20% jitter
194+
)
195+
)
196+
}
197+
// ... rest of implementation
198+
}
199+
```
200+
201+
**Why Use Different Retry Modes?**
202+
203+
- **None**: Database transactions, critical validations where failure should be immediate
204+
- **Fixed**: Network calls, file operations where consistent timing is preferred
205+
- **Exponential**: API calls, external services where you want to back off gracefully
206+
- **Custom Exponential**: High-load scenarios where you need precise control over timing and jitter
207+
143208
### 2. Store - Share Data Between Nodes
144209

145210
Use the built-in store to pass data around your workflow:
@@ -179,6 +244,148 @@ workflow.register_node(WorkflowState::Validate, validator)
179244
let result = workflow.orchestrate(&store).await?;
180245
```
181246

247+
#### Complex Workflows
248+
249+
Build sophisticated state machine pipelines with conditional branching, error handling, and retry logic:
250+
251+
```mermaid
252+
graph TD
253+
A[Start] --> B[LoadData]
254+
B --> C{Validate}
255+
C -->|Valid| D[Process]
256+
C -->|Invalid| E[Sanitize]
257+
C -->|Critical Error| F[Error]
258+
E --> D
259+
D --> G{QualityCheck}
260+
G -->|High Quality| H[Enrich]
261+
G -->|Low Quality| I[BasicProcess]
262+
G -->|Failed & Retries Left| J[Retry]
263+
G -->|Failed & No Retries| K[Failed]
264+
H --> L[Complete]
265+
I --> L
266+
J --> D
267+
F --> M[Cleanup]
268+
M --> K
269+
```
270+
271+
```rust
272+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
273+
enum OrderState {
274+
Start,
275+
LoadData,
276+
Validate,
277+
Sanitize,
278+
Process,
279+
QualityCheck,
280+
Enrich,
281+
BasicProcess,
282+
Retry,
283+
Cleanup,
284+
Complete,
285+
Failed,
286+
Error,
287+
}
288+
289+
// Validation node with multiple possible outcomes
290+
struct ValidationNode;
291+
292+
#[async_trait]
293+
impl Node<OrderState> for ValidationNode {
294+
type PrepResult = String;
295+
type ExecResult = ValidationResult;
296+
297+
async fn prep(&self, store: &impl Store) -> Result<Self::PrepResult, CanoError> {
298+
let data: String = store.get("raw_data")?;
299+
Ok(data)
300+
}
301+
302+
async fn exec(&self, data: Self::PrepResult) -> Self::ExecResult {
303+
if data.contains("critical_error") {
304+
ValidationResult::CriticalError
305+
} else if data.len() < 10 {
306+
ValidationResult::Invalid
307+
} else {
308+
ValidationResult::Valid
309+
}
310+
}
311+
312+
async fn post(&self, store: &impl Store, result: Self::ExecResult)
313+
-> Result<OrderState, CanoError> {
314+
match result {
315+
ValidationResult::Valid => {
316+
store.put("validation_status", "passed")?;
317+
Ok(OrderState::Process)
318+
}
319+
ValidationResult::Invalid => {
320+
store.put("validation_status", "needs_sanitization")?;
321+
Ok(OrderState::Sanitize)
322+
}
323+
ValidationResult::CriticalError => {
324+
store.put("error_reason", "critical_validation_failure")?;
325+
Ok(OrderState::Error)
326+
}
327+
}
328+
}
329+
}
330+
331+
// Quality check node with conditional processing paths
332+
struct QualityCheckNode;
333+
334+
#[async_trait]
335+
impl Node<OrderState> for QualityCheckNode {
336+
type PrepResult = (String, i32);
337+
type ExecResult = QualityScore;
338+
339+
async fn prep(&self, store: &impl Store) -> Result<Self::PrepResult, CanoError> {
340+
let data: String = store.get("processed_data")?;
341+
let attempt: i32 = store.get("retry_count").unwrap_or(0);
342+
Ok((data, attempt))
343+
}
344+
345+
async fn exec(&self, (data, attempt): Self::PrepResult) -> Self::ExecResult {
346+
let score = calculate_quality_score(&data);
347+
QualityScore { score, attempt }
348+
}
349+
350+
async fn post(&self, store: &impl Store, result: Self::ExecResult)
351+
-> Result<OrderState, CanoError> {
352+
store.put("quality_score", result.score)?;
353+
354+
match result.score {
355+
90..=100 => Ok(OrderState::Enrich),
356+
60..=89 => Ok(OrderState::BasicProcess),
357+
_ if result.attempt < 3 => {
358+
store.put("retry_count", result.attempt + 1)?;
359+
Ok(OrderState::Retry)
360+
}
361+
_ => Ok(OrderState::Failed),
362+
}
363+
}
364+
}
365+
366+
// ... implementations of the remaining nodes omitted for brevity ...
367+
368+
// Build the complex workflow
369+
let mut workflow = Workflow::new(OrderState::Start);
workflow
    .register_node(OrderState::Start, DataLoaderNode)          // -> Validate
    .register_node(OrderState::Validate, ValidationNode)       // -> Process | Sanitize | Error
    .register_node(OrderState::Sanitize, SanitizeNode)         // -> Process
    .register_node(OrderState::Process, ProcessNode)           // -> QualityCheck
    .register_node(OrderState::QualityCheck, QualityCheckNode) // -> Enrich | BasicProcess | Retry | Failed
    .register_node(OrderState::Enrich, EnrichNode)             // -> Complete
    .register_node(OrderState::BasicProcess, BasicProcessNode) // -> Complete
    .register_node(OrderState::Retry, ProcessNode)             // re-runs processing
    .register_node(OrderState::Error, CleanupNode)             // -> Failed
    .add_exit_states(vec![
        OrderState::Complete,
        OrderState::Failed
    ]);
385+
386+
let result = workflow.orchestrate(&store).await?;
387+
```
388+
182389
## Scheduler - Simplified Scheduling
183390

184391
The Scheduler module provides an easy-to-use scheduler for running workflows on various schedules. Perfect for building background job systems, periodic data processing, and automated workflows.
@@ -272,92 +479,6 @@ tokio::spawn(async move {
272479
scheduler.stop_with_timeout(Duration::from_secs(30)).await?;
273480
```
274481

275-
## 🧩 Advanced Features
276-
277-
### Built-in Retry Logic
278-
279-
Every node includes configurable retry logic with multiple strategies:
280-
281-
#### Retry Modes
282-
283-
**No Retries** - Fail fast for critical operations:
284-
285-
```rust
286-
impl Node<WorkflowState> for CriticalNode {
287-
fn config(&self) -> NodeConfig {
288-
NodeConfig::minimal() // No retries, fail immediately
289-
}
290-
// ... rest of implementation
291-
}
292-
```
293-
294-
**Fixed Retries** - Consistent delays between attempts:
295-
296-
```rust
297-
impl Node<WorkflowState> for ReliableNode {
298-
fn config(&self) -> NodeConfig {
299-
NodeConfig::new().with_fixed_retry(3, Duration::from_secs(2))
300-
// 3 retries with 2 second delays
301-
}
302-
// ... rest of implementation
303-
}
304-
```
305-
306-
**Exponential Backoff** - Smart retry with increasing delays:
307-
308-
```rust
309-
impl Node<WorkflowState> for ResilientNode {
310-
fn config(&self) -> NodeConfig {
311-
NodeConfig::new().with_exponential_retry(5)
312-
// 5 retries with exponential backoff (100ms, 200ms, 400ms, 800ms, 1.6s)
313-
}
314-
// ... rest of implementation
315-
}
316-
```
317-
318-
**Custom Exponential Backoff** - Full control over retry behavior:
319-
320-
```rust
321-
impl Node<WorkflowState> for CustomNode {
322-
fn config(&self) -> NodeConfig {
323-
NodeConfig::new().with_retry(
324-
RetryMode::exponential_custom(
325-
3, // max retries
326-
Duration::from_millis(50), // base delay
327-
3.0, // multiplier
328-
Duration::from_secs(10), // max delay cap
329-
0.2, // 20% jitter
330-
)
331-
)
332-
}
333-
// ... rest of implementation
334-
}
335-
```
336-
337-
#### Why Use Different Retry Modes?
338-
339-
- **None**: Database transactions, critical validations where failure should be immediate
340-
- **Fixed**: Network calls, file operations where consistent timing is preferred
341-
- **Exponential**: API calls, external services where you want to back off gracefully
342-
- **Custom Exponential**: High-load scenarios where you need precise control over timing and jitter
343-
344-
### Complex Workflows
345-
346-
Chain multiple nodes together to build sophisticated state machine pipelines:
347-
348-
```rust
349-
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
350-
enum State { Start, Validate, Process, Complete, Error }
351-
352-
let mut workflow = Workflow::new(State::Start);
353-
workflow.register_node(State::Start, data_loader)
354-
.register_node(State::Validate, validator)
355-
.register_node(State::Process, processor)
356-
.add_exit_states(vec![State::Complete, State::Error]);
357-
358-
let result = workflow.orchestrate(&store).await?;
359-
```
360-
361482
## 🧪 Testing & Benchmarks
362483

363484
Verify everything works and see performance metrics:

0 commit comments

Comments
 (0)