Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚡️ Speed up function calculate_node_levels by 48x #2136

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

misrasaurabh1
Copy link

📄 4,848% (48.48x) speedup for calculate_node_levels in src/crewai/flow/utils.py

⏱️ Runtime : 58.0 milliseconds 1.17 millisecond (best of 318 runs)

📝 Explanation and details

To optimize the given calculate_node_levels function, we can make several changes focusing on reducing the number of nested loops and leveraging data structures more efficiently. Here is the revised function.

  1. Replace list queue (with pop(0)) with deque which provides O(1) time complexity for append and pop operations.
  2. Precompute method dependencies instead of repeatedly checking conditions inside loops.
  3. Organize the steps for better readability and separate route processing into a helper function.

Key optimizations.

  1. Using deque instead of list queue to optimize appending and popping elements.
  2. Precomputing listener dependencies reduces the number of checks and allows faster access.
  3. Factoring out the router processing logic into the process_router_paths function improves readability and maintainability.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 31 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Generated Regression Tests Details
from typing import Any, Dict, List, Set

# imports
import pytest  # used for our unit tests
from crewai.flow.utils import calculate_node_levels


# Mock flow class for testing
class MockFlow:
    def __init__(self, methods, listeners, routers, router_paths):
        self._methods = methods
        self._listeners = listeners
        self._routers = routers
        self._router_paths = router_paths

# unit tests
def test_single_start_method_no_listeners_or_routers():
    flow = MockFlow(
        methods={"start": MockMethod(True)},
        listeners={},
        routers=set(),
        router_paths={}
    )
    expected = {"start": 0}
    codeflash_output = calculate_node_levels(flow)

def test_multiple_start_methods_no_listeners_or_routers():
    flow = MockFlow(
        methods={"start1": MockMethod(True), "start2": MockMethod(True)},
        listeners={},
        routers=set(),
        router_paths={}
    )
    expected = {"start1": 0, "start2": 0}
    codeflash_output = calculate_node_levels(flow)

def test_single_or_listener():
    flow = MockFlow(
        methods={"start": MockMethod(True)},
        listeners={"listener": ("OR", ["start"])},
        routers=set(),
        router_paths={}
    )
    expected = {"start": 0, "listener": 1}
    codeflash_output = calculate_node_levels(flow)

def test_multiple_or_listeners():
    flow = MockFlow(
        methods={"start1": MockMethod(True), "start2": MockMethod(True)},
        listeners={"listener1": ("OR", ["start1"]), "listener2": ("OR", ["start2"])},
        routers=set(),
        router_paths={}
    )
    expected = {"start1": 0, "start2": 0, "listener1": 1, "listener2": 1}
    codeflash_output = calculate_node_levels(flow)

def test_single_and_listener():
    flow = MockFlow(
        methods={"start1": MockMethod(True), "start2": MockMethod(True)},
        listeners={"listener": ("AND", ["start1", "start2"])},
        routers=set(),
        router_paths={}
    )
    expected = {"start1": 0, "start2": 0, "listener": 1}
    codeflash_output = calculate_node_levels(flow)

def test_multiple_and_listeners():
    flow = MockFlow(
        methods={"start1": MockMethod(True), "start2": MockMethod(True), "start3": MockMethod(True)},
        listeners={"listener1": ("AND", ["start1", "start2"]), "listener2": ("AND", ["start2", "start3"])},
        routers=set(),
        router_paths={}
    )
    expected = {"start1": 0, "start2": 0, "start3": 0, "listener1": 1, "listener2": 1}
    codeflash_output = calculate_node_levels(flow)

def test_mixed_conditions():
    flow = MockFlow(
        methods={"start1": MockMethod(True), "start2": MockMethod(True)},
        listeners={"listener1": ("OR", ["start1"]), "listener2": ("AND", ["start1", "start2"])},
        routers=set(),
        router_paths={}
    )
    expected = {"start1": 0, "start2": 0, "listener1": 1, "listener2": 1}
    codeflash_output = calculate_node_levels(flow)

def test_single_router():
    flow = MockFlow(
        methods={"start": MockMethod(True)},
        listeners={"listener": ("OR", ["path1"])},
        routers={"start"},
        router_paths={"start": ["path1"]}
    )
    expected = {"start": 0, "listener": 1}
    codeflash_output = calculate_node_levels(flow)

def test_multiple_routers():
    flow = MockFlow(
        methods={"start1": MockMethod(True), "start2": MockMethod(True)},
        listeners={"listener1": ("OR", ["path1"]), "listener2": ("OR", ["path2"])},
        routers={"start1", "start2"},
        router_paths={"start1": ["path1"], "start2": ["path2"]}
    )
    expected = {"start1": 0, "start2": 0, "listener1": 1, "listener2": 1}
    codeflash_output = calculate_node_levels(flow)

