fix: Add proper role to human feedback messages for LiteLLM #2113

Closed

3 changes: 2 additions & 1 deletion .gitignore
@@ -21,4 +21,5 @@ crew_tasks_output.json
 .mypy_cache
 .ruff_cache
 .venv
-agentops.log
+agentops.log
+tests/cassettes/
177 changes: 140 additions & 37 deletions src/crewai/agents/crew_agent_executor.py
@@ -21,6 +21,9 @@
 from crewai.utilities.exceptions.context_window_exceeding_exception import (
     LLMContextLengthExceededException,
 )
+from crewai.utilities.exceptions.feedback_processing_exception import (
+    FeedbackProcessingError,
+)
 from crewai.utilities.logger import Logger
 from crewai.utilities.training_handler import CrewTrainingHandler

@@ -487,17 +490,40 @@ def _format_answer(self, answer: str) -> Union[AgentAction, AgentFinish]:
         return CrewAgentParser(agent=self.agent).parse(answer)
 
     def _format_msg(self, prompt: str, role: str = "user") -> Dict[str, str]:
+        """Format a message with role and content.
+
+        Args:
+            prompt (str): The message content
+            role (str): The message role (default: "user")
+
+        Returns:
+            Dict[str, str]: Formatted message with role and content
+
+        Raises:
+            FeedbackProcessingError: If prompt is empty or exceeds max length
+        """
+        if not prompt or not prompt.strip():
+            raise FeedbackProcessingError("Feedback message cannot be empty")
+        if len(prompt) > 8192:  # Standard context window size
+            raise FeedbackProcessingError("Feedback message exceeds maximum length")
+
         prompt = prompt.rstrip()
         return {"role": role, "content": prompt}
 
     def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
         """Handle human feedback with different flows for training vs regular use.
 
+        This method processes human feedback by either handling it as training data
+        or as regular feedback requiring potential multiple iterations.
+
         Args:
-            formatted_answer: The initial AgentFinish result to get feedback on
+            formatted_answer (AgentFinish): The initial AgentFinish result to get feedback on
 
         Returns:
             AgentFinish: The final answer after processing feedback
+
+        Raises:
+            FeedbackProcessingError: If feedback processing fails
+        """
         human_feedback = self._ask_human_input(formatted_answer.output)

@@ -511,44 +537,97 @@ def _is_training_mode(self) -> bool:
         return bool(self.crew and self.crew._train)
 
     def _handle_training_feedback(
-        self, initial_answer: AgentFinish, feedback: str
+        self,
+        initial_answer: AgentFinish,
+        feedback: str,
     ) -> AgentFinish:
-        """Process feedback for training scenarios with single iteration."""
-        self._printer.print(
-            content="\nProcessing training feedback.\n",
-            color="yellow",
-        )
-        self._handle_crew_training_output(initial_answer, feedback)
-        self.messages.append(
-            self._format_msg(
-                self._i18n.slice("feedback_instructions").format(feedback=feedback)
-            )
-        )
-        improved_answer = self._invoke_loop()
-        self._handle_crew_training_output(improved_answer)
-        self.ask_for_human_input = False
-        return improved_answer
+        """Process feedback for training scenarios with single iteration.
+
+        Args:
+            initial_answer (AgentFinish): The initial answer to improve
+            feedback (str): The feedback to process
+
+        Returns:
+            AgentFinish: The improved answer after processing feedback
+
+        Raises:
+            FeedbackProcessingError: If feedback processing fails
+        """
+        try:
+            self._printer.print(
+                content="\nProcessing training feedback.\n",
+                color="yellow",
+            )
+            self._handle_crew_training_output(initial_answer, feedback)
+            improved_answer = self._process_feedback_iteration(feedback)
+            self._handle_crew_training_output(improved_answer)
+            self.ask_for_human_input = False
+            return improved_answer
+        except Exception as e:
+            error_msg = f"Failed to process training feedback: {str(e)}"
+            self._printer.print(
+                content=error_msg,
+                color="red"
+            )
+            raise FeedbackProcessingError(error_msg, original_error=e)
 
     def _handle_regular_feedback(
-        self, current_answer: AgentFinish, initial_feedback: str
+        self,
+        current_answer: AgentFinish,
+        initial_feedback: str,
     ) -> AgentFinish:
-        """Process feedback for regular use with potential multiple iterations."""
-        feedback = initial_feedback
-        answer = current_answer
-
-        while self.ask_for_human_input:
-            response = self._get_llm_feedback_response(feedback)
-
-            if not self._feedback_requires_changes(response):
-                self.ask_for_human_input = False
-            else:
-                answer = self._process_feedback_iteration(feedback)
-                feedback = self._ask_human_input(answer.output)
-
-        return answer
+        """Process feedback for regular use with potential multiple iterations.
+
+        This method handles the iterative feedback process where the agent continues
+        to improve its answer based on user feedback until no more changes are needed.
+
+        Args:
+            current_answer (AgentFinish): The current answer from the agent
+            initial_feedback (str): The initial feedback from the user
+
+        Returns:
+            AgentFinish: The final answer after processing all feedback iterations
+
+        Raises:
+            FeedbackProcessingError: If feedback processing or validation fails
+        """
+        try:
+            feedback = initial_feedback
+            answer = current_answer
+
+            while self.ask_for_human_input:
+                # Add feedback message with user role using standard formatter
+                feedback_msg = self._i18n.slice("feedback_message").format(feedback=feedback)
+                self.messages.append(self._format_msg(feedback_msg, role="user"))
+
+                response = self._get_llm_feedback_response(feedback)
+                if not self._feedback_requires_changes(response):
+                    self.ask_for_human_input = False
+                else:
+                    answer = self._process_feedback_iteration(feedback)
+                    feedback = self._ask_human_input(answer.output)
+
+            return answer
+        except Exception as e:
+            error_msg = f"Failed to process feedback: {str(e)}"
+            self._printer.print(
+                content=error_msg,
+                color="red"
+            )
+            raise FeedbackProcessingError(error_msg, original_error=e)
 
     def _get_llm_feedback_response(self, feedback: str) -> Optional[str]:
-        """Get LLM classification of whether feedback requires changes."""
+        """Get LLM classification of whether feedback requires changes.
+
+        Args:
+            feedback (str): The feedback to classify
+
+        Returns:
+            Optional[str]: The LLM's response indicating if changes are needed
+
+        Raises:
+            FeedbackProcessingError: If LLM call fails after max retries
+        """
         prompt = self._i18n.slice("human_feedback_classification").format(
             feedback=feedback
         )
@@ -561,21 +640,45 @@ def _get_llm_feedback_response(self, feedback: str) -> Optional[str]:
             except Exception as error:
                 self._log_feedback_error(retry, error)
 
+        error_msg = f"Failed to get LLM feedback response after {MAX_LLM_RETRY} retries"
         self._log_max_retries_exceeded()
-        return None
+        raise FeedbackProcessingError(error_msg)
 
     def _feedback_requires_changes(self, response: Optional[str]) -> bool:
-        """Determine if feedback response indicates need for changes."""
+        """Determine if feedback response indicates need for changes.
+
+        Args:
+            response (Optional[str]): The LLM's response to feedback classification
+
+        Returns:
+            bool: True if feedback requires changes, False otherwise
+        """
         return response == "true" if response else False
 
     def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
-        """Process a single feedback iteration."""
-        self.messages.append(
-            self._format_msg(
-                self._i18n.slice("feedback_instructions").format(feedback=feedback)
-            )
-        )
-        return self._invoke_loop()
+        """Process a single feedback iteration.
+
+        Args:
+            feedback (str): The feedback to process from the user
+
+        Returns:
+            AgentFinish: The processed agent response after incorporating feedback
+        """
+        try:
+            # Add feedback instructions with user role
+            self.messages.append(
+                self._format_msg(
+                    self._i18n.slice("feedback_instructions").format(feedback=feedback),
+                    role="user"
+                )
+            )
+            return self._invoke_loop()
+        except Exception as e:
+            self._printer.print(
+                content=f"Error processing feedback iteration: {str(e)}",
+                color="red"
+            )
+            raise
 
     def _log_feedback_error(self, retry_count: int, error: Exception) -> None:
         """Log feedback processing errors."""
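
The substance of the fix is easiest to see in the message objects themselves. Below is a minimal sketch, not part of the diff, of the dict shape that LiteLLM-style chat completion backends expect; the helper name format_user_message and the feedback text are illustrative stand-ins for what _format_msg now guarantees.

from typing import Dict

def format_user_message(prompt: str) -> Dict[str, str]:
    # Hypothetical stand-in for CrewAgentExecutor._format_msg after this PR:
    # validate the content, then attach an explicit "user" role.
    if not prompt or not prompt.strip():
        raise ValueError("Feedback message cannot be empty")
    return {"role": "user", "content": prompt.rstrip()}

messages = [
    {"role": "assistant", "content": "Here is my draft answer."},
    # Human feedback is appended with an explicit role, so providers
    # behind LiteLLM never receive a role-less message.
    format_user_message("Feedback: please shorten the summary."),
]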
5 changes: 5 additions & 0 deletions src/crewai/utilities/exceptions/__init__.py
@@ -0,0 +1,5 @@
+"""Exceptions module for CrewAI."""
+
+from .feedback_processing_exception import FeedbackProcessingError
+
+__all__ = ["FeedbackProcessingError"]
8 changes: 8 additions & 0 deletions src/crewai/utilities/exceptions/feedback_processing_exception.py
@@ -0,0 +1,8 @@
+from typing import Optional
+
+
+class FeedbackProcessingError(Exception):
+    """Exception raised when feedback processing fails."""
+    def __init__(self, message: str, original_error: Optional[Exception] = None):
+        self.original_error = original_error
+        super().__init__(message)
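
A short usage sketch of the chained error; the TimeoutError here is a stand-in for whatever the underlying LLM call actually raises, and the import assumes this PR is installed:

from crewai.utilities.exceptions import FeedbackProcessingError

try:
    raise FeedbackProcessingError(
        "Failed to process feedback", original_error=TimeoutError("LLM call timed out")
    )
except FeedbackProcessingError as exc:
    print(exc)                 # Failed to process feedback
    print(exc.original_error)  # LLM call timed out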
52 changes: 51 additions & 1 deletion tests/agent_test.py
@@ -2,7 +2,7 @@
 
 import os
 from unittest import mock
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
 
 import pytest
 
@@ -19,6 +19,9 @@
 from crewai.tools.tool_usage_events import ToolUsageFinished
 from crewai.utilities import RPMController
 from crewai.utilities.events import Emitter
+from crewai.utilities.exceptions.feedback_processing_exception import (
+    FeedbackProcessingError,
+)
 
 
 def test_agent_llm_creation_with_env_vars():
@@ -1001,6 +1004,53 @@ def ask_human_input_side_effect(*args, **kwargs):
     assert mock_human_input.call_count == 2  # Should have asked for feedback twice
     assert output.strip().lower() == "hello"  # Final output should be 'Hello'
 
+    # Verify message format for human feedback
+    messages = agent.agent_executor.messages
+    feedback_messages = [m for m in messages if "Feedback:" in m.get("content", "")]
+    assert len(feedback_messages) == 2  # Two feedback messages
+    for msg in feedback_messages:
+        assert msg["role"] == "user"  # All feedback messages should have user role
+
+
+@pytest.fixture
+def mock_executor():
+    """Create a mock executor for testing."""
+    agent = Agent(
+        role="test role",
+        goal="test goal",
+        backstory="test backstory"
+    )
+    task = Task(
+        description="Test task",
+        expected_output="Test output",
+        human_input=True,
+        agent=agent
+    )
+    executor = CrewAgentExecutor(
+        agent=agent,
+        task=task,
+        llm=agent.llm,
+        crew=None,
+        prompt="",
+        max_iter=1,
+        tools=[],
+        tools_names=[],
+        stop_words=[],
+        tools_description="",
+        tools_handler=None
+    )
+    return executor
+
+def test_empty_feedback_handling(mock_executor):
+    """Test that empty feedback is properly handled."""
+    with pytest.raises(FeedbackProcessingError):
+        mock_executor._format_msg("")
+
+def test_long_feedback_handling(mock_executor):
+    """Test that very long feedback is properly handled."""
+    very_long_feedback = "x" * 10000
+    with pytest.raises(FeedbackProcessingError):
+        mock_executor._format_msg(very_long_feedback)
+
 def test_interpolate_inputs():
     agent = Agent(
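
To run only the new feedback tests locally, one option is pytest's Python entry point; a sketch, assuming a development install of crewAI with this branch checked out:

import pytest

# "feedback_handling" matches test_empty_feedback_handling and
# test_long_feedback_handling from this PR.
exit_code = pytest.main(["tests/agent_test.py", "-k", "feedback_handling"])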