Skip to content

Commit b3e50a0

Browse files
authored
feat: add streaming callback (#120)
- fix issue with usage tracking
- fix timeout on tool calls
- remove logs from benchmark runs
1 parent 0b6b535 commit b3e50a0

8 files changed

Lines changed: 341 additions & 42 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ benchmark = [
3333
agents = [
3434
"haystack-ai",
3535
"mcp-haystack",
36-
"anthropic-haystack",
36+
"anthropic-haystack>=2.7.0",
3737
"langfuse-haystack"
3838
]
3939

src/deepset_mcp/agents/generalist/generalist_agent.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ def get_agent(benchmark_config: BenchmarkConfig) -> Agent:
1818
"DEEPSET_WORKSPACE": benchmark_config.deepset_workspace,
1919
"DEEPSET_API_KEY": benchmark_config.deepset_api_key,
2020
},
21-
)
21+
),
22+
invocation_timeout=300.0,
2223
)
2324
prompt = (Path(__file__).parent / "system_prompt.md").read_text()
2425
generator = AnthropicChatGenerator(

src/deepset_mcp/api/pipeline/resource.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,6 @@ async def get_logs(
223223
params=params,
224224
)
225225

226-
logger.warning(resp.json)
227-
228226
raise_for_status(resp)
229227

230228
if resp.json is not None:

src/deepset_mcp/benchmark/runner/agent_benchmark_runner.py

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import asyncio
22
import json
33
import logging
4-
import sys
54
from collections.abc import Callable
65
from datetime import datetime
76
from pathlib import Path
@@ -18,6 +17,7 @@
1817
load_test_case_by_name,
1918
)
2019
from deepset_mcp.benchmark.runner.models import AgentConfig, TestCaseConfig
20+
from deepset_mcp.benchmark.runner.streaming import StreamingCallbackManager
2121
from deepset_mcp.benchmark.runner.teardown_actions import teardown_test_case_async
2222
from deepset_mcp.benchmark.runner.tracing import enable_tracing
2323

@@ -31,7 +31,7 @@ def __init__(
3131
self,
3232
agent_config: AgentConfig,
3333
benchmark_config: BenchmarkConfig,
34-
streaming: bool = False,
34+
streaming: bool = True,
3535
):
3636
"""
3737
Initialize the benchmark runner.
@@ -66,7 +66,6 @@ def __init__(
6666
f"{self.agent_config.display_name}-{self.commit_hash}_{self.run_timestamp.strftime('%Y%m%d_%H%M%S')}"
6767
)
6868

69-
# TODO: streaming is WIP; wait until https://github.com/deepset-ai/haystack-core-integrations/issues/1947 is fixed
7069
def _create_streaming_callback(self, test_case_name: str) -> Callable[[StreamingChunk], Any]:
7170
"""
7271
Create a streaming callback function for a specific test case.
@@ -77,33 +76,10 @@ def _create_streaming_callback(self, test_case_name: str) -> Callable[[Streaming
7776
Returns:
7877
Callback function for streaming
7978
"""
79+
callback = StreamingCallbackManager()
8080

81-
async def streaming_callback(chunk: StreamingChunk) -> None:
82-
"""Handle streaming chunks from the agent."""
83-
if hasattr(chunk, "content") and chunk.content:
84-
# meta content_block type=tool_use
85-
# meta type (content_block_start)
86-
# meta delta type=input_json_delta
87-
# meta delta message_delta
88-
# meta delta stop_reason=tool_use
89-
# Print with test case context, using a subtle prefix
90-
content = chunk.content
91-
# Handle newlines by adding the prefix to each new line
92-
lines = content.split("\n")
93-
for i, line in enumerate(lines):
94-
if i == 0:
95-
print(f"{line}", end="")
96-
elif line.strip(): # Only print non-empty lines with prefix
97-
print(f"\n[{test_case_name}] {line}", end="")
98-
else:
99-
print() # Just print the newline for empty lines
100-
101-
# If the content ends with a newline, print it
102-
if content.endswith("\n"):
103-
print()
104-
105-
# Ensure output is flushed immediately
106-
sys.stdout.flush()
81+
async def streaming_callback(chunk: StreamingChunk) -> Any:
82+
return await callback(chunk)
10783

10884
return streaming_callback
10985

@@ -477,8 +453,10 @@ def _extract_assistant_message_stats(messages: list[ChatMessage]) -> dict[str, s
477453
meta = message.meta
478454
if "usage" in meta:
479455
usage = meta["usage"]
480-
total_prompt_tokens += usage.get("prompt_tokens", 0)
481-
total_completion_tokens += usage.get("completion_tokens", 0)
456+
prompt_tokens = usage.get("prompt_tokens")
457+
total_prompt_tokens += prompt_tokens if prompt_tokens is not None else 0
458+
completion_tokens = usage.get("completion_tokens")
459+
total_completion_tokens += completion_tokens if completion_tokens is not None else 0
482460

483461
# Extract model (should be consistent across messages)
484462
if "model" in meta and model is None:

src/deepset_mcp/benchmark/runner/cli_agent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def run_agent_single(
5353

5454
try:
5555
results, _ = run_agent_benchmark(
56-
agent_config=agent_cfg, test_case_name=test_case, benchmark_config=benchmark_cfg, streaming=False
56+
agent_config=agent_cfg, test_case_name=test_case, benchmark_config=benchmark_cfg, streaming=True
5757
)
5858

5959
result = results[0]
@@ -117,7 +117,7 @@ def run_agent_all(
117117
test_case_name=None, # Run all
118118
benchmark_config=benchmark_cfg,
119119
concurrency=concurrency,
120-
streaming=False,
120+
streaming=True,
121121
)
122122

123123
# Display summary statistics

0 commit comments

Comments (0)