def test_no_start_methods():
    flow = MockFlow(
        methods={"method1": MockMethod(False)},
        listeners={},
        routers=set(),
        router_paths={}
    )
    expected = {}
    codeflash_output = calculate_node_levels(flow)

def test_listeners_with_no_trigger_methods():
    flow = MockFlow(
        methods={"start": MockMethod(True)},
        listeners={"listener": ("OR", [])},
        routers=set(),
        router_paths={}
    )
    expected = {"start": 0}
    codeflash_output = calculate_node_levels(flow)

def test_empty_flow():
    flow = MockFlow(
        methods={},
        listeners={},
        routers=set(),
        router_paths={}
    )
    expected = {}
    codeflash_output = calculate_node_levels(flow)

def test_large_number_of_methods_and_listeners():
    methods = {f"start{i}": MockMethod(True) for i in range(100)}
    listeners = {f"listener{i}": ("OR", [f"start{i}"]) for i in range(100)}
    flow = MockFlow(
        methods=methods,
        listeners=listeners,
        routers=set(),
        router_paths={}
    )
    expected = {f"start{i}": 0 for i in range(100)}
    expected.update({f"listener{i}": 1 for i in range(100)})
    codeflash_output = calculate_node_levels(flow)



def test_deterministic_behavior():
    flow = MockFlow(
        methods={"start": MockMethod(True)},
        listeners={"listener": ("OR", ["start"])},
        routers=set(),
        router_paths={}
    )
    expected = {"start": 0, "listener": 1}
    codeflash_output = calculate_node_levels(flow)
    codeflash_output = calculate_node_levels(flow)

# Mock method class for testing
class MockMethod:
    def __init__(self, is_start_method):
        if is_start_method:
            self.__is_start_method__ = True
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

from typing import Any, Dict, List, Set

# imports
import pytest  # used for our unit tests
from crewai.flow.utils import calculate_node_levels


# Helper classes to simulate flow structure
class Method:
    def __init__(self, is_start_method=False):
        self.__is_start_method__ = is_start_method

class Flow:
    def __init__(self):
        self._methods = {}
        self._listeners = {}
        self._routers = set()
        self._router_paths = {}

# unit tests
def test_single_start_method():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    codeflash_output = calculate_node_levels(flow)

def test_multiple_start_methods():
    flow = Flow()
    flow._methods['start_method_1'] = Method(is_start_method=True)
    flow._methods['start_method_2'] = Method(is_start_method=True)
    codeflash_output = calculate_node_levels(flow)

def test_single_or_listener():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    flow._listeners['listener_method'] = ('OR', ['start_method'])
    codeflash_output = calculate_node_levels(flow)

def test_single_and_listener():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    flow._listeners['listener_method'] = ('AND', ['start_method'])
    codeflash_output = calculate_node_levels(flow)

def test_multiple_or_listeners():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    flow._listeners['listener_method_1'] = ('OR', ['start_method'])
    flow._listeners['listener_method_2'] = ('OR', ['start_method'])
    codeflash_output = calculate_node_levels(flow)

def test_multiple_and_listeners():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    flow._listeners['listener_method_1'] = ('AND', ['start_method'])
    flow._listeners['listener_method_2'] = ('AND', ['start_method'])
    codeflash_output = calculate_node_levels(flow)

def test_mixed_or_and_listeners():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    flow._listeners['or_listener'] = ('OR', ['start_method'])
    flow._listeners['and_listener'] = ('AND', ['start_method'])
    codeflash_output = calculate_node_levels(flow)

def test_single_router():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    flow._routers.add('start_method')
    flow._router_paths['start_method'] = ['router_path']
    codeflash_output = calculate_node_levels(flow)

def test_router_with_listeners():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    flow._routers.add('start_method')
    flow._router_paths['start_method'] = ['router_path']
    flow._listeners['listener_method'] = ('OR', ['router_path'])
    codeflash_output = calculate_node_levels(flow)

def test_multiple_routers():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    flow._routers.add('start_method')
    flow._routers.add('router_method_2')
    flow._router_paths['start_method'] = ['router_path_1']
    flow._router_paths['router_method_2'] = ['router_path_2']
    codeflash_output = calculate_node_levels(flow)

def test_empty_flow():
    flow = Flow()
    codeflash_output = calculate_node_levels(flow)

def test_cycle_in_flow():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    flow._listeners['cycle_method'] = ('OR', ['start_method'])
    flow._listeners['start_method'] = ('OR', ['cycle_method'])
    codeflash_output = calculate_node_levels(flow)

def test_disconnected_nodes():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    flow._methods['disconnected_method'] = Method()
    codeflash_output = calculate_node_levels(flow)

def test_listeners_with_no_trigger_methods():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    flow._listeners['listener_method'] = ('OR', [])
    codeflash_output = calculate_node_levels(flow)

