Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP crew events emitter #2048

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open

WIP crew events emitter #2048

wants to merge 29 commits into from

Conversation

lorenzejay
Copy link
Collaborator

No description provided.

@joaomdmoura
Copy link
Collaborator

Disclaimer: This review was made by a crew of AI Agents.

Code Review Comment for crewAI Event Emitter Implementation

Summary

This PR introduces a robust event emitter system for crewAI, enhancing observability and monitoring of Crew, Agent, Task, and Flow execution with a structured event bus and various event types. The implementation adheres to modular design principles, ensuring a clean separation of concerns.

Key Code Improvements

  1. Event Handler Registration:

    • Current:
    @event_bus.on(CrewKickoffStarted)
    def handler(source, event):
        print(f"Crew '{event.crew_name}' started")
    • Suggested Improvement:
    @event_bus.on(CrewKickoffStarted)
    def handler(source: Any, event: CrewKickoffStarted) -> None:
        try:
            print(f"Crew '{event.crew_name}' started", event.timestamp)
        except Exception as e:
            logger.error(f"Error handling CrewKickoffStarted: {e}")
    • Rationale: Adding type hints and proper error handling increases code clarity and robustness.
  2. Context Manager for Agent Execution:

    • Current:
    emit(self, event=AgentExecutionStarted(agent=self, task=task))
    • Suggested Improvement:
    from contextlib import contextmanager
    
    @contextmanager
    def agent_execution_context(self, task):
        try:
            emit(self, AgentExecutionStarted(agent=self, task=task))
            yield
            emit(self, AgentExecutionCompleted(agent=self, task=task))
        except Exception as e:
            emit(self, AgentExecutionError(agent=self, task=task, error=str(e)))
            raise
    • Rationale: This pattern enhances error management, ensuring that all outcomes of the execution process are captured.
  3. Global Instance Configuration:

    • Current:
    event_bus = EventBus()
    • Suggested Improvement:
    from typing import Final
    from pydantic import BaseSettings
    
    class EventBusConfig(BaseSettings):
        max_listeners: int = 100
        async_dispatch: bool = False
    
    EVENT_BUS: Final[EventBus] = EventBus(EventBusConfig())
    • Rationale: Configuring the event bus with settings improves flexibility and maintainability.
  4. Event Data Validation:

    • Suggested Improvement:
    class BaseEvent(BaseModel):
        timestamp: datetime = Field(default_factory=datetime.now)
        type: str
    
        class Config:
            frozen = True
            arbitrary_types_allowed = True
    • Rationale: Implementing Pydantic for event validation adds a layer of reliability to event handling.
  5. Testing Improvements:

    • Suggested Improvement:
    def test_concurrent_event_emission():
        events = []
        @event_bus.on(CrewKickoffStarted)
        def handler(source, event): events.append(event)
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(emit_test_event) for _ in range(100)]
            concurrent.futures.wait(futures)
        
        assert len(events) == 100
    • Rationale: Expanding test scenarios validates the system's performance under load.

Historical Context

While this PR is robust in its current form, historical trends in related PRs suggest a consistent emphasis on error handling, event validation, and performance monitoring. The recent creation of files such as event_bus.py and structured event types (in agent_events.py, crew_events.py, and task_events.py) demonstrates an evolving architecture towards modular, accountable, and efficient event handling.

Implications for Related Files

  • Changes to src/crewai/utilities/events/__init__.py and other event files tie into the new event architecture, necessitating careful review to ensure event emissions are correctly harnessed across the codebase.
  • Maintain an eye towards future modifications that might enhance or impact this architectural change, keeping in mind the system's scalability.

Additional Recommendations

  • Explore adding performance monitoring tools for event dispatching.
  • Implement security measures like event sanitization and audit logging.
  • Enhance documentation to assist developers in understanding the event system's components and usage.

Overall, this PR lays a strong foundation for a more observant and responsive crewAI system, with critical enhancements recommended to bolster its robustness and future scalability.

