Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/bug_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ body:
attributes:
label: What version of camel are you using?
description: Run command `python3 -c 'print(__import__("camel").__version__)'` in your shell and paste the output here.
placeholder: E.g., 0.2.83a9
placeholder: E.g., 0.2.83a7
validations:
required: true

Expand Down
2 changes: 1 addition & 1 deletion camel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from camel.logger import disable_logging, enable_logging, set_log_level

__version__ = '0.2.83a9'
__version__ = '0.2.83a7'

__all__ = [
'__version__',
Expand Down
196 changes: 171 additions & 25 deletions camel/agents/chat_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4203,6 +4203,7 @@ def _stream_response(
(
stream_completed,
tool_calls_complete,
external_tool_requests,
) = yield from self._process_stream_chunks_with_accumulator(
response, # type: ignore[arg-type]
content_accumulator,
Expand All @@ -4216,6 +4217,59 @@ def _stream_response(
# Clear completed tool calls
accumulated_tool_calls.clear()

# If we have external tool requests, create final response
# and break
if external_tool_requests:
final_content = content_accumulator.get_full_content()
final_reasoning = (
content_accumulator.get_full_reasoning_content()
or None
)

# Convert external tool requests
# to OpenAI tool calls format
tool_calls_meta = []
for req in external_tool_requests:
tool_calls_meta.append(
{
'id': req.tool_call_id,
'type': 'function',
'function': {
'name': req.tool_name,
'arguments': json.dumps(req.args),
},
}
)

# Create message for response
final_message = BaseMessage(
role_name=self.role_name,
role_type=self.role_type,
meta_dict={"tool_calls": tool_calls_meta},
content=final_content,
reasoning_content=final_reasoning,
)

# Create final response with external tool requests
final_response = ChatAgentResponse(
msgs=[final_message],
terminated=False,
info={
"id": "",
"usage": step_token_usage.copy(),
"finish_reasons": ["tool_calls"],
"num_tokens": num_tokens,
"tool_calls": [],
"external_tool_call_requests": (
external_tool_requests
),
"streaming": True,
"partial": False,
},
)
yield final_response
break

# If we executed tools and not in
# single iteration mode, continue
if tool_call_records and (
Expand Down Expand Up @@ -4357,8 +4411,19 @@ def _process_stream_chunks_with_accumulator(
tool_call_records: List[ToolCallingRecord],
step_token_usage: Dict[str, int],
response_format: Optional[Type[BaseModel]] = None,
) -> Generator[ChatAgentResponse, None, Tuple[bool, bool]]:
r"""Process streaming chunks with content accumulator."""
) -> Generator[
ChatAgentResponse,
None,
Tuple[bool, bool, Optional[List[ToolCallRequest]]],
]:
r"""Process streaming chunks with content accumulator.

Returns:
Generator that yields ChatAgentResponse and returns a tuple of:
- stream_completed: whether the stream has finished
- tool_calls_complete: whether tool calls are complete
- external_tool_requests: list of external tool requests if any
"""

tool_calls_complete = False
stream_completed = False
Expand Down Expand Up @@ -4418,16 +4483,56 @@ def _process_stream_chunks_with_accumulator(

# If we have complete tool calls, execute them with
# sync status updates
external_tool_requests: Optional[List[ToolCallRequest]] = (
None
)
if accumulated_tool_calls:
# Execute tools synchronously with
# optimized status updates
for (
status_response
) in self._execute_tools_sync_with_status_accumulator(
external_tool_requests = yield from self._execute_tools_sync_with_status_accumulator( # noqa: E501
accumulated_tool_calls,
tool_call_records,
):
yield status_response
)

# If we found external tools, record the assistant's
# tool call message and return
if external_tool_requests:
# Create assistant message with tool calls
tool_calls_list = []
for req in external_tool_requests:
tool_calls_list.append(
{
'id': req.tool_call_id,
'type': 'function',
'function': {
'name': req.tool_name,
'arguments': json.dumps(req.args),
},
}
)

# Record assistant message with tool calls
assist_msg = FunctionCallingMessage(
role_name=self.role_name,
role_type=self.role_type,
meta_dict=None,
content="",
func_name=external_tool_requests[0].tool_name,
args=external_tool_requests[0].args,
tool_call_id=external_tool_requests[
0
].tool_call_id,
)
self.update_memory(
assist_msg, OpenAIBackendRole.ASSISTANT
)

# Return with external tool requests flag
return (
stream_completed,
True,
external_tool_requests,
)

# Log sending status instead of adding to content
if tool_call_records:
Expand Down Expand Up @@ -4518,7 +4623,7 @@ def _process_stream_chunks_with_accumulator(
# consuming remaining chunks to capture final metadata.
continue

return stream_completed, tool_calls_complete
return stream_completed, tool_calls_complete, None

def _accumulate_tool_calls(
self,
Expand Down Expand Up @@ -4625,45 +4730,86 @@ def _accumulate_tool_calls(
# Skip internal mapping key
if _index == '_index_to_key_map':
continue
# Check if tool call has all required fields
if (
tool_call_entry['id']
and tool_call_entry['function']['name']
and tool_call_entry['function']['arguments']
and tool_call_entry['function']['name'] in self._internal_tools
):
try:
# Try to parse arguments to check completeness
json.loads(tool_call_entry['function']['arguments'])
tool_call_entry['complete'] = True
any_complete = True
except json.JSONDecodeError:
# Arguments not complete yet
tool_call_entry['complete'] = False
# Check if it's either internal or external tool
function_name = tool_call_entry['function']['name']
is_known_tool = (
function_name in self._internal_tools
or function_name in self._external_tool_schemas
)

if is_known_tool:
try:
# Try to parse arguments to check completeness
json.loads(tool_call_entry['function']['arguments'])
tool_call_entry['complete'] = True
any_complete = True
except json.JSONDecodeError:
# Arguments not complete yet
tool_call_entry['complete'] = False

return any_complete

def _execute_tools_sync_with_status_accumulator(
self,
accumulated_tool_calls: Dict[str, Any],
tool_call_records: List[ToolCallingRecord],
) -> Generator[ChatAgentResponse, None, None]:
) -> Generator[ChatAgentResponse, None, Optional[List[ToolCallRequest]]]:
r"""Execute multiple tools synchronously with proper content
accumulation, using ThreadPoolExecutor for better timeout handling."""
accumulation, using ThreadPoolExecutor for better timeout handling.

Returns:
Generator that yields ChatAgentResponse for status updates, and
finally returns Optional[List[ToolCallRequest]] for external tools.
"""

tool_calls_to_execute = []
external_tool_call_requests: List[ToolCallRequest] = []

for _tool_call_index, tool_call_data in accumulated_tool_calls.items():
# Skip internal mapping key
if _tool_call_index == '_index_to_key_map':
continue
if tool_call_data.get('complete', False):
tool_calls_to_execute.append(tool_call_data)
function_name = tool_call_data['function']['name']

# Check if this is an external tool
if function_name in self._external_tool_schemas:
try:
args = json.loads(
tool_call_data['function']['arguments']
)
except json.JSONDecodeError:
args = tool_call_data['function']['arguments']

# Create external tool request
external_tool_call_requests.append(
ToolCallRequest(
tool_name=function_name,
args=args,
tool_call_id=tool_call_data['id'],
)
)
else:
# This is an internal tool
tool_calls_to_execute.append(tool_call_data)

# If we found external tool calls, return them immediately
if external_tool_call_requests:
return external_tool_call_requests
yield # Make this a generator

if not tool_calls_to_execute:
# No tools to execute, return immediately
return
# No internal tools to execute, return immediately
return None
yield # Make this a generator

# Execute tools using ThreadPoolExecutor for proper timeout handling
# Execute internal tools using ThreadPoolExecutor
# Use max_workers=len() for parallel execution, with min of 1
with concurrent.futures.ThreadPoolExecutor(
max_workers=max(1, len(tool_calls_to_execute))
Expand Down Expand Up @@ -4715,8 +4861,8 @@ def _execute_tools_sync_with_status_accumulator(
f"Error executing tool '{function_name}': {e}"
)

# Ensure this function remains a generator (required by type signature)
return
# No external tools found, return None
return None
yield # This line is never reached but makes this a generator function

def _execute_tool_from_stream_data(
Expand Down
52 changes: 49 additions & 3 deletions camel/models/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __init__(self, log_path: Optional[str], log_enabled: bool):
self._finish_reason: Optional[str] = None
self._usage: Optional[Dict[str, Any]] = None
self._logged = False
self._tool_calls_acc: Dict[int, Dict[str, Any]] = {}

def _collect(self, chunk: ChatCompletionChunk) -> None:
self._id = self._id or getattr(chunk, 'id', '')
Expand All @@ -87,8 +88,32 @@ def _collect(self, chunk: ChatCompletionChunk) -> None:
)
if chunk.choices:
choice = chunk.choices[0]
if choice.delta and choice.delta.content:
self._content += choice.delta.content
if choice.delta:
if choice.delta.content:
self._content += choice.delta.content
if choice.delta.tool_calls:
for tc in choice.delta.tool_calls:
idx = tc.index
if idx not in self._tool_calls_acc:
self._tool_calls_acc[idx] = {
'id': '',
'type': 'function',
'function': {'name': '', 'arguments': ''},
}

acc = self._tool_calls_acc[idx]
if tc.id:
acc['id'] = tc.id
if tc.type:
acc['type'] = tc.type
if tc.function:
if tc.function.name:
acc['function']['name'] += tc.function.name
if tc.function.arguments:
acc['function']['arguments'] += (
tc.function.arguments
)

if choice.finish_reason:
self._finish_reason = choice.finish_reason

Expand All @@ -103,14 +128,35 @@ def _log(self) -> None:
with open(self._log_path, "r+") as f:
data = json.load(f)
data["response_timestamp"] = datetime.now().isoformat()
data["response"] = {

response_data: Dict[
str,
Union[
str,
bool,
Dict[str, Any],
List[Dict[str, Any]],
None,
],
] = {
"id": self._id,
"model": self._model,
"content": self._content,
"finish_reason": self._finish_reason,
"usage": self._usage,
"streaming": True,
}

if self._tool_calls_acc:
# Sort by index and convert to list
tool_calls = [
self._tool_calls_acc[i]
for i in sorted(self._tool_calls_acc.keys())
]
response_data["tool_calls"] = tool_calls

data["response"] = response_data

f.seek(0)
json.dump(data, f, indent=4)
f.truncate()
Expand Down
Loading
Loading