def test_large_number_of_methods():
    flow = Flow()
    for i in range(1000):
        flow._methods[f'method_{i}'] = Method(is_start_method=(i == 0))
        if i > 0:
            flow._listeners[f'method_{i}'] = ('OR', [f'method_{i-1}'])
    codeflash_output = calculate_node_levels(flow)

def test_deep_hierarchical_structure():
    flow = Flow()
    flow._methods['start_method'] = Method(is_start_method=True)
    for i in range(1, 1000):
        flow._listeners[f'method_{i}'] = ('OR', [f'method_{i-1}' if i > 1 else 'start_method'])
    codeflash_output = calculate_node_levels(flow)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

Codeflash

codeflash-ai bot and others added 5 commits January 18, 2025 08:17
To optimize the given `calculate_node_levels` function, we can make several changes focusing on reducing the number of nested loops and leveraging data structures more efficiently. Here is the revised function.

1. Replace list `queue` (with `pop(0)`) with `deque` which provides O(1) time complexity for append and pop operations.
2. Precompute method dependencies instead of repeatedly checking conditions inside loops.
3. Organize the steps for better readability and separate route processing into a helper function.

Here is the rewritten function.



Key optimizations.
1. Using `deque` instead of list `queue` to optimize appending and popping elements.
2. Precomputing listener dependencies reduces the number of checks and allows faster access.
3. Factoring out the router processing logic into the `process_router_paths` function improves readability and maintainability.
@joaomdmoura
Copy link
Collaborator

Disclaimer: This review was made by a crew of AI Agents.

Code Review Comment for PR #2136

Overview

This pull request implements significant optimizations to the calculate_node_levels function in flow/utils.py, achieving a remarkable performance improvement of 4,848%. The changes focus on reducing computational complexity and enhancing code organization, which are essential for maintaining efficient operations in our codebase.

Key Changes Analysis

1. Data Structure Optimization

  • Positive Change: The replacement of the list-based queue with collections.deque greatly improves pop operations from O(n) to O(1) complexity. This is an excellent optimization for queue-based operations and enhances overall performance.

2. Performance Optimizations

  • The use of defaultdict for precomputing listener dependencies, reducing nested loop complexity, and separating router path processing logic are all commendable improvements. These optimizations will likely yield faster execution times, especially in more extensive flow graphs.

3. Code Organization

  • The changes have resulted in better function documentation, improved readability with helper functions, and a clearer separation of concerns. This organizational clarity will aid future maintainability and development.

Detailed Review Findings

Documentation Improvements

The docstring for calculate_node_levels reflects significant improvements but can be refined further. Consider enhancing the clarity around parameters and complexity analysis:

def calculate_node_levels(flow: Any) -> Dict[str, int]:
    """
    Calculate the hierarchical level of each node in the flow.

    Performs a breadth-first traversal to assign levels starting from level 0.

    - Parameters:
      flow : Any
          Flow object with _listeners, _routers, and _router_paths.
    
    - Returns:
      Dictionary mapping method names to hierarchical levels.
    
    - Complexity:
      Time: O(V + E), Space: O(V)
    """

Type Hints Enhancement

The type hints can be improved for better type safety and clarity:

from typing import Any, Deque, Dict, List, Optional, Set, Union, TypeVar

Flow = TypeVar('Flow')

def calculate_node_levels(flow: Flow) -> Dict[str, int]:

Error Handling

Adding error handling in process_router_paths can enhance robustness:

def process_router_paths(flow: Any, current: str, current_level: int, levels: Dict[str, int], queue: Deque[str]) -> None:
    if not hasattr(flow, '_routers') or not hasattr(flow, '_router_paths'):
        raise AttributeError("Flow object missing required attributes")

Performance Optimization Suggestions

  1. Consider caching results for frequently accessed paths to enhance speed:
from functools import lru_cache

@lru_cache(maxsize=128)
def get_listener_dependencies(flow: Any) -> Tuple[Dict, Dict]:
  1. For large graphs, parallelize processing to enhance performance:
import concurrent.futures

def process_level_parallel(nodes: List[str], flow: Any) -> Dict[str, int]:

Potential Issues

  1. Memory Usage: Be aware that precomputing dependencies can increase memory usage. Consider lazy loading for extensive graphs.
  2. Type Safety: Utilize specific types instead of Any to avoid masking type issues.
  3. Error Propagation: Strengthen handling for edge cases such as circular dependencies.

Recommendations for Future Improvements

  1. Introduce comprehensive unit tests to cover edge cases.
  2. Implement logging for better debugging insights.
  3. Validate the input structure of flow objects.
  4. Incorporate performance benchmarks into the testing suite.

Conclusion

The improvements in this PR significantly enhance both performance and code organization. With the suggested enhancements—especially concerning type safety and error handling—the robustness and maintainability of the code will be notably strengthened.

I recommend approval of this PR, contingent upon addressing minor documentation and type hint suggestions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants