Skip to content

Commit 76828b9

Browse files
committed
fix
1 parent be5bd85 commit 76828b9

File tree

4 files changed

+263
-270
lines changed

4 files changed

+263
-270
lines changed

camel/societies/workforce/single_agent_worker.py

Lines changed: 31 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717
import datetime
1818
import time
1919
from collections import deque
20-
from typing import Any, Dict, List, Optional, TypedDict
20+
from typing import Any, Dict, List, Optional
2121

2222
from colorama import Fore
2323

2424
from camel.agents import ChatAgent
2525
from camel.agents.chat_agent import AsyncStreamingChatAgentResponse
2626
from camel.logger import get_logger
27-
from camel.memories.records import MemoryRecord
2827
from camel.messages.base import BaseMessage
2928
from camel.societies.workforce.prompts import PROCESS_TASK_PROMPT
3029
from camel.societies.workforce.structured_output_handler import (
@@ -42,19 +41,6 @@
4241
logger = get_logger(__name__)
4342

4443

45-
class ExecutionContext(TypedDict, total=False):
46-
r"""Type definition for task execution context used in breakpoint resume.
47-
48-
Attributes:
49-
conversation_history: Serialized conversation history from previous
50-
execution attempts.
51-
retry_context: Message describing the failure reason for retry.
52-
"""
53-
54-
conversation_history: List[Dict[str, Any]]
55-
retry_context: str
56-
57-
5844
class AgentPool:
5945
r"""A pool of agent instances for efficient reuse.
6046
@@ -233,11 +219,11 @@ class SingleAgentWorker(Worker):
233219
conversations from all task executions are accumulated for
234220
potential workflow saving. Set to True if you plan to call
235221
save_workflow_memories(). (default: :obj:`False`)
236-
enable_breakpoint_resume (bool, optional): Whether to preserve and
237-
restore conversation history for retry attempts. When enabled,
238-
failed tasks save their conversation history to
239-
``task.additional_info`` and reuse it on retry. (default:
240-
:obj:`True`)
222+
enable_breakpoint_resume (bool, optional): Whether to retain the agent
223+
instance on task failure for reuse on retry. When enabled, the
224+
worker keeps the failed agent (with its conversation history
225+
intact) and reuses it directly on the next attempt instead of
226+
creating a fresh agent. (default: :obj:`True`)
241227
"""
242228

243229
def __init__(
@@ -268,6 +254,7 @@ def __init__(
268254
self.use_agent_pool = use_agent_pool
269255
self.enable_workflow_memory = enable_workflow_memory
270256
self.enable_breakpoint_resume = enable_breakpoint_resume
257+
self._failed_task_agents: Dict[str, ChatAgent] = {}
271258
self._shared_context_utility = context_utility
272259
self._context_utility: Optional[ContextUtility] = (
273260
None # Will be initialized when needed
@@ -294,57 +281,6 @@ def __init__(
294281
auto_scale=auto_scale_pool,
295282
)
296283

297-
def _ensure_execution_context(self, task: Task) -> ExecutionContext:
298-
r"""Ensure task.execution_context is a mutable dict."""
299-
if task.execution_context is None or not isinstance(
300-
task.execution_context, dict
301-
):
302-
task.execution_context = {}
303-
return task.execution_context # type: ignore[return-value]
304-
305-
def _serialize_conversation_history(
306-
self, agent: ChatAgent
307-
) -> List[Dict[str, Any]]:
308-
r"""Serialize agent's conversation history for storage.
309-
310-
Filters out retry_context messages to avoid duplication on subsequent
311-
retries.
312-
"""
313-
records = agent.memory.retrieve()
314-
result = []
315-
for record in records:
316-
meta = record.memory_record.message.meta_dict or {}
317-
if meta.get("type") == "retry_context":
318-
continue
319-
result.append(record.memory_record.to_dict())
320-
return result
321-
322-
def _restore_conversation_history(
323-
self, agent: ChatAgent, history: List[Dict[str, Any]]
324-
) -> None:
325-
r"""Restore conversation history to agent, skipping system messages."""
326-
for record_dict in history:
327-
record = MemoryRecord.from_dict(record_dict)
328-
329-
if record.role_at_backend == OpenAIBackendRole.SYSTEM:
330-
continue
331-
332-
restored_record = MemoryRecord(
333-
message=record.message,
334-
role_at_backend=record.role_at_backend,
335-
timestamp=record.timestamp,
336-
agent_id=agent.agent_id,
337-
extra_info=record.extra_info,
338-
)
339-
agent.memory.write_record(restored_record)
340-
341-
def _save_breakpoint_history(self, task: Task, agent: ChatAgent) -> None:
342-
r"""Save conversation history into task.execution_context."""
343-
execution_context = self._ensure_execution_context(task)
344-
execution_context["conversation_history"] = (
345-
self._serialize_conversation_history(agent)
346-
)
347-
348284
def _build_retry_message(self, task: Task) -> str:
349285
r"""Build a concise retry message describing the last failure."""
350286
failure_reason = (task.result or "Unknown error").strip()
@@ -439,19 +375,23 @@ async def _process_task(
439375
TaskState: `TaskState.DONE` if processed successfully, otherwise
440376
`TaskState.FAILED`.
441377
"""
442-
# Get agent efficiently (from pool or by cloning)
443-
worker_agent = await self._get_worker_agent()
378+
# Reuse the failed agent if breakpoint resume is enabled,
379+
# otherwise get a fresh agent from pool or by cloning.
380+
reusing_agent = False
381+
if (
382+
self.enable_breakpoint_resume
383+
and task.failure_count > 0
384+
and task.id in self._failed_task_agents
385+
):
386+
worker_agent = self._failed_task_agents.pop(task.id)
387+
reusing_agent = True
388+
else:
389+
worker_agent = await self._get_worker_agent()
444390
response_content = ""
445391

446392
try:
447-
if self.enable_breakpoint_resume and task.failure_count > 0:
448-
execution_context = self._ensure_execution_context(task)
449-
history = execution_context.get("conversation_history")
450-
if history:
451-
self._restore_conversation_history(worker_agent, history)
452-
393+
if reusing_agent:
453394
retry_message = self._build_retry_message(task)
454-
execution_context["retry_context"] = retry_message
455395
worker_agent.update_memory(
456396
BaseMessage.make_user_message(
457397
role_name="user",
@@ -592,8 +532,9 @@ async def _process_task(
592532
# Store error information in task result
593533
task.result = f"{type(e).__name__}: {e!s}"
594534
if self.enable_breakpoint_resume:
595-
self._save_breakpoint_history(task, worker_agent)
596-
await self._return_worker_agent(worker_agent)
535+
self._failed_task_agents[task.id] = worker_agent
536+
else:
537+
await self._return_worker_agent(worker_agent)
597538
return TaskState.FAILED
598539

599540
try:
@@ -663,7 +604,8 @@ async def _process_task(
663604

664605
if task_result.failed: # type: ignore[union-attr]
665606
if self.enable_breakpoint_resume:
666-
self._save_breakpoint_history(task, worker_agent)
607+
self._failed_task_agents[task.id] = worker_agent
608+
return TaskState.FAILED
667609
return TaskState.FAILED
668610

669611
if is_task_result_insufficient(task):
@@ -672,11 +614,15 @@ async def _process_task(
672614
f"task marked as failed"
673615
)
674616
if self.enable_breakpoint_resume:
675-
self._save_breakpoint_history(task, worker_agent)
617+
self._failed_task_agents[task.id] = worker_agent
618+
return TaskState.FAILED
676619
return TaskState.FAILED
677620
return TaskState.DONE
678621
finally:
679-
await self._return_worker_agent(worker_agent)
622+
# Only return the agent to the pool if it wasn't retained
623+
# for breakpoint resume
624+
if task.id not in self._failed_task_agents:
625+
await self._return_worker_agent(worker_agent)
680626

681627
async def _listen_to_channel(self):
682628
r"""Override to start cleanup task when pool is enabled."""

camel/tasks/task.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -239,10 +239,6 @@ class Task(BaseModel):
239239
(default: :obj:`[]`)
240240
additional_info (Optional[Dict[str, Any]]): Additional information for
241241
the task. (default: :obj:`None`)
242-
execution_context (Optional[Dict[str, Any]]): Internal execution state
243-
used by workers for features like breakpoint resume. This stores
244-
conversation history and retry context to allow failed tasks to
245-
resume from where they left off. (default: :obj:`None`)
246242
image_list (Optional[List[Union[Image.Image, str]]]): Optional list
247243
of PIL Image objects or image URLs (strings) associated with the
248244
task. (default: :obj:`None`)
@@ -278,8 +274,6 @@ class Task(BaseModel):
278274

279275
additional_info: Optional[Dict[str, Any]] = None
280276

281-
execution_context: Optional[Dict[str, Any]] = None
282-
283277
image_list: Optional[List[Union[Image.Image, str]]] = None
284278

285279
image_detail: Literal["auto", "low", "high"] = "auto"

0 commit comments

Comments
 (0)