
Commit f15dace

chore: fix_stream_parallel_tool_call (#3662)
Co-authored-by: Wendong-Fan <[email protected]>
1 parent b041b9e commit f15dace

File tree

5 files changed: 169 additions & 63 deletions

camel/agents/chat_agent.py

Lines changed: 112 additions & 14 deletions
@@ -448,10 +448,12 @@ class ChatAgent(BaseAgent):
         step_timeout (Optional[float], optional): Timeout in seconds for the
             entire step operation. If None, no timeout is applied.
             (default: :obj:`None`)
-        stream_accumulate (bool, optional): When True, partial streaming
-            updates return accumulated content (current behavior). When False,
-            partial updates return only the incremental delta. (default:
-            :obj:`True`)
+        stream_accumulate (Optional[bool], optional): When True, partial
+            streaming updates return accumulated content. When False, partial
+            updates return only the incremental delta (recommended).
+            If None, defaults to False with a deprecation warning for users
+            who previously relied on the old default (True).
+            (default: :obj:`None`, which behaves as :obj:`False`)
         summary_window_ratio (float, optional): Maximum fraction of the total
             context window that can be occupied by summary information. Used
             to limit how much of the model's context is reserved for
@@ -501,7 +503,7 @@ def __init__(
         retry_attempts: int = 3,
         retry_delay: float = 1.0,
         step_timeout: Optional[float] = Constants.TIMEOUT_THRESHOLD,
-        stream_accumulate: bool = True,
+        stream_accumulate: Optional[bool] = None,
         summary_window_ratio: float = 0.6,
     ) -> None:
         if isinstance(model, ModelManager):
@@ -615,7 +617,13 @@ def __init__(
         self.step_timeout = step_timeout
         self._context_utility: Optional[ContextUtility] = None
         self._context_summary_agent: Optional["ChatAgent"] = None
-        self.stream_accumulate = stream_accumulate
+
+        # Store whether user explicitly set stream_accumulate
+        # Warning will be issued only when streaming is actually used
+        self._stream_accumulate_explicit = stream_accumulate is not None
+        self.stream_accumulate = (
+            stream_accumulate if stream_accumulate is not None else False
+        )
         self._last_tool_call_record: Optional[ToolCallingRecord] = None
         self._last_tool_call_signature: Optional[str] = None
         self.summary_window_ratio = summary_window_ratio
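Taken together with the signature change above, the construction contract is: pass an explicit bool to pin the behavior; leave it as None and the agent falls back to False and warns on first streaming use. A minimal construction sketch (model setup borrowed from the example file later in this commit; illustrative, not part of the diff):

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.types import ModelPlatformType, ModelType

streaming_model = ModelFactory.create(
    model_platform=ModelPlatformType.DEFAULT,
    model_type=ModelType.DEFAULT,
    model_config_dict={"stream": True},
)

# Explicit False: chunks carry only the new text (recommended going forward).
delta_agent = ChatAgent(
    system_message="You are a helpful assistant.",
    model=streaming_model,
    stream_accumulate=False,
)

# Explicit True: chunks carry the accumulated text (the old default).
legacy_agent = ChatAgent(
    system_message="You are a helpful assistant.",
    model=streaming_model,
    stream_accumulate=True,
)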
@@ -4020,6 +4028,28 @@ def _get_token_count(self, content: str) -> int:
         # Conservative estimate: ~3 chars per token
         return len(content) // 3

+    def _warn_stream_accumulate_deprecation(self) -> None:
+        r"""Issue deprecation warning for stream_accumulate default change.
+
+        Only warns once per agent instance, and only if the user didn't
+        explicitly set stream_accumulate.
+        """
+        if not self._stream_accumulate_explicit:
+            import warnings
+
+            warnings.warn(
+                "The default value of 'stream_accumulate' has changed from "
+                "True to False. In streaming mode, each chunk now returns "
+                "only the incremental delta instead of accumulated content. "
+                "To suppress this warning, explicitly set "
+                "stream_accumulate=False (recommended) or stream_accumulate="
+                "True if you need the old behavior.",
+                DeprecationWarning,
+                stacklevel=5,
+            )
+            # Only warn once per agent instance
+            self._stream_accumulate_explicit = True
+
     def _stream_response(
         self,
         openai_messages: List[OpenAIMessage],
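The warn-once mechanics are easy to see in isolation. A standalone sketch of the same pattern (not the library class), showing that the warning fires on first streaming use only when the flag was left unset:

import warnings

class StreamingThing:
    # Mimics the flag-tracking in ChatAgent.__init__ above.
    def __init__(self, stream_accumulate=None):
        self._explicit = stream_accumulate is not None
        self.stream_accumulate = (
            stream_accumulate if stream_accumulate is not None else False
        )

    def stream(self):
        if not self._explicit:
            warnings.warn(
                "default changed from True to False",
                DeprecationWarning,
                stacklevel=2,
            )
            self._explicit = True  # warn only once per instance
        yield "chunk"

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    thing = StreamingThing()  # flag left unset
    list(thing.stream())      # first use: warns
    list(thing.stream())      # second use: silent
    print(len(caught))        # 1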
@@ -4028,6 +4058,8 @@ def _stream_response(
     ) -> Generator[ChatAgentResponse, None, None]:
         r"""Internal method to handle streaming responses with tool calls."""

+        self._warn_stream_accumulate_deprecation()
+
         tool_call_records: List[ToolCallingRecord] = []
         accumulated_tool_calls: Dict[str, Any] = {}
         step_token_usage = self._create_token_usage_tracker()
@@ -4346,12 +4378,20 @@ def _process_stream_chunks_with_accumulator(
                 content_accumulator.get_full_reasoning_content()
                 or None
             )
+            # In delta mode, final response content should be empty
+            # since all content was already yielded incrementally
+            display_content = (
+                final_content if self.stream_accumulate else ""
+            )
+            display_reasoning = (
+                final_reasoning if self.stream_accumulate else None
+            )
             final_message = BaseMessage(
                 role_name=self.role_name,
                 role_type=self.role_type,
                 meta_dict={},
-                content=final_content,
-                reasoning_content=final_reasoning,
+                content=display_content,
+                reasoning_content=display_reasoning,
             )

             if response_format:
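With stream_accumulate=False the final message deliberately carries empty content, so a consumer that wants the full text must join the deltas itself. A minimal consumer-side sketch (plain strings stand in for chunk.msgs[0].content; not the real ChatAgentResponse shape):

def consume(chunks):
    parts = []
    for delta in chunks:
        if delta:  # the final chunk is "" in delta mode
            parts.append(delta)
            print(delta, end="", flush=True)
    return "".join(parts)

full = consume(["The answer ", "is ", "42.", ""])
assert full == "The answer is 42."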
@@ -4402,21 +4442,60 @@ def _accumulate_tool_calls(
             bool: True if any tool call is complete, False otherwise.
         """

+        index_map_key = '_index_to_key_map'
+        if index_map_key not in accumulated_tool_calls:
+            accumulated_tool_calls[index_map_key] = {}
+        index_map = accumulated_tool_calls[index_map_key]
+
         for delta_tool_call in tool_call_deltas:
-            index = delta_tool_call.index
+            index = getattr(delta_tool_call, 'index', None)
             tool_call_id = getattr(delta_tool_call, 'id', None)

+            # Determine entry key
+            if index is not None:
+                index_str = str(index)
+                if tool_call_id:
+                    # New ID provided: check if it differs from current mapping
+                    current_key = index_map.get(index_str)
+                    if current_key is None:
+                        # First time seeing this index, use tool_call_id as key
+                        entry_key = tool_call_id
+                    elif current_key in accumulated_tool_calls:
+                        existing_id = accumulated_tool_calls[current_key].get(
+                            'id'
+                        )
+                        if existing_id and existing_id != tool_call_id:
+                            # ID changed: use new ID as key
+                            entry_key = tool_call_id
+                        else:
+                            # No existing ID or same ID: keep current key
+                            entry_key = current_key
+                    else:
+                        entry_key = current_key
+                    # Update mapping
+                    index_map[index_str] = entry_key
+                else:
+                    # No ID in this chunk: use existing mapping or index as
+                    # string
+                    entry_key = index_map.get(index_str, index_str)
+                    if index_str not in index_map:
+                        index_map[index_str] = entry_key
+            elif tool_call_id is not None:
+                entry_key = tool_call_id
+            else:
+                entry_key = '0'  # Default fallback as string
+
             # Initialize tool call entry if not exists
-            if index not in accumulated_tool_calls:
-                accumulated_tool_calls[index] = {
+            if entry_key not in accumulated_tool_calls:
+                accumulated_tool_calls[entry_key] = {
                     'id': '',
                     'type': 'function',
                     'function': {'name': '', 'arguments': ''},
                     'extra_content': None,
                     'complete': False,
                 }

-            tool_call_entry = accumulated_tool_calls[index]
+            tool_call_entry = accumulated_tool_calls[entry_key]

             # Accumulate tool call data
             if tool_call_id:
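The core of the fix: streaming tool-call deltas carry an index on every chunk but an id only on the first, so an index-to-key map lets later id-less fragments find the right accumulator entry even when parallel calls interleave. A simplified standalone sketch of the key-selection idea (not the library code, and it omits some edge cases above):

from types import SimpleNamespace

def entry_key_for(delta, index_map, accumulated):
    index = getattr(delta, 'index', None)
    tool_call_id = getattr(delta, 'id', None)
    if index is not None:
        index_str = str(index)
        if tool_call_id:
            key = index_map.get(index_str) or tool_call_id
            existing = accumulated.get(key, {}).get('id')
            if existing and existing != tool_call_id:
                key = tool_call_id  # same index reused by a new call ID
            index_map[index_str] = key
            return key
        # No ID on this chunk: fall back to the mapping (or the raw index)
        return index_map.setdefault(index_str, index_str)
    return tool_call_id if tool_call_id is not None else '0'

index_map: dict = {}
accumulated: dict = {}
deltas = [
    SimpleNamespace(index=0, id='call_a'),  # first chunk of tool call A
    SimpleNamespace(index=1, id='call_b'),  # first chunk of tool call B
    SimpleNamespace(index=0, id=None),      # argument fragment for A
    SimpleNamespace(index=1, id=None),      # argument fragment for B
]
for d in deltas:
    key = entry_key_for(d, index_map, accumulated)
    accumulated.setdefault(key, {'id': d.id or ''})
    print(d.index, '->', key)
# 0 -> call_a, 1 -> call_b, 0 -> call_a, 1 -> call_b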
@@ -4448,6 +4527,9 @@ def _accumulate_tool_calls(
         # Check if any tool calls are complete
         any_complete = False
         for _index, tool_call_entry in accumulated_tool_calls.items():
+            # Skip internal mapping key
+            if _index == '_index_to_key_map':
+                continue
             if (
                 tool_call_entry['id']
                 and tool_call_entry['function']['name']
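Because the bookkeeping map lives in the same dict as the real entries, every loop over accumulated_tool_calls must skip it; the same guard recurs in the sync and async tool-execution loops below. In isolation:

# Sketch: the '_index_to_key_map' sentinel shares the dict with real
# entries, so iteration must filter it out.
accumulated_tool_calls = {
    '_index_to_key_map': {'0': 'call_a'},
    'call_a': {'id': 'call_a', 'complete': True},
}
real_entries = {
    k: v for k, v in accumulated_tool_calls.items()
    if k != '_index_to_key_map'
}
print(list(real_entries))  # ['call_a']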
@@ -4475,6 +4557,9 @@ def _execute_tools_sync_with_status_accumulator(

         tool_calls_to_execute = []
         for _tool_call_index, tool_call_data in accumulated_tool_calls.items():
+            # Skip internal mapping key
+            if _tool_call_index == '_index_to_key_map':
+                continue
             if tool_call_data.get('complete', False):
                 tool_calls_to_execute.append(tool_call_data)

@@ -4936,6 +5021,8 @@ async def _astream_response(
     ) -> AsyncGenerator[ChatAgentResponse, None]:
         r"""Async method to handle streaming responses with tool calls."""

+        self._warn_stream_accumulate_deprecation()
+
         tool_call_records: List[ToolCallingRecord] = []
         accumulated_tool_calls: Dict[str, Any] = {}
         step_token_usage = self._create_token_usage_tracker()
@@ -5310,12 +5397,20 @@ async def _aprocess_stream_chunks_with_accumulator(
                 content_accumulator.get_full_reasoning_content()
                 or None
             )
+            # In delta mode, final response content should be empty
+            # since all content was already yielded incrementally
+            display_content = (
+                final_content if self.stream_accumulate else ""
+            )
+            display_reasoning = (
+                final_reasoning if self.stream_accumulate else None
+            )
             final_message = BaseMessage(
                 role_name=self.role_name,
                 role_type=self.role_type,
                 meta_dict={},
-                content=final_content,
-                reasoning_content=final_reasoning,
+                content=display_content,
+                reasoning_content=display_reasoning,
             )

             if response_format:
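The async path mirrors the sync one: _astream_response yields delta chunks from an async generator. A standalone sketch of consuming such a generator (the fake generator below is a stand-in, not the real ChatAgentResponse stream):

import asyncio

async def fake_astream():
    # Stand-in for _astream_response: yields only the new text per chunk.
    for piece in ["Parallel ", "tool calls ", "stream cleanly."]:
        yield piece

async def main():
    parts = []
    async for delta in fake_astream():
        parts.append(delta)
        print(delta, end="", flush=True)
    print("\nfull text:", "".join(parts))

asyncio.run(main())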
@@ -5363,6 +5458,9 @@ async def _execute_tools_async_with_status_accumulator(
         # statuses immediately
         tool_tasks = []
         for _tool_call_index, tool_call_data in accumulated_tool_calls.items():
+            # Skip internal mapping key
+            if _tool_call_index == '_index_to_key_map':
+                continue
             if tool_call_data.get('complete', False):
                 function_name = tool_call_data['function']['name']
                 try:

examples/agents/chatagent_stream.py

Lines changed: 40 additions & 37 deletions
@@ -11,70 +11,73 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 # ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
+
 from camel.agents import ChatAgent
 from camel.models import ModelFactory
+from camel.toolkits import MathToolkit
 from camel.types import ModelPlatformType, ModelType

 # Create a streaming-capable model backend
 streaming_model = ModelFactory.create(
     model_platform=ModelPlatformType.DEFAULT,
-    model_type=ModelType.GPT_4O_MINI,
+    model_type=ModelType.DEFAULT,
     model_config_dict={
         "stream": True,
         "stream_options": {"include_usage": True},
     },
 )

-agent_accumulated = ChatAgent(
-    system_message="You are a helpful assistant that provides detailed "
-    "and informative responses.",
+# Initialize MathToolkit for parallel calculation demo
+math_toolkit = MathToolkit()
+
+# Create agent with math tools for parallel tool call demonstration
+# stream_accumulate=False means each chunk returns only delta (incremental)
+# content
+agent_with_tools = ChatAgent(
+    system_message="You are a helpful math assistant. When asked to perform "
+    "multiple calculations, use the math tools to compute each one. "
+    "Always use the tools for calculations.",
     model=streaming_model,
+    tools=math_toolkit.get_tools(),
+    stream_accumulate=False,  # Recommended: get delta content per chunk
 )

-# Example user message
-user_message = "How many Rs are there in the word 'strawberry'?"
+# User message that triggers parallel tool calls
+user_message = (
+    "Please calculate the following three operations simultaneously:\n"
+    "1. 123.45 + 678.90\n"
+    "2. 100 * 3.14159\n"
+    "3. 1000 / 7"
+)

-# Accumulated streaming mode (default)
-streaming_response = agent_accumulated.step(user_message)
+# Stream the response with tool calls
+streaming_response = agent_with_tools.step(user_message)

 for chunk_response in streaming_response:
     message = chunk_response.msgs[0]
-    reasoning_text = message.reasoning_content
-    if reasoning_text:
-        print(reasoning_text, end="", flush=True)

-    content_text = message.content
-    if content_text:
-        print(content_text, end="", flush=True)
-usage = streaming_response.info.get("usage", {})
-print(
-    f"\n\nUsage: prompt={usage.get('prompt_tokens')}, "
-    f"completion={usage.get('completion_tokens')}, "
-    f"total={usage.get('total_tokens')}"
-)
-print("\n\n---\nDelta streaming mode (stream_accumulate=False):\n")
-
-# Delta streaming mode (only new content per chunk)
-agent_delta = ChatAgent(
-    system_message="You are a helpful assistant that provides concise "
-    "and informative responses.",
-    model=streaming_model,
-    stream_accumulate=False,
-)
-
-streaming_response_delta = agent_delta.step(user_message)
-
-for chunk_response in streaming_response_delta:
-    message = chunk_response.msgs[0]
-    reasoning_delta = message.reasoning_content or ""
-    if reasoning_delta:
-        print(reasoning_delta, end="", flush=True)
+    # Print reasoning content if available (for models that support it)
+    if message.reasoning_content:
+        print(message.reasoning_content, end="", flush=True)

+    # Print main content (delta mode - each chunk contains only new content)
     if message.content:
         print(message.content, end="", flush=True)
+
+# Print usage statistics
 usage = streaming_response.info.get("usage", {})
 print(
     f"\n\nUsage: prompt={usage.get('prompt_tokens')}, "
     f"completion={usage.get('completion_tokens')}, "
     f"total={usage.get('total_tokens')}"
 )
+
+# Print tool call records if any
+tool_calls = streaming_response.info.get("tool_calls", [])
+if tool_calls:
+    print(f"\nTool calls made: {len(tool_calls)}")
+    for i, tool_call in enumerate(tool_calls, 1):
+        print(
+            f"  {i}. {tool_call.tool_name}({tool_call.args}) = "
+            f"{tool_call.result}"
+        )
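For contrast with the delta mode the example now demonstrates, the legacy accumulated mode (stream_accumulate=True) delivers the full text so far in every chunk, so a consumer keeps only the latest one. A short sketch reusing streaming_model from the example above (illustrative, not part of the committed file):

agent_accumulated = ChatAgent(
    system_message="You are a helpful assistant.",
    model=streaming_model,
    stream_accumulate=True,  # old default: chunks carry accumulated content
)

last_content = ""
for chunk in agent_accumulated.step("Explain streaming in one sentence."):
    last_content = chunk.msgs[0].content  # full content so far
print(last_content)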

pyproject.toml

Lines changed: 2 additions & 2 deletions
@@ -188,7 +188,7 @@ dev_tools = [
     "langfuse>=2.60.5",
 ]
 model_platforms = [
-    "litellm>=1.38.1,<2",
+    "litellm>=1.38.1,<1.80.12",
     "mistralai>=1.1.0,<2",
     "reka-api>=3.0.8,<4",
     "anthropic>=0.47.0,<0.50.0",

@@ -383,7 +383,7 @@ all = [
     "markitdown>=0.1.1; python_version >= '3.13'",
     "nebula3-python==3.8.2",
     "rank-bm25>=0.2.2,<0.3",
-    "litellm>=1.38.1,<2",
+    "litellm>=1.38.1,<1.80.12",
     "mistralai>=1.1.0,<2",
     "fish-audio-sdk>=1.0.0",
     "anthropic>=0.47.0,<0.50.0",
