//! # Mixed Task and Node Workflow Example
//!
//! This example demonstrates the power of Cano's unified registration system by
//! mixing both Tasks and Nodes in the same workflow:
//!
//! 1. **DataGeneratorTask**: Simple task that generates test data
//! 2. **ProcessorNode**: Structured node with retry logic for data processing
//! 3. **ValidatorTask**: Quick validation task
//! 4. **ReportNode**: Structured node for generating final reports
//!
//! **Key Features Demonstrated:**
//! - **Unified Registration**: Both Tasks and Nodes use `.register()` method
//! - **Seamless Interoperability**: Tasks and Nodes work together in one workflow
//! - **Flexible Architecture**: Choose Task or Node based on specific needs
//! - **Automatic Compatibility**: Every Node implements Task automatically
//!
//! **When to use Task vs Node:**
//! - Use **Task** for simple, flexible processing where you want full control
//! - Use **Node** for structured processing with built-in retry and error handling
//!
//! Run with:
//! ```bash
//! cargo run --example mixed_workflow
//! ```

| 26 | +use async_trait::async_trait; |
| 27 | +use cano::prelude::*; |
| 28 | +use rand::Rng; |
| 29 | + |
/// States that drive the workflow's state machine.
///
/// Each registered Task/Node returns the next state to run; `Complete` is
/// registered as the exit state in `main`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum WorkflowState {
    GenerateData,
    ProcessData,
    ValidateResults,
    GenerateReport,
    Complete,
}

/// Simple Task for data generation - maximum flexibility.
///
/// Carries only the number of data points to produce; all of the work
/// happens inside its `Task::run` implementation.
struct DataGeneratorTask {
    size: usize,
}

impl DataGeneratorTask {
    /// Build a generator that will emit `size` random data points.
    fn new(size: usize) -> Self {
        Self { size }
    }
}

| 51 | +#[async_trait] |
| 52 | +impl Task<WorkflowState> for DataGeneratorTask { |
| 53 | + async fn run(&self, store: &MemoryStore) -> CanoResult<WorkflowState> { |
| 54 | + println!( |
| 55 | + "📊 DataGeneratorTask: Generating {} data points...", |
| 56 | + self.size |
| 57 | + ); |
| 58 | + |
| 59 | + let mut rng = rand::rng(); |
| 60 | + let data: Vec<f64> = (0..self.size) |
| 61 | + .map(|_| rng.random_range(0.0..100.0)) |
| 62 | + .collect(); |
| 63 | + |
| 64 | + println!( |
| 65 | + " Generated data range: {:.2} to {:.2}", |
| 66 | + data.iter().fold(f64::INFINITY, |a, &b| a.min(b)), |
| 67 | + data.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b)) |
| 68 | + ); |
| 69 | + |
| 70 | + store.put("raw_data", data)?; |
| 71 | + store.put("generation_time", std::time::SystemTime::now())?; |
| 72 | + |
| 73 | + Ok(WorkflowState::ProcessData) |
| 74 | + } |
| 75 | +} |
| 76 | + |
/// Structured Node for data processing - built-in retry and error handling.
///
/// Values at or below `threshold` are discarded during `exec`.
struct ProcessorNode {
    threshold: f64,
}

impl ProcessorNode {
    /// Build a processor that keeps only values strictly above `threshold`.
    fn new(threshold: f64) -> Self {
        Self { threshold }
    }
}

