Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 98 additions & 3 deletions camel/agents/chat_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,10 @@ class ChatAgent(BaseAgent):
directly return the request instead of processing it.
(default: :obj:`None`)
response_terminators (List[ResponseTerminator], optional): List of
:obj:`ResponseTerminator` bind to one chat agent.
:obj:`ResponseTerminator` used to check whether the task is complete.
When set, the agent keeps prompting the model until a terminator
signals completion. Note: you must define the termination signal
(e.g., a keyword) in your system prompt so the model knows what to output.
(default: :obj:`None`)
scheduling_strategy (str): name of function that defines how to select
the next model in ModelManager. (default: :str:`round_robin`)
Expand Down Expand Up @@ -2792,6 +2795,11 @@ def _step_impl(
response_format: Optional[Type[BaseModel]] = None,
) -> ChatAgentResponse:
r"""Implementation of non-streaming step logic."""
# Set agent_id in context-local storage for logging
from camel.utils.agent_context import set_current_agent_id

set_current_agent_id(self.agent_id)

# Set Langfuse session_id using agent_id for trace grouping
try:
from camel.utils.langfuse import set_current_agent_session_id
Expand Down Expand Up @@ -2923,6 +2931,43 @@ def _step_impl(
# If we're still here, continue the loop
continue

# No tool calls - check if we should terminate based on terminators
if self.response_terminators:
# Check terminators to see if task is complete
termination_results = [
terminator.is_terminated(response.output_messages)
for terminator in self.response_terminators
]
should_terminate = any(
terminated for terminated, _ in termination_results
)

if should_terminate:
# Task is complete, exit the loop
break

# Task not complete - prompt the model to continue
if (
self.max_iteration is not None
and iteration_count >= self.max_iteration
):
logger.warning(
f"Max iteration {self.max_iteration} reached without "
"termination signal"
)
break

# Add a continuation prompt to memory as a user message
continue_message = BaseMessage(
role_name="user",
role_type=RoleType.USER,
content="Please continue.",
meta_dict={},
)
self.update_memory(continue_message, OpenAIBackendRole.USER)
continue

# No terminators configured, use original behavior
break

self._format_response_if_needed(response, response_format)
Expand Down Expand Up @@ -2986,6 +3031,10 @@ async def astep(
asyncio.TimeoutError: If the step operation exceeds the configured
timeout.
"""
# Set agent_id in context-local storage for logging
from camel.utils.agent_context import set_current_agent_id

set_current_agent_id(self.agent_id)

try:
from camel.utils.langfuse import set_current_agent_session_id
Expand Down Expand Up @@ -3023,6 +3072,10 @@ async def _astep_non_streaming_task(
response_format: Optional[Type[BaseModel]] = None,
) -> ChatAgentResponse:
r"""Internal async method for non-streaming astep logic."""
# Set agent_id in context-local storage for logging
from camel.utils.agent_context import set_current_agent_id

set_current_agent_id(self.agent_id)

try:
from camel.utils.langfuse import set_current_agent_session_id
Expand Down Expand Up @@ -3153,6 +3206,43 @@ async def _astep_non_streaming_task(
# If we're still here, continue the loop
continue

# No tool calls - check if we should terminate based on terminators
if self.response_terminators:
# Check terminators to see if task is complete
termination_results = [
terminator.is_terminated(response.output_messages)
for terminator in self.response_terminators
]
should_terminate = any(
terminated for terminated, _ in termination_results
)

if should_terminate:
# Task is complete, exit the loop
break

# Task not complete - prompt the model to continue
if (
self.max_iteration is not None
and iteration_count >= self.max_iteration
):
logger.warning(
f"Max iteration {self.max_iteration} reached without "
"termination signal"
)
break

# Add a continuation prompt to memory as a user message
continue_message = BaseMessage(
role_name="user",
role_type=RoleType.USER,
content="Please continue.",
meta_dict={},
)
self.update_memory(continue_message, OpenAIBackendRole.USER)
continue

# No terminators configured, use original behavior
break

await self._aformat_response_if_needed(response, response_format)
Expand Down Expand Up @@ -3240,10 +3330,15 @@ def _record_final_output(self, output_messages: List[BaseMessage]) -> None:
r"""Log final messages or warnings about multiple responses."""
if len(output_messages) == 1:
self.record_message(output_messages[0])
elif len(output_messages) == 0:
logger.warning(
"No messages returned in `step()`. The model returned an "
"empty response."
)
else:
logger.warning(
"Multiple messages returned in `step()`. Record "
"selected message manually using `record_message()`."
f"{len(output_messages)} messages returned in `step()`. "
"Record selected message manually using `record_message()`."
)

@observe()
Expand Down
19 changes: 17 additions & 2 deletions camel/models/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,14 +415,29 @@ def _log_request(self, messages: List[OpenAIMessage]) -> Optional[str]:
import json
from datetime import datetime

os.makedirs(self._log_dir, exist_ok=True)
from camel.utils.agent_context import get_current_agent_id

agent_id = get_current_agent_id()

# Remove _context_summarizer suffix to keep all logs in one directory
log_agent_id = agent_id
if agent_id and agent_id.endswith("_context_summarizer"):
log_agent_id = agent_id[: -len("_context_summarizer")]

log_subdir = (
os.path.join(self._log_dir, log_agent_id)
if log_agent_id
else self._log_dir
)
os.makedirs(log_subdir, exist_ok=True)

timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
log_file_path = os.path.join(self._log_dir, f"conv_{timestamp}.json")
log_file_path = os.path.join(log_subdir, f"conv_{timestamp}.json")

log_entry = {
"request_timestamp": datetime.now().isoformat(),
"model": str(self.model_type),
"agent_id": agent_id,
"request": {"messages": messages},
}

Expand Down
4 changes: 2 additions & 2 deletions camel/types/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -1329,8 +1329,6 @@ def token_limit(self) -> int:
ModelType.CEREBRAS_GPT_OSS_120B,
ModelType.CEREBRAS_LLAMA_3_3_70B,
ModelType.CEREBRAS_QWEN_3_32B,
ModelType.DEEPSEEK_CHAT,
ModelType.DEEPSEEK_REASONER,
ModelType.PPIO_DEEPSEEK_R1_TURBO,
ModelType.PPIO_DEEPSEEK_V3_TURBO,
ModelType.PPIO_DEEPSEEK_R1_COMMUNITY,
Expand Down Expand Up @@ -1373,6 +1371,8 @@ def token_limit(self) -> int:
}:
return 100_000
elif self in {
ModelType.DEEPSEEK_CHAT,
ModelType.DEEPSEEK_REASONER,
ModelType.GPT_4O,
ModelType.GPT_4O_MINI,
ModelType.GPT_4_TURBO,
Expand Down
3 changes: 3 additions & 0 deletions camel/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========

from .agent_context import get_current_agent_id, set_current_agent_id
from .commons import (
AgentOpsMeta,
BatchProcessor,
Expand Down Expand Up @@ -113,4 +114,6 @@
"observe",
"update_current_observation",
"get_langfuse_status",
"get_current_agent_id",
"set_current_agent_id",
]
41 changes: 41 additions & 0 deletions camel/utils/agent_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
"""Context-local storage for the ID of the currently active agent."""
from contextvars import ContextVar
from typing import Optional

# Holds the active agent's ID for the current execution context. Using a
# ContextVar (rather than threading.local) means each asyncio task gets
# its own independent value, while plain threads behave as expected too.
_current_agent_id: ContextVar[Optional[str]] = ContextVar('agent_id', default=None)


def set_current_agent_id(agent_id: str) -> None:
    r"""Record *agent_id* as the active agent for the current context.

    Safe to call from both synchronous and asynchronous code; within
    async code each task/coroutine maintains its own independent value.

    Args:
        agent_id (str): The agent ID to store.
    """
    _current_agent_id.set(agent_id)


def get_current_agent_id() -> Optional[str]:
    r"""Return the agent ID previously stored for the current context.

    Safe to call from both synchronous and asynchronous code; within
    async code each task/coroutine sees its own independent value.

    Returns:
        Optional[str]: The stored agent ID, or ``None`` if none was set.
    """
    return _current_agent_id.get()
24 changes: 16 additions & 8 deletions camel/utils/langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
import os
import threading
from contextvars import ContextVar
from typing import Any, Dict, List, Optional

from camel.logger import get_logger
from camel.utils import dependencies_required

logger = get_logger(__name__)
# Thread-local storage for agent session IDs
_local = threading.local()

_agent_session_id_var: ContextVar[Optional[str]] = ContextVar(
'agent_session_id', default=None
)

# Global flag to track if Langfuse has been configured
_langfuse_configured = False
Expand Down Expand Up @@ -85,6 +87,7 @@ def configure_langfuse(
if not enabled:
_langfuse_configured = False
logger.info("Langfuse tracing disabled for CAMEL models")
return

logger.debug(
f"Configuring Langfuse - enabled: {enabled}, "
Expand Down Expand Up @@ -118,23 +121,28 @@ def is_langfuse_available() -> bool:


def set_current_agent_session_id(session_id: str) -> None:
r"""Set the session ID for the current agent in thread-local storage.
r"""Set the session ID for the current agent in context-local storage.

This is safe to use in both sync and async contexts.
In async contexts, each coroutine maintains its own value.

Args:
session_id(str): The session ID to set for the current agent.
"""

_local.agent_session_id = session_id
_agent_session_id_var.set(session_id)


def get_current_agent_session_id() -> Optional[str]:
r"""Get the session ID for the current agent from thread-local storage.
r"""Get the session ID for the current agent from context-local storage.

This is safe to use in both sync and async contexts.
In async contexts, returns the value for the current coroutine.

Returns:
Optional[str]: The session ID for the current agent.
"""
if is_langfuse_available():
return getattr(_local, 'agent_session_id', None)
return _agent_session_id_var.get()
return None


Expand Down
Loading
Loading