Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 127 additions & 68 deletions camel/societies/workforce/single_agent_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from camel.agents import ChatAgent
from camel.agents.chat_agent import AsyncStreamingChatAgentResponse
from camel.logger import get_logger
from camel.messages.base import BaseMessage
from camel.societies.workforce.prompts import PROCESS_TASK_PROMPT
from camel.societies.workforce.structured_output_handler import (
StructuredOutputHandler,
Expand All @@ -34,6 +35,7 @@
WorkflowMemoryManager,
)
from camel.tasks.task import Task, TaskState, is_task_result_insufficient
from camel.types import OpenAIBackendRole
from camel.utils.context_utils import ContextUtility

logger = get_logger(__name__)
Expand Down Expand Up @@ -217,6 +219,11 @@ class SingleAgentWorker(Worker):
conversations from all task executions are accumulated for
potential workflow saving. Set to True if you plan to call
save_workflow_memories(). (default: :obj:`False`)
enable_breakpoint_resume (bool, optional): Whether to retain the agent
instance on task failure for reuse on retry. When enabled, the
worker keeps the failed agent (with its conversation history
intact) and reuses it directly on the next attempt instead of
creating a fresh agent. (default: :obj:`True`)
"""

def __init__(
Expand All @@ -230,6 +237,7 @@ def __init__(
use_structured_output_handler: bool = True,
context_utility: Optional[ContextUtility] = None,
enable_workflow_memory: bool = False,
enable_breakpoint_resume: bool = True,
) -> None:
node_id = worker.agent_id
super().__init__(
Expand All @@ -245,6 +253,8 @@ def __init__(
self.worker = worker
self.use_agent_pool = use_agent_pool
self.enable_workflow_memory = enable_workflow_memory
self.enable_breakpoint_resume = enable_breakpoint_resume
self._failed_task_agents: Dict[str, ChatAgent] = {}
self._shared_context_utility = context_utility
self._context_utility: Optional[ContextUtility] = (
None # Will be initialized when needed
Expand All @@ -271,6 +281,17 @@ def __init__(
auto_scale=auto_scale_pool,
)

def _build_retry_message(self, task: Task) -> str:
r"""Build a concise retry message describing the last failure."""
failure_reason = (task.result or "Unknown error").strip()
if len(failure_reason) > 500:
failure_reason = f"{failure_reason[:500]}..."
return (
f"Retry attempt {task.failure_count + 1}. "
f"Previous attempt failed with: {failure_reason}. "
"Continue from the previous context and address the failure."
)

def reset(self) -> Any:
r"""Resets the worker to its initial state."""
super().reset()
Expand Down Expand Up @@ -354,11 +375,32 @@ async def _process_task(
TaskState: `TaskState.DONE` if processed successfully, otherwise
`TaskState.FAILED`.
"""
# Get agent efficiently (from pool or by cloning)
worker_agent = await self._get_worker_agent()
# Reuse the failed agent if breakpoint resume is enabled,
# otherwise get a fresh agent from pool or by cloning.
reusing_agent = False
if (
self.enable_breakpoint_resume
and task.failure_count > 0
and task.id in self._failed_task_agents
):
worker_agent = self._failed_task_agents.pop(task.id)
reusing_agent = True
else:
worker_agent = await self._get_worker_agent()
response_content = ""

try:
if reusing_agent:
retry_message = self._build_retry_message(task)
worker_agent.update_memory(
BaseMessage.make_user_message(
role_name="user",
content=retry_message,
meta_dict={"type": "retry_context"},
),
OpenAIBackendRole.USER,
)