| 88 | +#[async_trait] |
| 89 | +impl Node<WorkflowState> for ProcessorNode { |
| 90 | + type PrepResult = Vec<f64>; |
| 91 | + type ExecResult = (Vec<f64>, DataStats); |
| 92 | + |
| 93 | + async fn prep(&self, store: &MemoryStore) -> CanoResult<Self::PrepResult> { |
| 94 | + println!("🔧 ProcessorNode::prep - Loading and validating data..."); |
| 95 | + |
| 96 | + let data: Vec<f64> = store.get("raw_data")?; |
| 97 | + if data.is_empty() { |
| 98 | + return Err(CanoError::node_execution("No data to process".to_string())); |
| 99 | + } |
| 100 | + |
| 101 | + println!(" Loaded {} data points for processing", data.len()); |
| 102 | + Ok(data) |
| 103 | + } |
| 104 | + |
| 105 | + async fn exec(&self, raw_data: Self::PrepResult) -> Self::ExecResult { |
| 106 | + println!( |
| 107 | + "⚙️ ProcessorNode::exec - Processing data with threshold {}...", |
| 108 | + self.threshold |
| 109 | + ); |
| 110 | + |
| 111 | + // Complex processing that might benefit from retry logic |
| 112 | + let processed_data: Vec<f64> = raw_data |
| 113 | + .iter() |
| 114 | + .filter(|&&x| x > self.threshold) |
| 115 | + .map(|&x| x * 1.5) // Apply some transformation |
| 116 | + .collect(); |
| 117 | + |
| 118 | + let stats = if processed_data.is_empty() { |
| 119 | + DataStats { |
| 120 | + count: 0, |
| 121 | + mean: 0.0, |
| 122 | + max: 0.0, |
| 123 | + min: 0.0, |
| 124 | + } |
| 125 | + } else { |
| 126 | + DataStats { |
| 127 | + count: processed_data.len(), |
| 128 | + mean: processed_data.iter().sum::<f64>() / processed_data.len() as f64, |
| 129 | + max: processed_data |
| 130 | + .iter() |
| 131 | + .fold(f64::NEG_INFINITY, |a, &b| a.max(b)), |
| 132 | + min: processed_data.iter().fold(f64::INFINITY, |a, &b| a.min(b)), |
| 133 | + } |
| 134 | + }; |
| 135 | + |
| 136 | + println!(" Processed {} points above threshold", stats.count); |
| 137 | + (processed_data, stats) |
| 138 | + } |
| 139 | + |
| 140 | + async fn post( |
| 141 | + &self, |
| 142 | + store: &MemoryStore, |
| 143 | + exec_res: Self::ExecResult, |
| 144 | + ) -> CanoResult<WorkflowState> { |
| 145 | + println!("📋 ProcessorNode::post - Finalizing processing..."); |
| 146 | + |
| 147 | + let (processed_data, stats) = exec_res; |
| 148 | + |
| 149 | + store.put("processed_data", processed_data)?; |
| 150 | + store.put("stats", stats.clone())?; |
| 151 | + |
| 152 | + if stats.count == 0 { |
| 153 | + println!(" ⚠️ No data survived processing - might need to adjust threshold"); |
| 154 | + return Ok(WorkflowState::GenerateReport); // Skip validation |
| 155 | + } |
| 156 | + |
| 157 | + println!( |
| 158 | + " ✅ Processing complete - {} points ready for validation", |
| 159 | + stats.count |
| 160 | + ); |
| 161 | + Ok(WorkflowState::ValidateResults) |
| 162 | + } |
| 163 | +} |
| 164 | + |
| 165 | +/// Simple Task for quick validation |
| 166 | +struct ValidatorTask; |
| 167 | + |
| 168 | +#[async_trait] |
| 169 | +impl Task<WorkflowState> for ValidatorTask { |
| 170 | + async fn run(&self, store: &MemoryStore) -> CanoResult<WorkflowState> { |
| 171 | + println!("✅ ValidatorTask: Running validation checks..."); |
| 172 | + |
| 173 | + let stats: DataStats = store.get("stats")?; |
| 174 | + let processed_data: Vec<f64> = store.get("processed_data")?; |
| 175 | + |
| 176 | + // Quick validation checks |
| 177 | + let mut validation_results = Vec::new(); |
| 178 | + |
| 179 | + if stats.count != processed_data.len() { |
| 180 | + validation_results.push("Count mismatch between stats and data".to_string()); |
| 181 | + } |
| 182 | + |
| 183 | + if stats.mean.is_nan() || stats.mean.is_infinite() { |
| 184 | + validation_results.push("Invalid mean value".to_string()); |
| 185 | + } |
| 186 | + |
| 187 | + if processed_data |
| 188 | + .iter() |
| 189 | + .any(|&x| x.is_nan() || x.is_infinite()) |
| 190 | + { |
| 191 | + validation_results.push("Invalid values in processed data".to_string()); |
| 192 | + } |
| 193 | + |
| 194 | + store.put("validation_errors", validation_results.clone())?; |
| 195 | + |
| 196 | + if validation_results.is_empty() { |
| 197 | + println!(" ✅ All validation checks passed!"); |
| 198 | + } else { |
| 199 | + println!( |
| 200 | + " ⚠️ Found {} validation issues", |
| 201 | + validation_results.len() |
| 202 | + ); |
| 203 | + for error in &validation_results { |
| 204 | + println!(" - {error}"); |
| 205 | + } |
| 206 | + } |
| 207 | + |
| 208 | + Ok(WorkflowState::GenerateReport) |
| 209 | + } |
| 210 | +} |
| 211 | + |
| 212 | +/// Structured Node for report generation |
| 213 | +struct ReportNode; |
| 214 | + |
| 215 | +#[async_trait] |
| 216 | +impl Node<WorkflowState> for ReportNode { |
| 217 | + type PrepResult = (); |
| 218 | + type ExecResult = (); |
| 219 | + |
| 220 | + async fn prep(&self, store: &MemoryStore) -> CanoResult<Self::PrepResult> { |
| 221 | + println!("📊 ReportNode::prep - Gathering report data..."); |
| 222 | + |
| 223 | + // Ensure we have all required data |
| 224 | + let _stats: DataStats = store.get("stats")?; |
| 225 | + let _validation_errors: Vec<String> = store.get("validation_errors")?; |
| 226 | + |
| 227 | + Ok(()) |
| 228 | + } |
| 229 | + |
| 230 | + async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult { |
| 231 | + println!("📝 ReportNode::exec - Generating comprehensive report..."); |
| 232 | + |
| 233 | + // Simulate report generation |
| 234 | + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; |
| 235 | + |
| 236 | + () |
| 237 | + } |
| 238 | + |
| 239 | + async fn post( |
| 240 | + &self, |
| 241 | + store: &MemoryStore, |
| 242 | + _exec_res: Self::ExecResult, |
| 243 | + ) -> CanoResult<WorkflowState> { |
| 244 | + println!("📋 ReportNode::post - Finalizing report..."); |
| 245 | + |
| 246 | + let stats: DataStats = store.get("stats")?; |
| 247 | + let validation_errors: Vec<String> = store.get("validation_errors")?; |
| 248 | + |
| 249 | + // Generate final report |
| 250 | + let report = format!( |
| 251 | + "=== PROCESSING REPORT ===\n\ |
| 252 | + Data Points Processed: {}\n\ |
| 253 | + Mean Value: {:.2}\n\ |
| 254 | + Min Value: {:.2}\n\ |
| 255 | + Max Value: {:.2}\n\ |
| 256 | + Validation Issues: {}\n\ |
| 257 | + Status: {}", |
| 258 | + stats.count, |
| 259 | + stats.mean, |
| 260 | + stats.min, |
| 261 | + stats.max, |
| 262 | + validation_errors.len(), |
| 263 | + if validation_errors.is_empty() { |
| 264 | + "✅ PASSED" |
| 265 | + } else { |
| 266 | + "⚠️ WITH WARNINGS" |
| 267 | + } |
| 268 | + ); |
| 269 | + |
| 270 | + store.put("final_report", report)?; |
| 271 | + println!(" 📄 Report generated successfully!"); |
| 272 | + |
| 273 | + Ok(WorkflowState::Complete) |
| 274 | + } |
| 275 | +} |
| 276 | + |
/// Summary statistics produced by `ProcessorNode` and consumed by the
/// validator and report generator.
#[derive(Debug, Clone)]
struct DataStats {
    /// Number of values that survived threshold filtering.
    count: usize,
    /// Arithmetic mean of the processed values (0.0 when `count` is 0).
    mean: f64,
    /// Smallest processed value (0.0 when `count` is 0).
    min: f64,
    /// Largest processed value (0.0 when `count` is 0).
    max: f64,
}

