Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
363 changes: 303 additions & 60 deletions camel/agents/chat_agent.py

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions camel/agents/mcp_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
cast,
)

from camel.agents.chat_agent import ChatAgent
from camel.agents.chat_agent import AsyncStreamingChatAgentResponse, ChatAgent
from camel.logger import get_logger
from camel.messages import BaseMessage
from camel.models.base_model import BaseModelBackend
Expand Down Expand Up @@ -345,11 +345,16 @@ async def astep(
await self.connect()

if self.function_calling_available:
return await super().astep(input_message, *args, **kwargs)
response = await super().astep(input_message, *args, **kwargs)
if isinstance(response, AsyncStreamingChatAgentResponse):
return await response
return response
else:
task = f"## Task:\n {input_message}"
input_message = str(self._text_tools) + task
response = await super().astep(input_message, *args, **kwargs)
if isinstance(response, AsyncStreamingChatAgentResponse):
response = await response
raw_content = response.msgs[0].content if response.msgs else ""
content = (
raw_content
Expand Down
80 changes: 37 additions & 43 deletions camel/models/anthropic_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,14 @@
BaseTokenCounter,
api_keys_required,
dependencies_required,
get_current_agent_session_id,
update_langfuse_trace,
update_current_observation,
)

ANTHROPIC_BETA_FOR_STRUCTURED_OUTPUTS = "structured-outputs-2025-11-13"

if os.environ.get("LANGFUSE_ENABLED", "False").lower() == "true":
try:
from langfuse.decorators import observe
except ImportError:
from camel.utils import observe
elif os.environ.get("TRACEROOT_ENABLED", "False").lower() == "true":
try:
from traceroot import trace as observe # type: ignore[import]
from langfuse import observe
except ImportError:
from camel.utils import observe
else:
Expand Down Expand Up @@ -213,7 +207,7 @@ def token_counter(self) -> BaseTokenCounter:
"""
if not self._token_counter:
self._token_counter = AnthropicTokenCounter(
model=str(self.model_type),
model=self._get_model_name(),
api_key=self._api_key,
base_url=self._url,
)
Expand Down Expand Up @@ -665,7 +659,25 @@ def _convert_openai_tools_to_anthropic(

return anthropic_tools

@observe()
def _start_generation_observation(
self,
messages: List[OpenAIMessage],
tools: Optional[List[Dict[str, Any]]],
) -> str:
r"""Initialize Langfuse generation observation and trace metadata."""
model_name = self._get_model_name()
update_current_observation(
input={
"messages": messages,
"tools": tools,
},
model=model_name,
model_parameters=self.model_config_dict,
)
self._log_and_trace()
return model_name

@observe(name="anthropic_model_run", as_type="generation")
def _run(
self,
messages: List[OpenAIMessage],
Expand Down Expand Up @@ -696,17 +708,7 @@ def _run(
stacklevel=2,
)

# Update Langfuse trace with current agent session and metadata
agent_session_id = get_current_agent_session_id()
if agent_session_id:
update_langfuse_trace(
session_id=agent_session_id,
metadata={
"agent_id": agent_session_id,
"model_type": str(self.model_type),
},
tags=["CAMEL-AI", str(self.model_type)],
)
model_name = self._start_generation_observation(messages, tools)

# Strip trailing whitespace from messages
processed_messages = strip_trailing_whitespace_from_messages(messages)
Expand All @@ -718,7 +720,7 @@ def _run(

# Prepare request parameters
request_params: Dict[str, Any] = {
"model": str(self.model_type),
"model": model_name,
"messages": anthropic_messages,
"max_tokens": self.model_config_dict.get("max_tokens", 4096),
}
Expand Down Expand Up @@ -782,15 +784,17 @@ def _run(
if is_streaming:
# Return streaming response
stream = create_func(**request_params, stream=True)
return self._wrap_anthropic_stream(stream, str(self.model_type))
return self._wrap_anthropic_stream(stream, model_name)
else:
# Return non-streaming response
response = create_func(**request_params)
return self._convert_anthropic_to_openai_response(
response, str(self.model_type)
openai_response = self._convert_anthropic_to_openai_response(
response, model_name
)
update_current_observation(usage_details=openai_response.usage)
return openai_response

@observe()
@observe(name="anthropic_model_run_async", as_type="generation")
async def _arun(
self,
messages: List[OpenAIMessage],
Expand Down Expand Up @@ -821,17 +825,7 @@ async def _arun(
stacklevel=2,
)

# Update Langfuse trace with current agent session and metadata
agent_session_id = get_current_agent_session_id()
if agent_session_id:
update_langfuse_trace(
session_id=agent_session_id,
metadata={
"agent_id": agent_session_id,
"model_type": str(self.model_type),
},
tags=["CAMEL-AI", str(self.model_type)],
)
model_name = self._start_generation_observation(messages, tools)

# Strip trailing whitespace from messages
processed_messages = strip_trailing_whitespace_from_messages(messages)
Expand All @@ -843,7 +837,7 @@ async def _arun(

# Prepare request parameters
request_params: Dict[str, Any] = {
"model": str(self.model_type),
"model": model_name,
"messages": anthropic_messages,
"max_tokens": self.model_config_dict.get("max_tokens", 4096),
}
Expand Down Expand Up @@ -907,15 +901,15 @@ async def _arun(
if is_streaming:
# Return streaming response
stream = await create_func(**request_params, stream=True)
return self._wrap_anthropic_async_stream(
stream, str(self.model_type)
)
return self._wrap_anthropic_async_stream(stream, model_name)
else:
# Return non-streaming response
response = await create_func(**request_params)
return self._convert_anthropic_to_openai_response(
response, str(self.model_type)
openai_response = self._convert_anthropic_to_openai_response(
response, model_name
)
update_current_observation(usage_details=openai_response.usage)
return openai_response

def _wrap_anthropic_stream(
self, stream: Any, model: str
Expand Down
23 changes: 9 additions & 14 deletions camel/models/azure_openai_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,7 @@

if os.environ.get("LANGFUSE_ENABLED", "False").lower() == "true":
try:
from langfuse.decorators import observe
except ImportError:
from camel.utils import observe
elif os.environ.get("TRACEROOT_ENABLED", "False").lower() == "true":
try:
from traceroot import trace as observe # type: ignore[import]
from langfuse import observe
except ImportError:
from camel.utils import observe
else:
Expand Down Expand Up @@ -244,7 +239,7 @@ def token_counter(self) -> BaseTokenCounter:
self._token_counter = OpenAITokenCounter(self.model_type)
return self._token_counter

@observe()
@observe(name="azure_openai_model_run")
def _run(
self,
messages: List[OpenAIMessage],
Expand Down Expand Up @@ -290,7 +285,7 @@ def _run(

return result

@observe()
@observe(name="azure_openai_model_run_async")
async def _arun(
self,
messages: List[OpenAIMessage],
Expand Down Expand Up @@ -348,7 +343,7 @@ def _request_chat_completion(

return self._client.chat.completions.create(
messages=messages,
model=str(self.model_type),
model=self._get_model_name(),
**request_config,
)

Expand All @@ -361,7 +356,7 @@ async def _arequest_chat_completion(

return await self._async_client.chat.completions.create(
messages=messages,
model=str(self.model_type),
model=self._get_model_name(),
**request_config,
)

Expand All @@ -379,7 +374,7 @@ def _request_parse(

return self._client.beta.chat.completions.parse(
messages=messages,
model=str(self.model_type),
model=self._get_model_name(),
**request_config,
)

Expand All @@ -397,7 +392,7 @@ async def _arequest_parse(

return await self._async_client.beta.chat.completions.parse(
messages=messages,
model=str(self.model_type),
model=self._get_model_name(),
**request_config,
)

Expand All @@ -418,7 +413,7 @@ def _request_stream_parse(
# Use the beta streaming API for structured outputs
return self._client.beta.chat.completions.stream(
messages=messages,
model=str(self.model_type),
model=self._get_model_name(),
response_format=response_format,
**request_config,
)
Expand All @@ -440,7 +435,7 @@ async def _arequest_stream_parse(
# Use the beta streaming API for structured outputs
return self._async_client.beta.chat.completions.stream(
messages=messages,
model=str(self.model_type),
model=self._get_model_name(),
response_format=response_format,
**request_config,
)
Expand Down
Loading
Loading