dependency_tasks_info = self._get_dep_tasks_info(dependencies)
prompt = str(
PROCESS_TASK_PROMPT.format(
Expand Down Expand Up @@ -489,81 +531,98 @@ async def _process_task(
)
# Store error information in task result
task.result = f"{type(e).__name__}: {e!s}"
if self.enable_breakpoint_resume:
self._failed_task_agents[task.id] = worker_agent
else:
await self._return_worker_agent(worker_agent)
return TaskState.FAILED
finally:
# Return agent to pool or let it be garbage collected
await self._return_worker_agent(worker_agent)

# Populate additional_info with worker attempt details
if task.additional_info is None:
task.additional_info = {}

# Create worker attempt details with descriptive keys
worker_attempt_details = {
"agent_id": getattr(
worker_agent, "agent_id", worker_agent.role_name
),
"original_worker_id": getattr(
self.worker, "agent_id", self.worker.role_name
),
"timestamp": str(datetime.datetime.now()),
"description": f"Attempt by "
f"{getattr(worker_agent, 'agent_id', worker_agent.role_name)} "
f"(from pool/clone of "
f"{getattr(self.worker, 'agent_id', self.worker.role_name)}) "
f"to process task: {task.content}",
"response_content": response_content[:50],
"tool_calls": str(
final_response.info.get("tool_calls")
if isinstance(response, AsyncStreamingChatAgentResponse)
else response.info.get("tool_calls")
)[:50],
"total_tokens": total_tokens,
}

# Store the worker attempt in additional_info
if "worker_attempts" not in task.additional_info:
task.additional_info["worker_attempts"] = []
task.additional_info["worker_attempts"].append(worker_attempt_details)
try:
# Populate additional_info with worker attempt details
if task.additional_info is None:
task.additional_info = {}

# Create worker attempt details with descriptive keys
worker_attempt_details = {
"agent_id": getattr(
worker_agent, "agent_id", worker_agent.role_name
),
"original_worker_id": getattr(
self.worker, "agent_id", self.worker.role_name
),
"timestamp": str(datetime.datetime.now()),
"description": f"Attempt by "
f"{getattr(worker_agent, 'agent_id', worker_agent.role_name)} "
f"(from pool/clone of "
f"{getattr(self.worker, 'agent_id', self.worker.role_name)}) "
f"to process task: {task.content}",
"response_content": response_content[:50],
"tool_calls": str(
final_response.info.get("tool_calls")
if isinstance(response, AsyncStreamingChatAgentResponse)
else response.info.get("tool_calls")
)[:50],
"total_tokens": total_tokens,
}

# Store the worker attempt in additional_info
if "worker_attempts" not in task.additional_info:
task.additional_info["worker_attempts"] = []
task.additional_info["worker_attempts"].append(
worker_attempt_details
)

# Store the actual token usage for this specific task
task.additional_info["token_usage"] = {"total_tokens": total_tokens}
# Store the actual token usage for this specific task
task.additional_info["token_usage"] = {
"total_tokens": total_tokens
}

print(f"======\n{Fore.GREEN}Response from {self}:{Fore.RESET}")
logger.info(f"Response from {self}:")
print(f"======\n{Fore.GREEN}Response from {self}:{Fore.RESET}")
logger.info(f"Response from {self}:")

if not self.use_structured_output_handler:
# Handle native structured output parsing
if task_result is None:
logger.error(
"Error in worker step execution: Invalid task result"
)
task_result = TaskResult(
content="Failed to generate valid task result.",
failed=True,
)
if not self.use_structured_output_handler:
# Handle native structured output parsing
if task_result is None:
logger.error(
"Error in worker step execution: Invalid task result"
)
task_result = TaskResult(
content="Failed to generate valid task result.",
failed=True,
)

color = Fore.RED if task_result.failed else Fore.GREEN # type: ignore[union-attr]
print(
f"\n{color}{task_result.content}{Fore.RESET}\n======", # type: ignore[union-attr]
)
if task_result.failed: # type: ignore[union-attr]
logger.error(f"{task_result.content}") # type: ignore[union-attr]
else:
logger.info(f"{task_result.content}") # type: ignore[union-attr]
color = Fore.RED if task_result.failed else Fore.GREEN # type: ignore[union-attr]
print(
f"\n{color}{task_result.content}{Fore.RESET}\n======", # type: ignore[union-attr]
)
if task_result.failed: # type: ignore[union-attr]
logger.error(f"{task_result.content}") # type: ignore[union-attr]
else:
logger.info(f"{task_result.content}") # type: ignore[union-attr]

task.result = task_result.content # type: ignore[union-attr]
task.result = task_result.content # type: ignore[union-attr]

if task_result.failed: # type: ignore[union-attr]
return TaskState.FAILED
if task_result.failed: # type: ignore[union-attr]
if self.enable_breakpoint_resume:
self._failed_task_agents[task.id] = worker_agent
return TaskState.FAILED
return TaskState.FAILED

if is_task_result_insufficient(task):
logger.warning(
f"Task {task.id}: Content validation failed - "
f"task marked as failed"
)
return TaskState.FAILED
return TaskState.DONE
if is_task_result_insufficient(task):
logger.warning(
f"Task {task.id}: Content validation failed - "
f"task marked as failed"
)
if self.enable_breakpoint_resume:
self._failed_task_agents[task.id] = worker_agent
return TaskState.FAILED
return TaskState.FAILED
return TaskState.DONE
finally:
# Only return the agent to the pool if it wasn't retained
# for breakpoint resume
if task.id not in self._failed_task_agents:
await self._return_worker_agent(worker_agent)

async def _listen_to_channel(self):
r"""Override to start cleanup task when pool is enabled."""
Expand Down
Loading
Loading