diff --git a/camel/agents/chat_agent.py b/camel/agents/chat_agent.py index 93774568d6..eed8d27f05 100644 --- a/camel/agents/chat_agent.py +++ b/camel/agents/chat_agent.py @@ -1187,7 +1187,6 @@ def update_memory( timestamp (Optional[float], optional): Custom timestamp for the memory record. If `None`, the current time will be used. (default: :obj:`None`) - (default: obj:`None`) """ record = MemoryRecord( message=message, @@ -3552,7 +3551,9 @@ def _record_tool_calling( base_timestamp = current_time_ns / 1_000_000_000 # Convert to seconds self.update_memory( - assist_msg, OpenAIBackendRole.ASSISTANT, timestamp=base_timestamp + assist_msg, + OpenAIBackendRole.ASSISTANT, + timestamp=base_timestamp, ) # Add minimal increment to ensure function message comes after @@ -3562,6 +3563,41 @@ def _record_tool_calling( timestamp=base_timestamp + 1e-6, ) + # Process tool output through the architecture if tool has output + # manager + if ( + hasattr(self, '_internal_tools') + and func_name in self._internal_tools + ): + tool = self._internal_tools[func_name] + if hasattr(tool, 'func') and hasattr(tool.func, '__self__'): + toolkit_instance = tool.func.__self__ + if hasattr(toolkit_instance, 'process_tool_output'): + try: + toolkit_instance.process_tool_output( + tool_name=func_name, + tool_call_id=tool_call_id, + raw_result=result, + agent_id=self.agent_id, + timestamp=base_timestamp + 1e-6, + ) + except Exception as e: + # Determine log level based on exception type + if isinstance( + e, (AttributeError, ValueError, TypeError) + ): + logger.warning( + f"Error in tool output processing for " + f"{func_name}: {e.__class__.__name__}: {e}" + ) + else: + logger.error( + f"Unexpected error in tool output " + f"processing for {func_name}: " + f"{e.__class__.__name__}: {e}", + exc_info=True, + ) + # Record information about this tool call tool_record = ToolCallingRecord( tool_name=func_name, diff --git a/camel/toolkits/base.py b/camel/toolkits/base.py index ba702c1d2c..ded9d92392 100644 
def register_output_processor(
    self, processor: "ToolOutputProcessor"
) -> None:
    r"""Register an output processor for tool results.

    The toolkit's output manager is normally created in ``__init__``. It
    is created here on demand when it is missing, so a subclass whose
    ``__init__`` never chains up to the base initializer can still
    register processors instead of failing with ``AttributeError``.

    Args:
        processor (ToolOutputProcessor): The output processor to
            register.
    """
    manager = getattr(self, 'output_manager', None)
    if manager is None:
        manager = ToolOutputManager()
        self.output_manager = manager
    manager.register_processor(processor)

def process_tool_output(
    self,
    tool_name: str,
    tool_call_id: str,
    raw_result: Any,
    agent_id: str,
    timestamp: Optional[float] = None,
) -> "ToolOutputContext":
    r"""Process tool output through registered processors.

    Validates the identifying arguments and then delegates to the
    toolkit's output manager, creating that manager first when the
    instance does not have one yet.

    Args:
        tool_name (str): Name of the tool that produced the output.
        tool_call_id (str): Unique identifier for the tool call.
        raw_result (Any): Raw output from the tool.
        agent_id (str): ID of the agent that made the tool call.
        timestamp (Optional[float]): Timestamp of the tool call,
            forwarded unchanged to the output manager.
            (default: :obj:`None`)

    Returns:
        ToolOutputContext: Processed tool output context.

    Raises:
        ValueError: If `tool_name`, `tool_call_id` or `agent_id` is
            empty or otherwise falsy.
    """
    if not tool_name or not tool_call_id or not agent_id:
        raise ValueError(
            "tool_name, tool_call_id, and agent_id are required"
        )

    # Agents call this hook for every toolkit-bound tool, including
    # toolkits that never ran the base initializer; create the manager
    # lazily rather than raising AttributeError on each tool call.
    manager = getattr(self, 'output_manager', None)
    if manager is None:
        manager = ToolOutputManager()
        self.output_manager = manager

    return manager.process_tool_output(
        tool_name=tool_name,
        tool_call_id=tool_call_id,
        raw_result=raw_result,
        agent_id=agent_id,
        timestamp=timestamp,
    )
browser_log_to_file (bool): Whether to log browser actions to - file. Defaults to False. + file. Defaults to False. log_dir (Optional[str]): Custom directory path for log files. - If None, defaults to "browser_log". Defaults to None. + If None, defaults to "browser_log". Defaults to None. session_id (Optional[str]): Session identifier. Defaults to None. default_start_url (str): Default URL to start with. Defaults - to "https://google.com/". + to "https://google.com/". default_timeout (Optional[int]): Default timeout in - milliseconds. Defaults to None. + milliseconds. Defaults to None. short_timeout (Optional[int]): Short timeout in milliseconds. - Defaults to None. + Defaults to None. navigation_timeout (Optional[int]): Navigation timeout in - milliseconds. Defaults to None. + milliseconds. Defaults to None. network_idle_timeout (Optional[int]): Network idle timeout in - milliseconds. Defaults to None. + milliseconds. Defaults to None. screenshot_timeout (Optional[int]): Screenshot timeout in - milliseconds. Defaults to None. + milliseconds. Defaults to None. page_stability_timeout (Optional[int]): Page stability timeout - in milliseconds. Defaults to None. + in milliseconds. Defaults to None. dom_content_loaded_timeout (Optional[int]): DOM content loaded - timeout in milliseconds. Defaults to None. + timeout in milliseconds. Defaults to None. viewport_limit (bool): Whether to filter page snapshot elements to only those visible in the current viewport. When True, only elements within the current viewport @@ -142,15 +144,19 @@ def __init__( When False (default), all elements on the page are included. Defaults to False. connect_over_cdp (bool): Whether to connect to an existing - browser via Chrome DevTools Protocol. Defaults to False. - cdp_url (Optional[str]): WebSocket endpoint URL for CDP - connection (e.g., 'ws://localhost:9222/devtools/browser/...'). - Required when connect_over_cdp is True. Defaults to None. 
- cdp_keep_current_page (bool): When True and using CDP mode, - won't create new pages but use the existing one. Defaults to False. - full_visual_mode (bool): When True, browser actions like click, - browser_open, visit_page, etc. will not return snapshots. - Defaults to False. + browser via Chrome DevTools Protocol. Defaults to False. + cdp_url (Optional[str]): WebSocket endpoint URL for CDP + connection (e.g., 'ws://localhost:9222/devtools/browser/...'). + Required when connect_over_cdp is True. Defaults to None. + cdp_keep_current_page (bool): When True and using CDP mode, + won't create new pages but use the existing one. Defaults to + False. full_visual_mode (bool): When True, browser actions + like click, browser_open, visit_page, etc. will not return + snapshots. Defaults to False. + clean_snapshots (bool): When True, automatically cleans verbose + DOM snapshots to reduce context usage while preserving + essential information. Removes redundant markers and references + from browser tool outputs. Defaults to False. 
""" super().__init__() RegisteredAgentToolkit.__init__(self) @@ -203,6 +209,7 @@ def __init__( self._session_id = toolkit_config.session_id or "default" self._viewport_limit = browser_config.viewport_limit self._full_visual_mode = browser_config.full_visual_mode + self._clean_snapshots = clean_snapshots self._default_timeout = browser_config.default_timeout self._short_timeout = browser_config.short_timeout @@ -229,6 +236,15 @@ def __init__( logger.info(f"Enabled tools: {self.enabled_tools}") + # Setup snapshot cleaning if enabled + if self._clean_snapshots: + snapshot_processor = SnapshotCleaningProcessor() + self.register_output_processor(snapshot_processor) + logger.info( + "Snapshot cleaning enabled - DOM snapshots will " + "be automatically cleaned" + ) + self._ws_wrapper: Optional[WebSocketBrowserWrapper] = None self._ws_config = self.config_loader.to_ws_config() @@ -245,6 +261,38 @@ async def _get_ws_wrapper(self) -> WebSocketBrowserWrapper: raise RuntimeError("Failed to initialize WebSocket wrapper") return self._ws_wrapper + def _clean_snapshot_if_enabled( + self, snapshot: str, tool_name: str = "browser_ts" + ) -> str: + r"""Clean snapshot content if snapshot cleaning is enabled. + + Args: + snapshot: The raw snapshot content to clean. + tool_name: The name of the tool that generated the snapshot. + + Returns: + The cleaned snapshot if cleaning is enabled, otherwise the + original snapshot. 
+ """ + if not self._clean_snapshots or not snapshot: + return snapshot + + try: + # Process through the output manager + processed_context = self.process_tool_output( + tool_name=tool_name, + tool_call_id="snapshot_clean", + raw_result=snapshot, + agent_id=getattr(self, '_session_id', 'default'), + ) + + return processed_context.raw_result + except Exception as e: + logger.warning( + f"Failed to clean snapshot: {e}, returning original" + ) + return snapshot + def __del__(self): r"""Cleanup browser resources on garbage collection.""" try: @@ -408,6 +456,12 @@ async def browser_visit_page(self, url: str) -> Dict[str, Any]: ws_wrapper = await self._get_ws_wrapper() result = await ws_wrapper.visit_page(url) + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_visit_page" + ) + tab_info = await ws_wrapper.get_tab_info() result.update( { @@ -453,6 +507,12 @@ async def browser_back(self) -> Dict[str, Any]: ws_wrapper = await self._get_ws_wrapper() result = await ws_wrapper.back() + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_back" + ) + tab_info = await ws_wrapper.get_tab_info() result.update( { @@ -498,6 +558,12 @@ async def browser_forward(self) -> Dict[str, Any]: ws_wrapper = await self._get_ws_wrapper() result = await ws_wrapper.forward() + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_forward" + ) + tab_info = await ws_wrapper.get_tab_info() result.update( { @@ -546,7 +612,11 @@ async def browser_get_page_snapshot(self) -> str: """ try: ws_wrapper = await self._get_ws_wrapper() - return await ws_wrapper.get_page_snapshot(self._viewport_limit) + result = await ws_wrapper.get_page_snapshot(self._viewport_limit) + # Clean snapshot if enabled + return self._clean_snapshot_if_enabled( + 
result, "browser_get_page_snapshot" + ) except Exception as e: logger.error(f"Failed to get page snapshot: {e}") return f"Error capturing snapshot: {e}" @@ -687,9 +757,14 @@ async def browser_click(self, *, ref: str) -> Dict[str, Any]: tab_info = await ws_wrapper.get_tab_info() + # Clean snapshot if enabled + cleaned_snapshot = self._clean_snapshot_if_enabled( + result.get("snapshot", ""), "browser_click" + ) + response = { "result": result.get("result", ""), - "snapshot": result.get("snapshot", ""), + "snapshot": cleaned_snapshot, "tabs": tab_info, "current_tab": next( ( @@ -768,6 +843,13 @@ async def browser_type( ) tab_info = await ws_wrapper.get_tab_info() + + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_type" + ) + result.update( { "tabs": tab_info, @@ -816,6 +898,13 @@ async def browser_select(self, *, ref: str, value: str) -> Dict[str, Any]: result = await ws_wrapper.select(ref, value) tab_info = await ws_wrapper.get_tab_info() + + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_select" + ) + result.update( { "tabs": tab_info, @@ -864,6 +953,13 @@ async def browser_scroll( result = await ws_wrapper.scroll(direction, amount) tab_info = await ws_wrapper.get_tab_info() + + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_scroll" + ) + result.update( { "tabs": tab_info, @@ -911,6 +1007,13 @@ async def browser_enter(self) -> Dict[str, Any]: result = await ws_wrapper.enter() tab_info = await ws_wrapper.get_tab_info() + + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_enter" + ) + result.update( { "tabs": tab_info, @@ -962,6 +1065,13 @@ async def browser_mouse_control( result = await 
ws_wrapper.mouse_control(control, x, y) tab_info = await ws_wrapper.get_tab_info() + + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_mouse_control" + ) + result.update( { "tabs": tab_info, @@ -1010,6 +1120,13 @@ async def browser_mouse_drag( result = await ws_wrapper.mouse_drag(from_ref, to_ref) tab_info = await ws_wrapper.get_tab_info() + + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_mouse_drag" + ) + result.update( { "tabs": tab_info, @@ -1058,6 +1175,13 @@ async def browser_press_key(self, *, keys: List[str]) -> Dict[str, Any]: result = await ws_wrapper.press_key(keys) tab_info = await ws_wrapper.get_tab_info() + + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_press_key" + ) + result.update( { "tabs": tab_info, @@ -1106,6 +1230,13 @@ async def browser_switch_tab(self, *, tab_id: str) -> Dict[str, Any]: result = await ws_wrapper.switch_tab(tab_id) tab_info = await ws_wrapper.get_tab_info() + + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_switch_tab" + ) + result.update( { "tabs": tab_info, @@ -1155,6 +1286,13 @@ async def browser_close_tab(self, *, tab_id: str) -> Dict[str, Any]: result = await ws_wrapper.close_tab(tab_id) tab_info = await ws_wrapper.get_tab_info() + + # Clean snapshot if enabled + if "snapshot" in result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_close_tab" + ) + result.update( { "tabs": tab_info, @@ -1260,6 +1398,13 @@ async def browser_console_exec(self, code: str) -> Dict[str, Any]: result = await ws_wrapper.console_exec(code) tab_info = await ws_wrapper.get_tab_info() + + # Clean snapshot if enabled + if "snapshot" in 
result: + result["snapshot"] = self._clean_snapshot_if_enabled( + result["snapshot"], "browser_console_exec" + ) + result.update( { "tabs": tab_info, @@ -1413,6 +1558,7 @@ def clone_for_new_session( dom_content_loaded_timeout=self._dom_content_loaded_timeout, viewport_limit=self._viewport_limit, full_visual_mode=self._full_visual_mode, + clean_snapshots=self._clean_snapshots, ) def get_tools(self) -> List[FunctionTool]: diff --git a/camel/toolkits/hybrid_browser_toolkit_py/hybrid_browser_toolkit.py b/camel/toolkits/hybrid_browser_toolkit_py/hybrid_browser_toolkit.py index 3f918e5ecc..cb873d3609 100644 --- a/camel/toolkits/hybrid_browser_toolkit_py/hybrid_browser_toolkit.py +++ b/camel/toolkits/hybrid_browser_toolkit_py/hybrid_browser_toolkit.py @@ -25,6 +25,7 @@ from camel.models import BaseModelBackend from camel.toolkits.base import BaseToolkit, RegisteredAgentToolkit from camel.toolkits.function_tool import FunctionTool +from camel.toolkits.output_processors import SnapshotCleaningProcessor from camel.utils import sanitize_filename from camel.utils.commons import dependencies_required @@ -106,6 +107,7 @@ def __init__( page_stability_timeout: Optional[int] = None, dom_content_loaded_timeout: Optional[int] = None, viewport_limit: bool = False, + clean_snapshots: bool = False, ) -> None: r"""Initialize the HybridBrowserToolkit. @@ -195,6 +197,10 @@ def __init__( visible in the current viewport. When False, return all elements on the page regardless of visibility. Defaults to `False`. + clean_snapshots (bool): When True, automatically cleans verbose + DOM snapshots to reduce context usage while preserving + essential information. Removes redundant markers and references + from browser tool outputs. Defaults to `False`. 
""" super().__init__() RegisteredAgentToolkit.__init__(self) @@ -208,6 +214,7 @@ def __init__( self._default_start_url = default_start_url or "https://google.com/" self._session_id = session_id or "default" self._viewport_limit = viewport_limit + self._clean_snapshots = clean_snapshots # Store timeout configuration self._default_timeout = default_timeout @@ -271,6 +278,15 @@ def __init__( logger.info(f"Enabled tools: {self.enabled_tools}") + # Setup snapshot cleaning if enabled + if self._clean_snapshots: + snapshot_processor = SnapshotCleaningProcessor() + self.register_output_processor(snapshot_processor) + logger.info( + "Snapshot cleaning enabled - DOM snapshots will " + "be automatically cleaned" + ) + # Log initialization if file logging is enabled if self.log_to_file: logger.info( @@ -355,6 +371,38 @@ def _load_unified_analyzer(self) -> str: except FileNotFoundError: raise FileNotFoundError(f"Script not found: {script_path}") + def _clean_snapshot_if_enabled( + self, snapshot: str, tool_name: str = "browser" + ) -> str: + r"""Clean snapshot content if snapshot cleaning is enabled. + + Args: + snapshot: The raw snapshot content to clean. + tool_name: The name of the tool that generated the snapshot. + + Returns: + The cleaned snapshot if cleaning is enabled, otherwise the + original snapshot. 
+ """ + if not self._clean_snapshots or not snapshot: + return snapshot + + try: + # Process through the output manager + processed_context = self.process_tool_output( + tool_name=tool_name, + tool_call_id="snapshot_clean", + raw_result=snapshot, + agent_id=getattr(self, '_session_id', 'default'), + ) + + return processed_context.raw_result + except Exception as e: + logger.warning( + f"Failed to clean snapshot: {e}, returning original" + ) + return snapshot + def _validate_ref(self, ref: str, method_name: str) -> None: r"""Validate ref parameter.""" if not ref or not isinstance(ref, str): @@ -995,6 +1043,11 @@ async def _exec_with_snapshot( f"{len(after_snapshot)} chars" ) + # Clean snapshot if enabled + snapshot = self._clean_snapshot_if_enabled( + snapshot, "browser_action" + ) + # Get tab information for output tab_info = await self._get_tab_info_for_output() @@ -1281,6 +1334,10 @@ async def browser_visit_page(self, url: str) -> Dict[str, Any]: snapshot = await session.get_snapshot( force_refresh=True, diff_only=False ) + # Clean snapshot if enabled + snapshot = self._clean_snapshot_if_enabled( + snapshot, "browser_visit_page" + ) except Exception as e: logger.warning(f"Failed to capture snapshot: {e}") @@ -1327,6 +1384,10 @@ async def browser_back(self) -> Dict[str, Any]: snapshot = await self._session.get_snapshot( force_refresh=True, diff_only=False ) + # Clean snapshot if enabled + snapshot = self._clean_snapshot_if_enabled( + snapshot, "browser_back" + ) snapshot_time = time.time() - snapshot_start logger.info( f"Back navigation snapshot captured in {snapshot_time:.2f}s" @@ -1392,6 +1453,10 @@ async def browser_forward(self) -> Dict[str, Any]: snapshot = await self._session.get_snapshot( force_refresh=True, diff_only=False ) + # Clean snapshot if enabled + snapshot = self._clean_snapshot_if_enabled( + snapshot, "browser_forward" + ) snapshot_time = time.time() - snapshot_start logger.info( f"Forward navigation snapshot captured in " @@ -1446,12 +1511,17 @@ 
async def browser_get_page_snapshot(self) -> str: ) snapshot_text = analysis_data.get("snapshotText", "") - return ( + result = ( snapshot_text if snapshot_text else self._format_snapshot_from_analysis(analysis_data) ) + # Clean snapshot if enabled + return self._clean_snapshot_if_enabled( + result, "browser_get_page_snapshot" + ) + @dependencies_required('PIL') @action_logger async def browser_get_som_screenshot( @@ -1930,6 +2000,10 @@ async def _await_enter(): snapshot = await self._session.get_snapshot( force_refresh=True, diff_only=False ) + # Clean snapshot if enabled + snapshot = self._clean_snapshot_if_enabled( + snapshot, "browser_wait_user" + ) tab_info = await self._get_tab_info_for_output() return {"result": result_msg, "snapshot": snapshot, **tab_info} @@ -2098,6 +2172,10 @@ async def browser_console_exec(self, code: str) -> Dict[str, Any]: snapshot = await self._session.get_snapshot( force_refresh=True, diff_only=False ) + # Clean snapshot if enabled + snapshot = self._clean_snapshot_if_enabled( + snapshot, "browser_console_exec" + ) snapshot_time = time.time() - snapshot_start logger.info( f"Code execution snapshot captured in " f"{snapshot_time:.2f}s" @@ -2217,6 +2295,8 @@ def clone_for_new_session( screenshot_timeout=self._screenshot_timeout, page_stability_timeout=self._page_stability_timeout, dom_content_loaded_timeout=self._dom_content_loaded_timeout, + viewport_limit=self._viewport_limit, + clean_snapshots=self._clean_snapshots, ) @action_logger @@ -2246,6 +2326,10 @@ async def browser_switch_tab(self, *, tab_id: str) -> Dict[str, Any]: snapshot = await session.get_snapshot( force_refresh=True, diff_only=False ) + # Clean snapshot if enabled + snapshot = self._clean_snapshot_if_enabled( + snapshot, "browser_switch_tab" + ) tab_info = await self._get_tab_info_for_output() result = { @@ -2294,6 +2378,10 @@ async def browser_close_tab(self, *, tab_id: str) -> Dict[str, Any]: snapshot = await session.get_snapshot( force_refresh=True, diff_only=False 
@dataclass
class ToolOutputContext:
    r"""Tool output context information.

    Contains all necessary information about a tool's output
    to enable flexible processing.

    Attributes:
        tool_name (str): Name of the tool that produced the output.
        tool_call_id (str): Unique identifier for the tool call.
        raw_result (Any): Output of the tool; processors return a new
            context carrying the transformed value in this field.
        agent_id (str): ID of the agent that made the tool call.
        timestamp (float): Timestamp of the tool call.
        metadata (Dict[str, Any]): Free-form annotations added by
            processors. (default: empty dict)
    """

    tool_name: str
    tool_call_id: str
    raw_result: Any
    agent_id: str
    timestamp: float
    metadata: Dict[str, Any] = field(default_factory=dict)


class ToolOutputProcessor(ABC):
    r"""Abstract base class for tool output processors.

    Tool output processors enable custom handling of tool execution results,
    such as cleaning, formatting, caching, or other transformations.
    """

    @abstractmethod
    def can_process(self, context: ToolOutputContext) -> bool:
        r"""Determine if this processor can handle the given tool output.

        Args:
            context (ToolOutputContext): Tool output context information.

        Returns:
            bool: True if this processor can handle the output, False
                otherwise.
        """
        pass

    @abstractmethod
    def process(self, context: ToolOutputContext) -> ToolOutputContext:
        r"""Process the tool output and return modified context.

        Args:
            context (ToolOutputContext): Tool output context to process.

        Returns:
            ToolOutputContext: Modified tool output context.
        """
        pass


class SnapshotCleaningProcessor(ToolOutputProcessor):
    r"""Processor for cleaning snapshot data from browser tools.

    This processor removes verbose DOM markers and references from
    snapshot outputs while preserving essential information.

    Args:
        enable_cleaning (bool): Whether to enable snapshot cleaning.
            (default: :obj:`True`)
    """

    # Element-type names whose ``<name>:`` form marks text as a snapshot.
    _ELEMENT_MARKERS = (
        'generic',
        'img',
        'banner',
        'list',
        'listitem',
        'search',
        'navigation',
    )

    # Patterns are compiled once; they run for every snapshot line.
    _METADATA_LINE = re.compile(r'^-?\s*/\w+\s*:')
    _SNAPSHOT_LINE = re.compile(r'^(?:-\s+)?\w+(?:[\s:]|$)')
    _ELEMENT_ONLY = re.compile(r'^(?:-\s+)?\w+\s*:?\s*$')
    _ELEMENT_PREFIX = re.compile(r'^(?:-\s+)?\w+[\s:]+')
    _QUOTED = re.compile(r'"[^"]*"')
    _BRACKETED = re.compile(r'\s*\[[^\]]+\]\s*')
    _WHITESPACE = re.compile(r'\s+')
    _COLON = re.compile(r'\s*:\s*')

    def __init__(self, enable_cleaning: bool = True):
        r"""Initialize snapshot cleaning processor.

        Args:
            enable_cleaning (bool): Whether to enable snapshot cleaning.
                (default: :obj:`True`)
        """
        self.enable_cleaning = enable_cleaning

    def can_process(self, context: ToolOutputContext) -> bool:
        r"""Check if this tool output contains snapshot data to clean.

        Args:
            context (ToolOutputContext): Tool output context information.

        Returns:
            bool: False when cleaning is disabled. Otherwise True when
                the tool name contains "browser" (case-insensitive) or
                the result looks like a snapshot.
        """
        if not self.enable_cleaning:
            return False

        # Check if it's a browser tool or contains snapshot data
        return (
            'browser' in context.tool_name.lower()
            or self._contains_snapshot_data(context.raw_result)
        )

    def process(self, context: ToolOutputContext) -> ToolOutputContext:
        r"""Clean snapshot data from the tool output.

        Args:
            context (ToolOutputContext): Tool output context to process.

        Returns:
            ToolOutputContext: `context` itself when cleaning is
                disabled. Otherwise a new context holding the cleaned
                result, with `original_size`, `cleaned_size` and
                `processor` added to its metadata.
        """
        if not self.enable_cleaning:
            return context

        original_size = len(str(context.raw_result))
        cleaned_result = self._clean_snapshot_content(context.raw_result)
        cleaned_size = len(str(cleaned_result))

        # Create new context with cleaned data
        new_context = ToolOutputContext(
            tool_name=context.tool_name,
            tool_call_id=context.tool_call_id,
            raw_result=cleaned_result,
            agent_id=context.agent_id,
            timestamp=context.timestamp,
            metadata={
                **context.metadata,
                'original_size': original_size,
                'cleaned_size': cleaned_size,
                'processor': 'SnapshotCleaningProcessor',
            },
        )

        logger.debug(
            f"Cleaned snapshot output for {context.tool_name}: "
            f"{original_size} -> {cleaned_size} chars"
        )

        return new_context

    def _contains_snapshot_data(self, result: Any) -> bool:
        r"""Check if the result contains snapshot data.

        Args:
            result (Any): Tool result; inspected through `str(result)`.

        Returns:
            bool: True when the text contains both a "- " list marker
                and a "[ref=" reference.
        """
        result_str = str(result)
        return '- ' in result_str and '[ref=' in result_str

    def _clean_snapshot_content(self, content: Any) -> Any:
        r"""Clean snapshot content by removing prefixes, references, and
        deduplicating lines.

        Dispatch depends on the type of `content`:

        - `str`: cleaned line by line as plain snapshot text.
        - `None`, `bool`, `int`, `float`: returned unchanged, since such
          values cannot hold snapshot text.
        - `dict` / `list`: nested `snapshot` string fields are cleaned.
          If anything changed, the result is returned as a JSON string,
          otherwise the original object is returned.
        - anything else: `str(content)` is parsed as JSON and handled
          like the previous case, falling back to plain-text cleaning
          when it is not valid JSON.

        Args:
            content (Any): The original snapshot content.

        Returns:
            Any: The cleaned content as described above.
        """
        if isinstance(content, str):
            return self._clean_text_snapshot(content)

        # Stringifying a scalar and cleaning the text would turn 5 into
        # "5" and None/True into ''; hand such values back untouched.
        if content is None or isinstance(content, (bool, int, float)):
            return content

        try:
            # Try to parse as JSON and clean nested snapshot fields
            if isinstance(content, (dict, list)):
                # Already parsed JSON data
                cleaned_data = self._clean_json_snapshot(content)
                return (
                    json.dumps(cleaned_data, ensure_ascii=False, indent=2)
                    if cleaned_data != content
                    else content
                )

            # Try to parse the object's string form as JSON
            data = json.loads(str(content))
            cleaned_data = self._clean_json_snapshot(data)
            return (
                json.dumps(cleaned_data, ensure_ascii=False, indent=2)
                if cleaned_data != data
                else str(content)
            )
        except (json.JSONDecodeError, TypeError):
            return self._clean_text_snapshot(str(content))

    @staticmethod
    def _decode_escapes(text: str) -> str:
        r"""Turn literal backslash escapes (e.g. ``\n``) into characters.

        Encoding with Latin-1 plus ``backslashreplace`` keeps characters
        up to U+00FF as single bytes and rewrites everything above as
        ``\uXXXX`` escapes. The ``unicode_escape`` codec reads bytes as
        Latin-1, so non-ASCII text survives the round trip. Encoding as
        UTF-8 first would turn every such character into mojibake.

        Args:
            text (str): Text that may contain backslash escapes.

        Returns:
            str: The decoded text, or `text` unchanged when it contains
                no backslash or its escapes are malformed.
        """
        if '\\' not in text:
            return text
        try:
            return text.encode('latin-1', 'backslashreplace').decode(
                'unicode_escape'
            )
        except UnicodeDecodeError:
            return text

    def _clean_json_snapshot(self, data: Any) -> Any:
        r"""Clean JSON data containing snapshot fields.

        Walks dicts and lists recursively. Every string stored under a
        `snapshot` key has its backslash escapes decoded and, if it
        looks like a snapshot, is replaced by its cleaned text.

        Args:
            data (Any): Parsed JSON data.

        Returns:
            Any: `data` itself when nothing was changed, otherwise a
                new container holding the cleaned values.
        """
        if isinstance(data, dict):
            result = {}
            modified = False

            for key, value in data.items():
                if key == 'snapshot' and isinstance(value, str):
                    # Clean the snapshot field
                    decoded_value = self._decode_escapes(value)

                    if self._needs_cleaning(decoded_value):
                        result[key] = self._clean_text_snapshot(decoded_value)
                        modified = True
                    else:
                        result[key] = value
                else:
                    cleaned_value = self._clean_json_snapshot(value)
                    result[key] = cleaned_value
                    if cleaned_value != value:
                        modified = True

            return result if modified else data
        elif isinstance(data, list):
            list_result: List[Any] = []
            modified = False
            for item in data:
                cleaned_item = self._clean_json_snapshot(item)
                list_result.append(cleaned_item)
                if cleaned_item != item:
                    modified = True
            return list_result if modified else data
        else:
            return data

    def _needs_cleaning(self, text: str) -> bool:
        r"""Check if text needs cleaning based on snapshot markers.

        Args:
            text (str): Candidate snapshot text.

        Returns:
            bool: True when the text has both a "- " list marker and a
                "[ref=" reference, or contains any known element type
                followed by a colon (e.g. "generic:").
        """
        has_reference_markers = '- ' in text and '[ref=' in text
        return has_reference_markers or any(
            f'{elem}:' in text for elem in self._ELEMENT_MARKERS
        )

    def _clean_text_snapshot(self, content: str) -> str:
        r"""Clean plain text snapshot content.

        This method:
        - Removes indentation and empty lines
        - Drops metadata lines such as "- /url: ..."
        - Cleans snapshot-specific markers
        - Deduplicates lines, keeping the first occurrence

        A line counts as a snapshot line when it contains "[ref=" or
        starts with an optional "- " and a word followed by whitespace,
        a colon or the end of the line.

        Args:
            content (str): The snapshot text to clean.

        Returns:
            str: Cleaned content with deduplicated lines.
        """
        cleaned_lines = []
        seen = set()

        for line in content.split('\n'):
            stripped_line = line.strip()

            if not stripped_line:
                continue

            # Skip metadata lines (like "- /url:", "- /ref:")
            if self._METADATA_LINE.match(stripped_line):
                continue

            is_snapshot_line = (
                '[ref=' in stripped_line
                or self._SNAPSHOT_LINE.match(stripped_line)
            )

            if is_snapshot_line:
                cleaned = self._clean_snapshot_line(stripped_line)
                if cleaned and cleaned not in seen:
                    cleaned_lines.append(cleaned)
                    seen.add(cleaned)
            elif stripped_line not in seen:
                cleaned_lines.append(stripped_line)
                seen.add(stripped_line)

        return '\n'.join(cleaned_lines)

    def _clean_snapshot_line(self, line: str) -> str:
        r"""Clean a single snapshot line by removing prefixes and references.

        This method handles snapshot lines in the format:
        - [prefix] "quoted text" [attributes] [ref=...]: description

        It preserves:
        - Quoted text content, verbatim (including brackets, colons and
          spacing inside the quotes)
        - Description text after the colon

        It removes:
        - Line prefixes (e.g., "- button", "- tooltip", "generic:")
        - Attribute markers (e.g., [disabled], [ref=e47])
        - Lines with only element types
        - All indentation

        Outside quoted text, runs of whitespace are collapsed and every
        colon is normalised to ": ".

        Args:
            line (str): The original line content.

        Returns:
            str: The cleaned line content, or empty string if line
                should be removed.
        """
        original = line.strip()
        if not original:
            return ''

        # Check if line is just an element type marker
        if self._ELEMENT_ONLY.match(original):
            return ''

        # Remove element type prefix
        line = self._ELEMENT_PREFIX.sub('', original)

        # Swap quoted text for placeholders so the following steps
        # cannot alter it
        quoted_parts: List[str] = []

        def save_quoted(match):
            quoted_parts.append(match.group(0))
            return f'__QUOTED_{len(quoted_parts)-1}__'

        line = self._QUOTED.sub(save_quoted, line)
        line = self._BRACKETED.sub(' ', line)

        # Clean up formatting while the placeholders are still in place
        line = self._WHITESPACE.sub(' ', line).strip()
        line = self._COLON.sub(': ', line)
        line = line.lstrip(': ').strip()

        # Restore quoted text last, so a colon or spacing inside quotes
        # is not rewritten by the normalisation above
        for i, quoted in enumerate(quoted_parts):
            line = line.replace(f'__QUOTED_{i}__', quoted)

        return line


class ToolOutputManager:
    r"""Manages tool output processing through registered processors.

    Coordinates multiple output processors to handle tool results.
    """

    def __init__(self):
        r"""Initialize tool output manager."""
        self.processors: List[ToolOutputProcessor] = []

    def register_processor(self, processor: ToolOutputProcessor) -> None:
        r"""Register an output processor.

        A processor that is already registered is ignored.

        Args:
            processor (ToolOutputProcessor): The processor to register.
        """
        if processor not in self.processors:
            self.processors.append(processor)
            logger.debug(
                f"Registered processor: {processor.__class__.__name__}"
            )

    def process_tool_output(
        self,
        tool_name: str,
        tool_call_id: str,
        raw_result: Any,
        agent_id: str,
        timestamp: Optional[float] = None,
    ) -> ToolOutputContext:
        r"""Process tool output through registered processors.

        Processors run in registration order, each one receiving the
        context produced by the previous one. A processor that raises
        is logged as a warning and skipped.

        Args:
            tool_name (str): Name of the tool that produced the output.
            tool_call_id (str): Unique identifier for the tool call.
            raw_result (Any): Raw output from the tool.
            agent_id (str): ID of the agent that made the tool call.
            timestamp (Optional[float]): Timestamp of the tool call. If
                `None`, the current time is used. (default: :obj:`None`)

        Returns:
            ToolOutputContext: Processed tool output context.
        """
        if timestamp is None:
            timestamp = time.time()

        context = ToolOutputContext(
            tool_name=tool_name,
            tool_call_id=tool_call_id,
            raw_result=raw_result,
            agent_id=agent_id,
            timestamp=timestamp,
        )

        # Apply all applicable processors
        for processor in self.processors:
            try:
                if processor.can_process(context):
                    context = processor.process(context)
            except Exception as e:
                logger.warning(
                    f"Processor {processor.__class__.__name__} "
                    f"failed for tool {tool_name}: {e}"
                )

        return context
def add_snapshot_cleaning_to_toolkit(toolkit, enable_cleaning=True):
    """Optionally attach a snapshot-cleaning processor to a toolkit.

    Args:
        toolkit: An existing HybridBrowserToolkit instance.
        enable_cleaning: When true, a new ``SnapshotCleaningProcessor`` is
            registered on the toolkit; when false, the toolkit is left
            untouched.

    Returns:
        The toolkit instance that was passed in.
    """
    if not enable_cleaning:
        return toolkit

    toolkit.register_output_processor(
        SnapshotCleaningProcessor(enable_cleaning=True)
    )
    return toolkit
@pytest.mark.asyncio
async def test_page_snapshot_with_cleaning(self):
    """Test page snapshot with cleaning in a realistic scenario.

    Fetches a snapshot through a mocked WebSocket wrapper, passes it
    explicitly through ``toolkit.process_tool_output`` and checks that
    bracketed attribute markers are gone while quoted labels remain.
    """
    with patch(
        'camel.toolkits.hybrid_browser_toolkit.hybrid_browser_toolkit_ts.WebSocketBrowserWrapper'
    ) as mock_ws_class:
        # Setup mock
        mock_ws_instance = AsyncMock()
        mock_ws_class.return_value = mock_ws_instance

        # Mock realistic snapshot data with ref markers
        # NOTE(review): one element per line is assumed here; confirm the
        # line breaks/indentation against the upstream test file.
        mock_snapshot = """
        - button "Login" [ref=1] [class=btn primary]
        - textbox "Username" [ref=2] [placeholder=Enter username]
        - textbox "Password" [ref=3] [type=password]
        - link "Forgot Password?" [ref=4] [href=/forgot]
        - generic "Footer" [ref=5] [class=footer-content]
        - link "Privacy Policy" [ref=6] [href=/privacy]
        - link "Terms of Service" [ref=7] [href=/terms]
        """

        mock_ws_instance.get_page_snapshot.return_value = (
            mock_snapshot.strip()
        )
        mock_ws_instance.start = AsyncMock()

        # Create toolkit with cleaning enabled
        toolkit = HybridBrowserToolkit(mode="typescript", headless=True)
        toolkit = add_snapshot_cleaning_to_toolkit(
            toolkit, enable_cleaning=True
        )

        # Mock the _get_ws_wrapper method to return our mock
        toolkit._get_ws_wrapper = AsyncMock(return_value=mock_ws_instance)

        # Get the raw snapshot
        raw_snapshot = await toolkit.browser_get_page_snapshot()

        # Process it through the cleaning system; the processors are
        # invoked explicitly here rather than through an agent.
        cleaned_context = toolkit.process_tool_output(
            tool_name="browser_get_page_snapshot",
            tool_call_id="test_snapshot_001",
            raw_result=raw_snapshot,
            agent_id="test_agent",
        )

        # The processed payload is read back from the context's raw_result.
        cleaned_snapshot = cleaned_context.raw_result

        # Verify cleaning worked
        assert '[ref=' not in cleaned_snapshot
        assert '[class=' not in cleaned_snapshot
        assert '[href=' not in cleaned_snapshot
        assert '[placeholder=' not in cleaned_snapshot
        assert '[type=' not in cleaned_snapshot

        # Verify content is preserved
        assert '"Login"' in cleaned_snapshot
        assert '"Username"' in cleaned_snapshot
        assert '"Password"' in cleaned_snapshot
        assert '"Forgot Password?"' in cleaned_snapshot
        assert '"Privacy Policy"' in cleaned_snapshot
def test_tool_registration(self):
    """Test that the toolkit's tools are properly registered.

    Builds a toolkit with the WebSocket wrapper patched out, attaches the
    snapshot-cleaning processor and checks that the essential browser
    tools are still exposed by ``get_tools()``.
    """
    # The wrapper mock is created per test in setup_method and stored on
    # the instance; the bare name ``mock_ws_wrapper`` is not defined in
    # this scope and would raise NameError.
    with patch(
        'camel.toolkits.hybrid_browser_toolkit.ws_wrapper.WebSocketBrowserWrapper',
        return_value=self.mock_ws_wrapper,
    ):
        toolkit = HybridBrowserToolkit(mode="typescript", headless=True)
        toolkit = add_snapshot_cleaning_to_toolkit(
            toolkit, enable_cleaning=True
        )

        tools = toolkit.get_tools()

        # Should have default tools
        assert len(tools) > 0

        # Check for essential browser tools
        tool_names = {tool.get_function_name() for tool in tools}
        essential_tools = [
            'browser_open',
            'browser_click',
            'browser_type',
            'browser_visit_page',
        ]

        # Report every missing tool at once instead of stopping at the
        # first one.
        missing = [
            name for name in essential_tools if name not in tool_names
        ]
        assert not missing, f"Missing essential tools: {missing}"
class TestIntegrationExample:
    """Integration test showing how to use in real scenarios."""

    def test_usage_example_documentation(self):
        """Document the proper usage pattern.

        Shows three equivalent ways of attaching snapshot cleaning to a
        toolkit; each must leave exactly one registered processor.
        """
        # This class has no setup_method, so the wrapper mock has to be
        # created here; the bare name ``mock_ws_wrapper`` was otherwise
        # undefined and raised NameError.
        mock_ws_wrapper = AsyncMock()

        with patch(
            'camel.toolkits.hybrid_browser_toolkit.ws_wrapper.WebSocketBrowserWrapper',
            return_value=mock_ws_wrapper,
        ):
            # Example 1: Add cleaning to existing toolkit instance
            toolkit1 = HybridBrowserToolkit(mode="typescript", headless=True)
            toolkit1 = add_snapshot_cleaning_to_toolkit(
                toolkit1, enable_cleaning=True
            )

            assert len(toolkit1.output_manager.processors) == 1

            # Example 2: Create toolkit and add cleaning in one step
            toolkit2 = add_snapshot_cleaning_to_toolkit(
                HybridBrowserToolkit(mode="typescript", headless=True),
                enable_cleaning=True,
            )

            assert len(toolkit2.output_manager.processors) == 1

            # Example 3: Manual processor registration
            toolkit3 = HybridBrowserToolkit(mode="typescript", headless=True)
            toolkit3.register_output_processor(
                SnapshotCleaningProcessor(enable_cleaning=True)
            )

            assert len(toolkit3.output_manager.processors) == 1