| 285 | +#[tokio::main] |
| 286 | +async fn main() -> CanoResult<()> { |
| 287 | + println!("🚀 Starting Mixed Task/Node workflow example\n"); |
| 288 | + |
| 289 | + // Create workflow |
| 290 | + let mut workflow = Workflow::new(WorkflowState::GenerateData); |
| 291 | + |
| 292 | + // Register mix of Tasks and Nodes using the unified .register() method |
| 293 | + println!("🔧 Registering workflow components:"); |
| 294 | + println!(" 📊 DataGeneratorTask (Task) -> Generate"); |
| 295 | + workflow.register(WorkflowState::GenerateData, DataGeneratorTask::new(20)); |
| 296 | + |
| 297 | + println!(" ⚙️ ProcessorNode (Node) -> Process"); |
| 298 | + workflow.register(WorkflowState::ProcessData, ProcessorNode::new(25.0)); |
| 299 | + |
| 300 | + println!(" ✅ ValidatorTask (Task) -> Validate"); |
| 301 | + workflow.register(WorkflowState::ValidateResults, ValidatorTask); |
| 302 | + |
| 303 | + println!(" 📊 ReportNode (Node) -> Report"); |
| 304 | + workflow.register(WorkflowState::GenerateReport, ReportNode); |
| 305 | + |
| 306 | + // Set exit state |
| 307 | + workflow.add_exit_states(vec![WorkflowState::Complete]); |
| 308 | + |
| 309 | + println!("\n🎯 Running mixed Task/Node workflow...\n"); |
| 310 | + |
| 311 | + // Run the workflow |
| 312 | + let store = MemoryStore::new(); |
| 313 | + match workflow.orchestrate(&store).await { |
| 314 | + Ok(_final_state) => { |
| 315 | + println!("\n🎉 Mixed workflow completed successfully!"); |
| 316 | + |
| 317 | + if let Ok(report) = store.get::<String>("final_report") { |
| 318 | + println!("\n{report}"); |
| 319 | + } |
| 320 | + } |
| 321 | + Err(e) => { |
| 322 | + eprintln!("❌ Workflow failed: {e}"); |
| 323 | + return Err(e); |
| 324 | + } |
| 325 | + } |
| 326 | + |
| 327 | + println!("\n💡 This example shows how Tasks and Nodes work together:"); |
| 328 | + println!(" • Tasks provide flexibility for simple operations"); |
| 329 | + println!(" • Nodes provide structure for complex processing with retry logic"); |
| 330 | + println!(" • Both use the same .register() method"); |
| 331 | + println!(" • Both can be mixed freely in the same workflow"); |
| 332 | + |
| 333 | + Ok(()) |
| 334 | +} |