- Migrate from global `emit` function to `event_bus.emit`
- Add new event types for task failures, tool usage, and agent execution
- Update event listeners and event bus to support more granular event tracking
- Remove deprecated event emission methods
- Improve event type consistency and add more detailed event information
@lorenzejay lorenzejay marked this pull request as ready for review February 11, 2025 22:32
- Emit AgentExecutionStarted and AgentExecutionError events
- Update CrewAgentExecutor to use event_bus for tracking agent execution
- Refactor error handling to include event emission
- Minor code formatting improvements in task.py and crew_agent_executor.py
- Fix a typo in test file
- Move event_bus import to correct module paths
- Introduce BaseEventListener abstract base class
- Add AgentOpsListener for third-party event tracking
- Update event listener initialization and setup
- Clean up event-related imports and exports
- Improve type annotations for event bus and event types
- Add null checks for agent and task in event emissions
- Update import paths for base tool and base agent
- Refactor event listener type hints
- Remove unnecessary print statements
- Update test configurations to match new event handling
- Rename event classes to have explicit 'Event' suffix (e.g., TaskStartedEvent)
- Update import statements and references across multiple files
- Remove deprecated events.py module
- Enhance event type hints and configurations
- Clean up unnecessary event-related code
- Set default model to "gpt-4o-mini" in CrewEvaluator when no model is specified
- Reorder event-related imports in task.py to follow standard import conventions
- Update event bus initialization method return type hint
- Export event_bus in events/__init__.py
- Update tool usage to use `.get()` method when checking tool name
- Remove unnecessary `__all__` export list in events/__init__.py
- Remove `event_emitter` from Flow class and replace with `event_bus.emit()`
- Update Flow and Agent tests to use event_bus event listeners
- Remove redundant event emissions in Flow methods
- Add debug print statements in Flow execution
- Simplify event tracking in test cases
- Add crew name to failed event types (CrewKickoffFailedEvent, CrewTrainFailedEvent, CrewTestFailedEvent)
- Update Task events to remove redundant task and context attributes
- Refactor EventListener to use Logger for consistent event logging
- Add new event types for Crew train and test events
- Improve event bus event tracking in test cases
- Remove telemetry-related imports and private attributes from Task class
- Remove `_telemetry` attribute from Flow class
- Update event handling to emit events without direct telemetry tracking
- Simplify task and flow execution by removing explicit telemetry spans
- Move telemetry-related event handling to EventListener
- Remove unused imports from various event and flow-related files
- Reorder event imports to follow standard conventions
- Remove unnecessary event type references
- Simplify import statements in event and flow modules
- Enhance test_crew_verbose_output to check specific listener log messages
- Modify test_kickoff_for_each_invalid_input to use Pydantic validation error
- Improve test coverage for crew logging and input validation
- Replace task and agent completion icons from 👍 to ✅
- Enhance readability of test output logging
- Maintain consistent test coverage for crew verbose output
- Introduce new MethodExecutionFailedEvent in flow_events module
- Update Flow class to catch and emit method execution failures
- Add event listener for method execution failure events
- Update event-related imports to include new event type
- Enhance test coverage for method execution failure handling
- Modify Flow class to re-raise exceptions after emitting MethodExecutionFailedEvent
- Reorder MethodExecutionFailedEvent import to maintain consistent import style
- Uncomment pytest.raises() in test_events to verify exception handling
- Ensure test validates MethodExecutionFailedEvent emission during flow kickoff
- Introduce event listeners for ToolUsageFinishedEvent and ToolUsageErrorEvent
- Log tool usage events with descriptive emoji icons (✅ and ❌)
- Update event_listener to track and log tool usage lifecycle
- Reorganize imports for tool usage events and other event types
- Maintain consistent import ordering and remove unused imports
- Ensure clean and organized import structure in event_listener module
- Modify AgentOpsListener to handle crew-level events
- Initialize and end AgentOps session at crew kickoff and completion
- Create agents for each crew member during session initialization
- Improve session management and event recording
- Clean up and simplify event handling logic
- Modify test to assert single error event with correct attributes
- Use pytest.raises() to verify error event generation
- Simplify error event validation in test case
- Add string type hints for AgentOps classes to resolve potential import issues
- Clean up unnecessary whitespace and improve code indentation
- Simplify initialization and event handling logic
- Modify test to assert 75 events instead of a single error event
- Remove pytest.raises() check, allowing crew kickoff to complete
- Adjust event validation to support broader event tracking
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants