fix: Handle thread locks in Flow state serialization #2121

Closed

Conversation

devin-ai-integration[bot]
Contributor

Fixes #2120

The issue was caused by trying to pickle thread locks during Flow state copying in method execution events. This PR:

  1. Adds proper state serialization in Flow events
  2. Updates event emission to use serialized state
  3. Adds test coverage for Flow with thread locks
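
For readers unfamiliar with the failure mode, the following is a minimal sketch of it using only the standard library. `FlowState`, `try_deepcopy`, and `snapshot` are hypothetical names chosen for illustration and are not taken from the CrewAI codebase. Deep-copying an object that holds a `threading.RLock` typically raises a `TypeError` like the one in the linked issue, whereas copying only the plain data avoids it.

```python
import copy
import threading


class FlowState:
    """Hypothetical stand-in for a flow state that happens to hold a lock."""

    def __init__(self) -> None:
        self.counter = 0
        self.lock = threading.RLock()


def try_deepcopy(state: FlowState) -> str:
    """Return the error message raised by deep-copying, or "ok" on success."""
    try:
        copy.deepcopy(state)
    except TypeError as exc:
        return str(exc)
    return "ok"


def snapshot(state: FlowState) -> dict:
    """Copy only plain data, skipping the lock, so the result is safe to emit."""
    return {"counter": state.counter}


print(try_deepcopy(FlowState()))  # an error message mentioning the RLock
print(snapshot(FlowState()))
```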

Link to Devin run: https://app.devin.ai/sessions/3ae1b79464d6412cb42e8b2f69520aab
Requested by: Joe

- Add state serialization in Flow events to avoid pickling RLock objects
- Update event emission to use serialized state
- Add test case for Flow with thread locks

Co-Authored-By: Joe Moura <[email protected]>
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add "(aside)" to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

@joaomdmoura
Collaborator

Disclaimer: This review was made by a crew of AI Agents.

Code Review Comment: Handling Thread Locks in Flow State Serialization

Overview

The changes in this pull request effectively address the thread lock serialization issues in the Flow state management system, focusing on the handling of RLock objects during event emission. The modifications span three key files, resulting in significant improvements in state handling.

File Specific Insights

1. src/crewai/flow/flow.py

  • Positive Improvements:

    • Introduces a comprehensive state serialization mechanism to ensure proper event emission handling.
    • Supports serialization for both BaseModel and dictionary types.
    • Maintains a consistent method for state copying.
  • Issues and Recommendations:

    • Redundancy: There is duplicated state serialization logic. It's advisable to extract this into a helper method for better maintainability:

      def _serialize_state(self):
          return (
              type(self._state)(**self._state.model_dump())
              if isinstance(self._state, BaseModel)
              else dict(self._state)
          )
    • Error Handling: Serialization failures are currently unhandled. Wrap the logic in a try/except block to manage them:

      def _serialize_state(self):
          try:
              return (
                  type(self._state)(**self._state.model_dump())
                  if isinstance(self._state, BaseModel)
                  else dict(self._state)
              )
          except Exception as e:
              # Assumes a module-level `logger`. Returning {} hides the failure from callers.
              logger.error(f"State serialization failed: {e}")
              return {}
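
As an alternative to discarding the whole state on failure, here is a hedged sketch that falls back per key. It handles dictionary state only and uses the standard library; `serialize_state` and the placeholder string format are assumptions for illustration, not part of this PR.

```python
import copy
import logging
import threading
from typing import Any

logger = logging.getLogger(__name__)


def serialize_state(state: dict[str, Any]) -> dict[str, Any]:
    """Copy a dict state key by key, replacing values that cannot be copied."""
    result: dict[str, Any] = {}
    for key, value in state.items():
        try:
            result[key] = copy.deepcopy(value)
        except Exception as exc:  # for example, TypeError for thread locks
            logger.warning("Skipping uncopyable state key %r: %s", key, exc)
            result[key] = f"<unserializable {type(value).__name__}>"
    return result


state = {"items": [1, 2], "lock": threading.RLock()}
copied = serialize_state(state)
```

With this approach a single problematic field costs only that field, and the log still records which key was dropped.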

2. src/crewai/flow/flow_events.py

  • Positive Improvements:

    • Enhances post-initialization handling for BaseModel states ensuring proper serialization during event handling.
    • Provides a consistent approach across event types, reinforcing the integrity of event data.
  • Issues and Recommendations:

    • Type Validation: Implement type validation to ensure only appropriate types are assigned to state properties:

      def __post_init__(self):
          super().__post_init__()
          if not isinstance(self.state, (dict, BaseModel)):
              raise ValueError(f"Invalid state type: {type(self.state)}")
    • Duplication in Event Classes: To reduce redundancy, abstract common state processing logic into a base class:

      class BaseStateEvent(Event):
          def _process_state(self):
              if isinstance(self.state, BaseModel):
                  self.state = type(self.state)(**self.state.model_dump())
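
The two recommendations above (type validation and a shared base class) could be combined roughly as follows. This is a sketch built on standard-library dataclasses with dictionary state only; the `Event` class, the `event_type` and `method_name` fields, and the `MethodExecutionStartedEvent` shape are assumptions and do not mirror the project's actual event classes.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Event:
    """Hypothetical minimal event; not the project's actual Event class."""

    event_type: str


@dataclass
class BaseStateEvent(Event):
    """Shared base for events that carry a snapshot of flow state."""

    state: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if not isinstance(self.state, dict):
            raise ValueError(f"Invalid state type: {type(self.state)}")
        # Shallow-copy so later changes to the flow's state do not alter the event.
        self.state = dict(self.state)


@dataclass
class MethodExecutionStartedEvent(BaseStateEvent):
    method_name: str = ""
```

Subclasses inherit the validation and the copy without repeating either.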

3. tests/flow_test.py

  • Positive Improvements:

    • The added test cases explicitly validate correct handling of thread locks, reinforcing the robustness of the new logic.
    • The tests confirm that state transitions are accurately tracked and correctly implemented.
  • Issues and Recommendations:

    • Test Coverage: To enhance test robustness, consider adding edge-case tests for scenarios involving nested locks:

      def test_flow_with_nested_locks():
          # Implementation of the test...
          ...
    • Async Context Testing: Add tests that run in an async context to confirm that async flows are handled correctly:

      import pytest

      @pytest.mark.asyncio  # requires the pytest-asyncio plugin
      async def test_flow_with_async_locks():
          # Implementation of the test...
          ...
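
One possible shape for the two suggested tests is sketched below. It is self-contained and uses `asyncio.run` instead of the pytest-asyncio marker so that it runs with the standard library alone. `strip_locks` is a hypothetical helper standing in for whatever serialization the Flow actually performs.

```python
import asyncio
import threading
from typing import Any

# Lock types to drop; the concrete thread lock classes are looked up at runtime.
_LOCK_TYPES = (type(threading.Lock()), type(threading.RLock()), asyncio.Lock)


def strip_locks(value: Any) -> Any:
    """Recursively copy dicts and lists, dropping any lock objects found."""
    if isinstance(value, dict):
        return {
            k: strip_locks(v)
            for k, v in value.items()
            if not isinstance(v, _LOCK_TYPES)
        }
    if isinstance(value, list):
        return [strip_locks(v) for v in value if not isinstance(v, _LOCK_TYPES)]
    return value


def test_flow_with_nested_locks() -> None:
    state = {
        "outer": {
            "lock": threading.RLock(),
            "inner": {"lock": threading.Lock(), "n": 1},
        }
    }
    assert strip_locks(state) == {"outer": {"inner": {"n": 1}}}


def test_flow_with_async_locks() -> None:
    async def run() -> dict:
        lock = asyncio.Lock()
        async with lock:  # snapshot taken while the lock is held
            return strip_locks({"lock": lock, "done": True})

    assert asyncio.run(run()) == {"done": True}
```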

General Recommendations

  1. Documentation: Enrich the codebase with docstrings explaining serialization strategies and considerations for thread safety.
  2. Performance Monitoring: Cache the serialized state when it has not changed, and monitor serialization overhead for complex state objects.
  3. Improved Logging: Enhance logging mechanisms for steps in serialization processes, including state changes and any encountered errors.
  4. Robust Error Handling: Introduce graceful fallbacks for unserializable states and validate states before serialization to improve fault tolerance.
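
The caching idea in recommendation 2 could look roughly like this. It is a sketch under the assumption that all writes go through a single `set` method, so a version counter can detect changes; `CachedState` and its attributes are hypothetical names.

```python
from typing import Any, Optional


class CachedState:
    """Hypothetical wrapper that re-serializes only after a write."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._version = 0
        self._cached_version = -1
        self._cached: Optional[dict[str, Any]] = None
        self.serialize_calls = 0  # exposed only to make caching observable

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value
        self._version += 1  # invalidates the cached snapshot

    def serialized(self) -> dict[str, Any]:
        if self._cached is None or self._cached_version != self._version:
            self.serialize_calls += 1
            self._cached = dict(self._data)
            self._cached_version = self._version
        return self._cached
```

Two caveats apply to this design: callers share the same cached dictionary, and in-place mutation of a nested value does not bump the version, so the cache can go stale unless such writes are also routed through `set`.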

Security Considerations

  • Take precautions to safeguard sensitive state data during serialization.
  • Ensure that state data is validated prior to serialization to prevent potential injection attacks.

In conclusion, the proposed changes significantly enhance the handling of thread lock serialization issues while simultaneously improving overall code quality. Addressing the recommendations outlined above will foster greater maintainability, reliability, and security within the codebase.

devin-ai-integration bot and others added 27 commits February 13, 2025 12:14
- Add BaseStateEvent class for common state processing
- Add state serialization caching for performance
- Add tests for nested locks and async context
- Improve error handling and validation
- Enhance documentation

Co-Authored-By: Joe Moura <[email protected]>
- Add test for various thread-safe primitives
- Test nested dataclasses with complex state
- Verify serialization of async primitives

Co-Authored-By: Joe Moura <[email protected]>
This commit implements a method for exporting the state of a flow into a
JSON-serializable dictionary.

The idea is producing a human-readable version of state that can be
inspected or consumed by other systems, hence JSON and not pickling or
marshalling.

I consider it an export because it's a one-way process, meaning it
cannot be loaded back into Python because of complex types.
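
A one-way export of this kind might be sketched as follows. `export_state` and the `<TypeName>` placeholder format are assumptions for illustration; the commit's actual implementation is not shown in this conversation.

```python
import json
import threading
from typing import Any


def export_state(value: Any) -> Any:
    """Convert a value into JSON-serializable data; unknown types become strings."""
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, dict):
        return {str(k): export_state(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [export_state(v) for v in value]
    # One-way fallback: keep a readable description, not a reloadable object.
    return f"<{type(value).__name__}>"


state = {"id": "abc", "tags": ("x", "y"), "lock": threading.RLock()}
exported = json.dumps(export_state(state), sort_keys=True)
```

Because complex values are reduced to descriptive strings, the output can be inspected or consumed by other systems but cannot be used to rebuild the original objects.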
* Added functionality to have any llm run test functionality

* Fixed lint issues

* Fixed Linting issues

* Fixed unit test case

* Fixed unit test

* Fixed test case

* Fixed unit test case

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <[email protected]>
* updating prompts

* fix issue

* clean up thoughts as well

* drop trailing set
* feat: add prompt observability code

* feat: improve logic for llm call

* feat: add tests for traces

* feat: remove unused import

* feat: add function to clear and add task traces

* feat: fix import

* feat: change time

* feat: fix type checking issues

* feat: add fixed time to fix test

* feat: fix datetime test issue

* feat: add add task traces function

* feat: add same logic as entp

* feat: add start_time as reference for duplication of tool call

* feat: add max_depth

* feat: add protocols file to properly import on LLM

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <[email protected]>
* WIP crew events emitter

* Refactor event handling and introduce new event types

- Migrate from global `emit` function to `event_bus.emit`
- Add new event types for task failures, tool usage, and agent execution
- Update event listeners and event bus to support more granular event tracking
- Remove deprecated event emission methods
- Improve event type consistency and add more detailed event information
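
To illustrate the difference between a global `emit` function and a bus object, here is a minimal sketch of an event bus that dispatches on event type. The `EventBus` class, its `on` decorator, the `emit(source, event)` signature, and `TaskFailedEvent` are illustrative assumptions and should not be read as the project's actual API.

```python
from typing import Any, Callable

Handler = Callable[[Any, Any], None]


class EventBus:
    """Hypothetical bus that routes events to handlers registered per type."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Handler]] = {}

    def on(self, event_type: type) -> Callable[[Handler], Handler]:
        def decorator(handler: Handler) -> Handler:
            self._handlers.setdefault(event_type, []).append(handler)
            return handler

        return decorator

    def emit(self, source: Any, event: Any) -> None:
        for handler in self._handlers.get(type(event), []):
            handler(source, event)


class TaskFailedEvent:
    def __init__(self, error: str) -> None:
        self.error = error


event_bus = EventBus()
received: list[str] = []


@event_bus.on(TaskFailedEvent)
def record_failure(source: Any, event: TaskFailedEvent) -> None:
    received.append(event.error)


event_bus.emit(None, TaskFailedEvent("boom"))
```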

* Add event emission for agent execution lifecycle

- Emit AgentExecutionStarted and AgentExecutionError events
- Update CrewAgentExecutor to use event_bus for tracking agent execution
- Refactor error handling to include event emission
- Minor code formatting improvements in task.py and crew_agent_executor.py
- Fix a typo in test file

* Refactor event system and add third-party event listeners

- Move event_bus import to correct module paths
- Introduce BaseEventListener abstract base class
- Add AgentOpsListener for third-party event tracking
- Update event listener initialization and setup
- Clean up event-related imports and exports

* Enhance event system type safety and error handling

- Improve type annotations for event bus and event types
- Add null checks for agent and task in event emissions
- Update import paths for base tool and base agent
- Refactor event listener type hints
- Remove unnecessary print statements
- Update test configurations to match new event handling

* Refactor event classes to improve type safety and naming consistency

- Rename event classes to have explicit 'Event' suffix (e.g., TaskStartedEvent)
- Update import statements and references across multiple files
- Remove deprecated events.py module
- Enhance event type hints and configurations
- Clean up unnecessary event-related code

* Add default model for CrewEvaluator and fix event import order

- Set default model to "gpt-4o-mini" in CrewEvaluator when no model is specified
- Reorder event-related imports in task.py to follow standard import conventions
- Update event bus initialization method return type hint
- Export event_bus in events/__init__.py

* Fix tool usage and event import handling

- Update tool usage to use `.get()` method when checking tool name
- Remove unnecessary `__all__` export list in events/__init__.py

* Refactor Flow and Agent event handling to use event_bus

- Remove `event_emitter` from Flow class and replace with `event_bus.emit()`
- Update Flow and Agent tests to use event_bus event listeners
- Remove redundant event emissions in Flow methods
- Add debug print statements in Flow execution
- Simplify event tracking in test cases

* Enhance event handling for Crew, Task, and Event classes

- Add crew name to failed event types (CrewKickoffFailedEvent, CrewTrainFailedEvent, CrewTestFailedEvent)
- Update Task events to remove redundant task and context attributes
- Refactor EventListener to use Logger for consistent event logging
- Add new event types for Crew train and test events
- Improve event bus event tracking in test cases

* Remove telemetry and tracing dependencies from Task and Flow classes

- Remove telemetry-related imports and private attributes from Task class
- Remove `_telemetry` attribute from Flow class
- Update event handling to emit events without direct telemetry tracking
- Simplify task and flow execution by removing explicit telemetry spans
- Move telemetry-related event handling to EventListener

* Clean up unused imports and event-related code

- Remove unused imports from various event and flow-related files
- Reorder event imports to follow standard conventions
- Remove unnecessary event type references
- Simplify import statements in event and flow modules

* Update crew test to validate verbose output and kickoff_for_each method

- Enhance test_crew_verbose_output to check specific listener log messages
- Modify test_kickoff_for_each_invalid_input to use Pydantic validation error
- Improve test coverage for crew logging and input validation

* Update crew test verbose output with improved emoji icons

- Replace task and agent completion icons from 👍 to ✅
- Enhance readability of test output logging
- Maintain consistent test coverage for crew verbose output

* Add MethodExecutionFailedEvent to handle flow method execution failures

- Introduce new MethodExecutionFailedEvent in flow_events module
- Update Flow class to catch and emit method execution failures
- Add event listener for method execution failure events
- Update event-related imports to include new event type
- Enhance test coverage for method execution failure handling

* Propagate method execution failures in Flow class

- Modify Flow class to re-raise exceptions after emitting MethodExecutionFailedEvent
- Reorder MethodExecutionFailedEvent import to maintain consistent import style
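
The emit-then-re-raise behaviour described in the two commits above can be sketched as follows. The event's fields, the `emit` recorder, and `execute_method` are hypothetical stand-ins; only the event name comes from the commit message.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class MethodExecutionFailedEvent:
    method_name: str  # assumed field
    error: Exception  # assumed field


emitted: list[Any] = []


def emit(event: Any) -> None:
    """Hypothetical stand-in for an event bus; it only records the event."""
    emitted.append(event)


def execute_method(name: str, method: Callable[[], Any]) -> Any:
    try:
        return method()
    except Exception as exc:
        emit(MethodExecutionFailedEvent(method_name=name, error=exc))
        raise  # propagate so callers still see the failure
```

A bare `raise` keeps the original traceback, so listeners get the event and callers get the same exception they would have seen without the event system.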

* Enable test coverage for Flow method execution failure event

- Uncomment pytest.raises() in test_events to verify exception handling
- Ensure test validates MethodExecutionFailedEvent emission during flow kickoff

* Add event handling for tool usage events

- Introduce event listeners for ToolUsageFinishedEvent and ToolUsageErrorEvent
- Log tool usage events with descriptive emoji icons (✅ and ❌)
- Update event_listener to track and log tool usage lifecycle

* Reorder and clean up event imports in event_listener

- Reorganize imports for tool usage events and other event types
- Maintain consistent import ordering and remove unused imports
- Ensure clean and organized import structure in event_listener module

* moving to dedicated eventlistener

* don't forget crew level

* Refactor AgentOps event listener for crew-level tracking

- Modify AgentOpsListener to handle crew-level events
- Initialize and end AgentOps session at crew kickoff and completion
- Create agents for each crew member during session initialization
- Improve session management and event recording
- Clean up and simplify event handling logic

* Update test_events to validate tool usage error event handling

- Modify test to assert single error event with correct attributes
- Use pytest.raises() to verify error event generation
- Simplify error event validation in test case

* Improve AgentOps listener type hints and formatting

- Add string type hints for AgentOps classes to resolve potential import issues
- Clean up unnecessary whitespace and improve code indentation
- Simplify initialization and event handling logic

* Update test_events to validate multiple tool usage events

- Modify test to assert 75 events instead of a single error event
- Remove pytest.raises() check, allowing crew kickoff to complete
- Adjust event validation to support broader event tracking

* Rename event_bus to crewai_event_bus for improved clarity and specificity

- Replace all references to `event_bus` with `crewai_event_bus`
- Update import statements across multiple files
- Remove the old `event_bus.py` file
- Maintain existing event handling functionality

* Enhance EventListener with singleton pattern and color configuration

- Implement singleton pattern for EventListener to ensure single instance
- Add default color configuration using EMITTER_COLOR from constants
- Modify log method calls to use default color and remove redundant color parameters
- Improve initialization logic to prevent multiple initializations
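
One common way to get the singleton behaviour described above is to override `__new__` and guard `__init__` with a flag. The sketch below assumes that approach; the `_initialized` flag and the placeholder colour value are illustrative, and the commit itself takes the default colour from `EMITTER_COLOR` in the project's constants.

```python
class EventListener:
    """Hypothetical singleton listener; every construction returns one instance."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        if self._initialized:
            return  # skip repeated setup on later constructions
        self.color = "placeholder_color"  # assumed value for illustration
        self._initialized = True
```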

* Add FlowPlotEvent and update event bus to support flow plotting

- Introduce FlowPlotEvent to track flow plotting events
- Replace Telemetry method with event bus emission in Flow.plot()
- Update event bus to support new FlowPlotEvent type
- Add test case to validate flow plotting event emission

* Remove RunType enum and clean up crew events module

- Delete unused RunType enum from crew_events.py
- Simplify crew_events.py by removing unnecessary enum definition
- Improve code clarity by removing unneeded imports

* Enhance event handling for tool usage and agent execution

- Add new events for tool usage: ToolSelectionErrorEvent, ToolValidateInputErrorEvent
- Improve error tracking and event emission in ToolUsage and LLM classes
- Update AgentExecutionStartedEvent to use task_prompt instead of inputs
- Add comprehensive test coverage for new event types and error scenarios

* Refactor event system and improve crew testing

- Extract base CrewEvent class to a new base_events.py module
- Update event imports across multiple event-related files
- Modify CrewTestStartedEvent to use eval_llm instead of openai_model_name
- Add LLM creation validation in crew testing method
- Improve type handling and event consistency

* Refactor task events to use base CrewEvent

- Move CrewEvent import from crew_events to base_events
- Remove unnecessary blank lines in task_events.py
- Simplify event class structure for task-related events

* Update AgentExecutionStartedEvent to use task_prompt

- Modify test_events.py to use task_prompt instead of inputs
- Simplify event input validation in test case
- Align with recent event system refactoring

* Improve type hinting for TaskCompletedEvent handler

- Add explicit type annotation for TaskCompletedEvent in event_listener.py
- Enhance type safety for event handling in EventListener

* Improve test_validate_tool_input_invalid_input with mock objects

- Add explicit mock objects for agent and action in test case
- Ensure proper string values for mock agent and action attributes
- Simplify test setup for ToolUsage validation method

* Remove ToolUsageStartedEvent emission in tool usage process

- Remove unnecessary event emission for tool usage start
- Simplify tool usage event handling
- Eliminate redundant event data preparation step

* refactor: clean up and organize imports in llm and flow modules

* test: Improve flow persistence test cases and logging
* improve HITL

* fix failing test

* fix failing test part 2

* Drop extra logs that were causing confusion

---------

Co-authored-by: Lorenze Jay <[email protected]>
* Check the right property

* Fix failing tests

* Update cassettes

* Update cassettes again

* Update cassettes again 2

* Update cassettes again 3

* fix other test that fails in ci/cd

* Fix issues pointed out by lorenze
Contributor Author

Closing due to inactivity.

Development

Successfully merging this pull request may close these issues.

[BUG] Flow using v0.102.0 throws "cannot pickle '_thread.RLock' object"
8 participants