Add Concurrent Workflow Execution & Enhanced Scheduler#3

Closed
nassor wants to merge 6 commits into main from feature/concurrent-workflow

@nassor nassor commented Jul 26, 2025

This major feature release introduces concurrent workflow execution to Cano, enabling parallel processing of multiple workflow instances with configurable timeout strategies. It yields large performance improvements for I/O-bound operations (5x-47x in the included benchmarks) and for batch processing scenarios.

✨ Key Features Added

🔄 Concurrent Workflows

  • New ConcurrentWorkflow API for executing multiple workflow instances in parallel
  • Four timeout strategies for flexible execution control:
    • WaitForever - Execute all workflows to completion
    • WaitForQuota(n) - Complete a specific number, then cancel the rest
    • WaitDuration(duration) - Execute within a time limit
    • WaitQuotaOrDuration - Complete quota OR wait for duration, whichever comes first
  • Performance gains: 5x-47x speedup for I/O-bound operations (benchmarked)
  • Instance-based execution allowing different parameters per workflow instance
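
As a rough illustration of how the four strategies differ, the sketch below models only the stop condition each one implies. It is not Cano's implementation: the `should_stop` helper and the field names on `WaitQuotaOrDuration` are assumptions made for this example, and only the variant names come from the PR.

```rust
use std::time::Duration;

/// Variant names follow the PR; the payload shapes are assumptions.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ConcurrentStrategy {
    WaitForever,
    WaitForQuota(usize),
    WaitDuration(Duration),
    WaitQuotaOrDuration { quota: usize, duration: Duration },
}

/// Hypothetical helper: returns true once the executor may stop waiting
/// and cancel whatever instances are still running.
pub fn should_stop(
    strategy: ConcurrentStrategy,
    completed: usize,
    total: usize,
    elapsed: Duration,
) -> bool {
    // Nothing left to wait for, regardless of strategy.
    if completed >= total {
        return true;
    }
    match strategy {
        ConcurrentStrategy::WaitForever => false,
        ConcurrentStrategy::WaitForQuota(quota) => completed >= quota,
        ConcurrentStrategy::WaitDuration(limit) => elapsed >= limit,
        ConcurrentStrategy::WaitQuotaOrDuration { quota, duration } => {
            completed >= quota || elapsed >= duration
        }
    }
}
```

Under this reading, `WaitQuotaOrDuration` is simply the logical OR of the quota and duration conditions.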

⏰ Enhanced Scheduler

  • Concurrent workflow scheduling with new methods:
    • every_seconds_concurrent(), every_minutes_concurrent(), every_hours_concurrent()
    • cron_concurrent() for cron-based concurrent execution
    • manual_concurrent() for manual concurrent triggers
  • Enhanced monitoring with concurrent workflow status tracking
  • Improved status reporting showing completed/failed instance counts

📋 Changes Summary

New Files

  • concurrent_workflow_example.rs - Complete concurrent workflow demonstration
  • workflow_concurrent_book_prepositions.rs - Real-world text processing example

Enhanced Files

  • scheduler.rs (+690 lines) - Full concurrent workflow scheduling implementation
  • workflow.rs (+655 lines) - Core concurrent workflow execution engine
  • workflow_performance.rs (+529 lines) - Comprehensive concurrent performance benchmarks
  • README.md - Complete documentation rewrite with concurrent workflow examples
  • Cargo.toml - New futures dependency and example configurations

🎯 Use Cases

  • Batch processing of independent data items
  • Parallel API calls to external services
  • I/O-bound operations with significant wait times
  • High-throughput data processing pipelines
  • Concurrent background job execution

📊 Performance Impact

  • 5x-47x performance improvement for I/O-bound concurrent operations
  • Minimal overhead for sequential workflows (unchanged)
  • Configurable concurrency to prevent resource exhaustion
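
To show why I/O-bound work benefits from concurrency, here is a minimal sketch that uses plain OS threads and `thread::sleep` as a stand-in for I/O. It does not use Cano or tokio, it does not reproduce the PR's benchmarks, and the function names are made up for this example.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Stand-in for an I/O-bound step: mostly waiting, very little CPU work.
fn simulated_io(id: u64) -> u64 {
    thread::sleep(Duration::from_millis(20));
    id * 2
}

/// Runs the steps one after another; total time grows with `n`.
pub fn run_sequential(n: u64) -> (Vec<u64>, Duration) {
    let start = Instant::now();
    let out = (0..n).map(simulated_io).collect();
    (out, start.elapsed())
}

/// Runs each step on its own thread; the waits overlap.
pub fn run_concurrent(n: u64) -> (Vec<u64>, Duration) {
    let start = Instant::now();
    let handles: Vec<_> = (0..n)
        .map(|id| thread::spawn(move || simulated_io(id)))
        .collect();
    // Joining in spawn order keeps results in the same order as the inputs.
    let out = handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .collect();
    (out, start.elapsed())
}
```

With five 20 ms steps, the sequential run takes at least 100 ms while the concurrent run finishes in roughly the time of one step. The actual ratio depends on the workload and the runtime.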

🔧 Breaking Changes

  • None - All existing APIs remain unchanged and fully compatible

📚 Documentation

  • Complete API documentation with examples
  • Performance benchmarks and guidance
  • Best practices for concurrent vs sequential execution
  • Multiple real-world usage examples

This feature significantly expands Cano's capabilities for high-performance workflow orchestration while maintaining the simple, type-safe API that makes it easy to use.

Copilot AI review requested due to automatic review settings July 26, 2025 14:35

Copilot AI left a comment

Pull Request Overview

This PR introduces concurrent workflow execution capabilities to Cano, enabling parallel processing of multiple workflow instances with configurable timeout strategies. The changes provide significant performance improvements for I/O-bound operations while maintaining full backward compatibility.

  • New ConcurrentWorkflow API with four timeout strategies (WaitForever, WaitForQuota, WaitDuration, WaitQuotaOrDuration)
  • Enhanced scheduler with concurrent workflow scheduling methods and status tracking
  • Comprehensive examples and benchmarks demonstrating concurrent processing benefits

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| src/workflow.rs | Core concurrent workflow implementation with strategy patterns and builder API |
| src/scheduler.rs | Extended scheduler with concurrent workflow support and enhanced status reporting |
| src/lib.rs | Updated exports to include new concurrent workflow types |
| examples/workflow_concurrent_book_prepositions.rs | Real-world concurrent book analysis example demonstrating parallel text processing |
| examples/concurrent_workflow_example.rs | Basic concurrent workflow usage examples with different strategies |
| benches/workflow_performance.rs | Comprehensive concurrent vs sequential performance benchmarks |
| README.md | Updated documentation with concurrent workflow examples and usage patterns |
| Cargo.toml | Added futures dependency and new example configurations |

Comments suppressed due to low confidence (1)

benches/workflow_performance.rs:122

  • [nitpick] The benchmark includes a special case for node_count == 0, but this edge case might not provide meaningful performance insights. Consider whether benchmarking empty workflows adds value or if it should be excluded from performance tests.
    if node_count == 0 {

Comment thread src/workflow.rs

let handle = tokio::spawn(async move {
    let task_start = std::time::Instant::now();
    let result = instance.workflow.orchestrate(&store_clone).await;

Copilot AI Jul 26, 2025

The workflow orchestrate method takes a reference to the store, but each concurrent instance gets a clone of the store. This means concurrent workflows don't share state between instances. Consider documenting this behavior clearly or providing an option for shared state if that's a desired use case.
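
Whether cloned stores are isolated depends on what `clone` does for the store type, which this thread does not show. The sketch below contrasts the two possibilities with hypothetical store types (`DeepStore` and `SharedStore` are not Cano types): a deep clone gives each instance private state, while a clone of an `Arc`-backed store still points at the same data.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical store whose `clone` copies the data: instances are isolated.
#[derive(Clone, Default)]
pub struct DeepStore {
    pub data: HashMap<String, i32>,
}

/// Hypothetical store whose `clone` only copies a handle: instances share state.
#[derive(Clone, Default)]
pub struct SharedStore {
    pub data: Arc<Mutex<HashMap<String, i32>>>,
}

impl SharedStore {
    pub fn put(&self, key: &str, value: i32) {
        self.data.lock().unwrap().insert(key.to_string(), value);
    }

    pub fn get(&self, key: &str) -> Option<i32> {
        self.data.lock().unwrap().get(key).copied()
    }
}
```

If shared state is wanted, a handle-style store of the second kind is one way to get it without changing how instances receive their store.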

Copilot uses AI. Check for mistakes.
Comment thread src/workflow.rs
        results
    }
    ConcurrentStrategy::WaitForQuota(quota) => {
        let quota = quota.min(handles.len()).max(1);

Copilot AI Jul 26, 2025

The quota clamping logic (.min(handles.len()).max(1)) is duplicated in the WaitQuotaOrDuration branch. Consider extracting this into a helper function to avoid code duplication.

Suggested change
- let quota = quota.min(handles.len()).max(1);
+ let quota = Self::clamp_quota(quota, handles.len());
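
The suggestion names a `clamp_quota` helper but does not define it. One possible body, written here as a free function rather than an associated function and preserving the original expression exactly, might look like this:

```rust
/// Hypothetical helper matching the suggested `clamp_quota(quota, handles.len())`.
/// Keeps the behavior of `quota.min(total).max(1)` from the reviewed code.
pub fn clamp_quota(quota: usize, total: usize) -> usize {
    // Cap at the number of spawned instances, then enforce a minimum of one.
    quota.min(total).max(1)
}
```

Note that this expression returns 1 even when `total` is 0, so a caller with no handles would still wait for one completion unless it checks for that case separately.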

Comment thread src/scheduler.rs
Comment on lines +582 to +591
// Since ConcurrentWorkflow.orchestrate consumes self, we need to extract it from Arc
match Arc::try_unwrap(concurrent_workflow) {
    Ok(workflow) => {
        let cloned_store = store.clone();
        workflow.orchestrate(cloned_store).await.map(Some)
    }
    Err(_) => Err(CanoError::workflow(
        "Failed to execute concurrent workflow: still shared",
    )),
}

Copilot AI Jul 26, 2025

Using Arc::try_unwrap in the scheduler context is problematic because the Arc is likely to be shared. This will consistently fail and prevent concurrent workflows from executing in the scheduler. Consider redesigning the API to avoid consuming the ConcurrentWorkflow or find an alternative approach.

Suggested change
- // Since ConcurrentWorkflow.orchestrate consumes self, we need to extract it from Arc
- match Arc::try_unwrap(concurrent_workflow) {
-     Ok(workflow) => {
-         let cloned_store = store.clone();
-         workflow.orchestrate(cloned_store).await.map(Some)
-     }
-     Err(_) => Err(CanoError::workflow(
-         "Failed to execute concurrent workflow: still shared",
-     )),
- }
+ // Clone the ConcurrentWorkflow to avoid consuming the Arc
+ let workflow = concurrent_workflow.clone();
+ let cloned_store = store.clone();
+ workflow.orchestrate(cloned_store).await.map(Some)
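
The failure mode can be reproduced with the standard library alone. In this sketch `Job` is a hypothetical stand-in for a type whose `run` method consumes `self`, as `orchestrate` is described as doing. `Arc::try_unwrap` succeeds only when no other `Arc` points at the value, so a scheduler that keeps its own reference will always hit the error branch.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a workflow whose run method consumes `self`.
#[derive(Clone, Debug, PartialEq)]
pub struct Job {
    pub name: String,
}

impl Job {
    pub fn run(self) -> String {
        format!("ran {}", self.name)
    }
}

/// Mirrors the reviewed code: works only if this is the last `Arc`.
pub fn run_by_unwrap(job: Arc<Job>) -> Result<String, String> {
    match Arc::try_unwrap(job) {
        Ok(job) => Ok(job.run()),
        Err(_) => Err("still shared".to_string()),
    }
}

/// Alternative: clone the inner value (not the `Arc`) and consume the copy.
/// This assumes the inner type implements `Clone`.
pub fn run_by_clone(job: &Arc<Job>) -> String {
    (**job).clone().run()
}
```

One caveat when applying the suggestion above: calling `.clone()` directly on an `Arc<T>` produces another `Arc<T>`, and a method that takes `self` by value cannot move the value out of it. The inner value has to be cloned explicitly, as in `run_by_clone`, or the consuming method changed to take `&self`.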

Comment thread src/scheduler.rs
/// Type alias for the complex workflow data stored in the scheduler
type FlowData<TState, TStore, TParams> = (
    Arc<Workflow<TState, TStore, TParams>>,
    WorkflowType<TState, TStore, TParams>,

Copilot AI Jul 26, 2025

[nitpick] The FlowData type alias becomes less readable with the WorkflowType enum. Consider using a struct instead of a tuple to improve code clarity and make the fields self-documenting.
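
A struct version of the idea might look like the sketch below. `Workflow` and `WorkflowType` here are placeholder definitions so that the example compiles on its own; their real shapes, and the tuple fields elided from the excerpt, are not shown in this thread. The field names `workflow` and `kind` are also assumptions.

```rust
use std::marker::PhantomData;
use std::sync::Arc;

/// Placeholder for Cano's `Workflow`; the real definition is not shown here.
pub struct Workflow<TState, TStore, TParams> {
    _marker: PhantomData<(TState, TStore, TParams)>,
}

impl<TState, TStore, TParams> Workflow<TState, TStore, TParams> {
    pub fn new() -> Self {
        Self { _marker: PhantomData }
    }
}

/// Placeholder for `WorkflowType`; variants and payload are assumptions.
pub enum WorkflowType<TState, TStore, TParams> {
    Sequential,
    Concurrent(Arc<Workflow<TState, TStore, TParams>>),
}

/// Named fields replace positional tuple access such as `.0` and `.1`.
pub struct FlowData<TState, TStore, TParams> {
    pub workflow: Arc<Workflow<TState, TStore, TParams>>,
    pub kind: WorkflowType<TState, TStore, TParams>,
    // The remaining tuple fields are elided in the excerpt and omitted here.
}
```

Call sites would then read `entry.workflow` and `entry.kind`, which documents intent without a comment.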

@nassor nassor closed this Jul 26, 2025
@nassor nassor deleted the feature/concurrent-workflow branch July 26, 2025 20:02
Repository owner locked and limited conversation to collaborators Jul 26, 2025