-
-
Notifications
You must be signed in to change notification settings - Fork 554
[FEAT][Implement MCP into agent.py] #819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
13f81a6
631ad71
9355f06
9859fcf
d7c8616
6e2d79d
d710308
2e2ebc4
10e056a
6954a9a
1ac9c57
4e457e3
c4c81a4
04ed17d
e051e2d
f495299
64c7952
a418c6f
d51b450
ef2abb0
7c00cf1
2787e96
61f9256
cb85838
3fe1ec1
d30ca92
db5ccd4
f854c8a
0761165
a64dab8
db37461
9faf83b
a877cc8
c75fa97
885a50c
3da7bd6
3273747
f07ca75
c8954eb
60d5145
cb6eca6
86cb09a
757ad59
9e62c12
8d079f6
a8ad884
e8cd786
3899a22
9436375
6bb6136
40cd524
ae4b0b5
80146d8
f69682f
26ef5ed
0f787de
50a11f3
b172ce0
63bbc34
b9ddc14
610064b
4b12eca
7febc47
b8dda6f
0a3d619
c16448f
fd723e7
7ec5796
f7055a6
be9d06e
24e6d1d
28186aa
868f096
9666c17
e7e72b2
da6ba0d
07f4c65
702d9f2
cb6aae8
4284eab
049869e
7c550d9
f4a134e
66ccd9b
f52bcbc
8b01167
a228faf
c4eb44e
eef1a53
eedcab4
2cdf71e
cc56f43
824dea0
41ccffc
fe7faf9
77324e6
977623a
b35ace1
0654a29
802a489
725f304
ba76575
552f5cc
242cfcd
1a075ca
8919dc6
9cd53bb
ddd1b60
c136911
d7e8849
9207a1b
9a682a2
cb136d7
a55589f
b4fc45e
e34b446
96c2669
fd50b02
58e2f5c
effa590
0a028ce
b8c712c
901f868
fc91b11
0ca3f03
3e159f5
e730e57
f8d422f
ff19580
9eeb625
47e509e
f5c2de1
248a943
c1af0f2
2802a0a
2f1c779
b398753
eb9d337
dba38f2
10936b5
0229e65
24482a9
8e6bf76
81affd6
55965be
9e23325
c490e82
d46da9c
925709d
d75bbed
1a84b24
16a2321
c7206ba
4b6f9a5
d9f1144
ea66e78
a612352
f61ada7
616c575
1f54917
8fd41d2
812115a
b567b30
51bb308
e8912f3
ee9230f
1a96821
3aad5ac
3383b59
c3d3ee9
ffaeef8
4464c90
6d16ea5
1c98fbb
9e5f755
9806d5e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -404,4 +404,3 @@ flycheck_*.el | |
|
||
# network security | ||
/network-security.data | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,284 @@ | ||
import asyncio | ||
import concurrent.futures | ||
import json | ||
import os | ||
import psutil | ||
Check failureCode scanning / Pyre Undefined import Error
Undefined import [21]: Could not find a module corresponding to import psutil.
|
||
import datetime | ||
from pathlib import Path | ||
from typing import List, Dict, Any, Optional | ||
from swarms.structs.agent import Agent | ||
from loguru import logger | ||
Check failureCode scanning / Pyre Undefined import Error
Undefined import [21]: Could not find a module corresponding to import loguru.
|
||
|
||
|
||
class AgentBenchmark:
    """Benchmark harness for a swarms ``Agent``.

    Runs a set of queries repeatedly, measuring wall-clock execution time
    and process-level system metrics (CPU percent, RSS memory) around each
    run, then aggregates per-query statistics and persists everything as a
    timestamped JSON file under ``output_dir``.
    """

    def __init__(
        self,
        num_iterations: int = 5,
        output_dir: str = "benchmark_results",
    ):
        """
        Args:
            num_iterations: How many times each query is executed.
            output_dir: Directory where JSON result files are written
                (created if it does not exist).
        """
        self.num_iterations = num_iterations
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

        # os.cpu_count() may return None on platforms where the count is
        # undetermined; fall back to 1 so min()/arithmetic never see None.
        cpu_count = os.cpu_count() or 1

        # Process pool for CPU-bound tasks.
        self.process_pool = concurrent.futures.ProcessPoolExecutor(
            max_workers=min(cpu_count, 4)
        )

        # Thread pool for I/O-bound tasks.
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(
            max_workers=min(cpu_count * 2, 8)
        )

        self.default_queries = [
            "Conduct an analysis of the best real undervalued ETFs",
            "What are the top performing tech stocks this quarter?",
            "Analyze current market trends in renewable energy sector",
            "Compare Bitcoin and Ethereum investment potential",
            "Evaluate the risk factors in emerging markets",
        ]

        self.agent = self._initialize_agent()
        self.process = psutil.Process()

        # Cache for repeated (query, iteration) results; keyed by
        # "<query>_<iteration>".
        self._query_cache: Dict[str, Dict[str, Any]] = {}

    def _initialize_agent(self) -> "Agent":
        """Build the Agent under test.

        The return annotation is a string to avoid evaluating ``Agent`` at
        class-definition time.
        """
        return Agent(
            agent_name="Financial-Analysis-Agent",
            agent_description="Personal finance advisor agent",
            # system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
            max_loops=1,
            model_name="gpt-4o-mini",
            dynamic_temperature_enabled=True,
            interactive=False,
        )

    def _get_system_metrics(self) -> Dict[str, float]:
        """Snapshot CPU percent and RSS memory (MB) of this process."""
        return {
            "cpu_percent": self.process.cpu_percent(),
            "memory_mb": self.process.memory_info().rss / 1024 / 1024,
        }

    def _calculate_statistics(
        self, values: List[float]
    ) -> Dict[str, float]:
        """Compute mean/median/min/max (and population std-dev for n > 1)
        of *values*, each rounded to 3 decimal places.

        Returns an empty dict for empty input.
        """
        if not values:
            return {}

        sorted_values = sorted(values)
        n = len(sorted_values)
        mean_val = sum(values) / n

        stats = {
            "mean": mean_val,
            # Upper-middle element for even n (matches original behavior).
            "median": sorted_values[n // 2],
            "min": sorted_values[0],
            "max": sorted_values[-1],
        }

        # Population standard deviation; undefined for a single sample.
        if n > 1:
            stats["std_dev"] = (
                sum((x - mean_val) ** 2 for x in values) / n
            ) ** 0.5

        return {k: round(v, 3) for k, v in stats.items()}

    async def process_iteration(
        self, query: str, iteration: int
    ) -> Dict[str, Any]:
        """Run one iteration of *query*, returning timing and metrics.

        Results are cached per (query, iteration) pair so an identical
        request is not re-executed.

        Raises:
            Exception: re-raised after logging if metric collection or
                caching itself fails (agent failures are recorded, not
                raised).
        """
        try:
            # Check cache for repeated queries.
            cache_key = f"{query}_{iteration}"
            if cache_key in self._query_cache:
                return self._query_cache[cache_key]

            iteration_start = datetime.datetime.now()
            pre_metrics = self._get_system_metrics()

            # Run the agent; a failed run is recorded as success=False
            # rather than aborting the whole benchmark.
            try:
                self.agent.run(query)
                success = True
            except Exception as e:
                logger.warning(
                    f"Agent run failed on iteration {iteration}: {e}"
                )
                success = False

            execution_time = (
                datetime.datetime.now() - iteration_start
            ).total_seconds()
            post_metrics = self._get_system_metrics()

            result = {
                "execution_time": execution_time,
                "success": success,
                "pre_metrics": pre_metrics,
                "post_metrics": post_metrics,
                "iteration_data": {
                    "iteration": iteration + 1,
                    "execution_time": round(execution_time, 3),
                    "success": success,
                    "system_metrics": {
                        "pre": pre_metrics,
                        "post": post_metrics,
                    },
                },
            }

            # Cache the result.
            self._query_cache[cache_key] = result
            return result

        except Exception as e:
            logger.error(f"Error in iteration {iteration}: {e}")
            raise

    async def run_benchmark(
        self, queries: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Run the benchmark asynchronously.

        Args:
            queries: Queries to benchmark; falls back to
                ``self.default_queries`` when None/empty.

        Returns:
            Dict with ``metadata`` (timestamp, config) and per-query
            ``results`` (raw iterations plus aggregated statistics).
        """
        queries = queries or self.default_queries
        benchmark_data: Dict[str, Any] = {
            "metadata": {
                "timestamp": datetime.datetime.now().isoformat(),
                "num_iterations": self.num_iterations,
                "agent_config": {
                    "model_name": self.agent.model_name,
                    "max_loops": self.agent.max_loops,
                },
            },
            "results": {},
        }

        async def process_query(query: str):
            # Collected per-query data; "statistics" is added after all
            # iterations complete.
            query_results: Dict[str, Any] = {
                "execution_times": [],
                "system_metrics": [],
                "iterations": [],
            }

            # Process iterations concurrently.
            tasks = [
                self.process_iteration(query, i)
                for i in range(self.num_iterations)
            ]
            iteration_results = await asyncio.gather(*tasks)

            for result in iteration_results:
                query_results["execution_times"].append(
                    result["execution_time"]
                )
                query_results["system_metrics"].append(
                    result["post_metrics"]
                )
                query_results["iterations"].append(
                    result["iteration_data"]
                )

            # Aggregate statistics across iterations.
            query_results["statistics"] = {
                "execution_time": self._calculate_statistics(
                    query_results["execution_times"]
                ),
                "memory_usage": self._calculate_statistics(
                    [
                        m["memory_mb"]
                        for m in query_results["system_metrics"]
                    ]
                ),
                "cpu_usage": self._calculate_statistics(
                    [
                        m["cpu_percent"]
                        for m in query_results["system_metrics"]
                    ]
                ),
            }

            return query, query_results

        # Execute all queries concurrently.
        query_tasks = [process_query(query) for query in queries]
        query_results = await asyncio.gather(*query_tasks)

        for query, results in query_results:
            benchmark_data["results"][query] = results

        return benchmark_data

    def save_results(self, benchmark_data: Dict[str, Any]) -> str:
        """Write *benchmark_data* as pretty-printed JSON.

        The file name is built only from a local timestamp plus the
        constructor-supplied output directory (no external input).

        Returns:
            The path of the written file as a string.
        """
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = (
            self.output_dir / f"benchmark_results_{timestamp}.json"
        )

        # Write results in a single operation.
        with open(filename, "w") as f:
            json.dump(benchmark_data, f, indent=2)

        logger.info(f"Benchmark results saved to: {filename}")
        return str(filename)

    def print_summary(self, results: Dict[str, Any]):
        """Print a human-readable summary of the benchmark results."""
        print("\n=== Benchmark Summary ===")
        for query, data in results["results"].items():
            print(f"\nQuery: {query[:50]}...")
            stats = data["statistics"]["execution_time"]
            print(f"Average time: {stats['mean']:.2f}s")
            print(
                f"Memory usage (avg): {data['statistics']['memory_usage']['mean']:.1f}MB"
            )
            print(
                f"CPU usage (avg): {data['statistics']['cpu_usage']['mean']:.1f}%"
            )

    async def run_with_timeout(
        self, timeout: int = 300
    ) -> Dict[str, Any]:
        """Run the full benchmark, aborting after *timeout* seconds.

        Raises:
            asyncio.TimeoutError: if the benchmark does not finish in time.
        """
        try:
            return await asyncio.wait_for(
                self.run_benchmark(), timeout
            )
        except asyncio.TimeoutError:
            logger.error(
                f"Benchmark timed out after {timeout} seconds"
            )
            raise

    def cleanup(self):
        """Shut down executor pools and drop cached results."""
        self.process_pool.shutdown()
        self.thread_pool.shutdown()
        self._query_cache.clear()
|
||
|
||
async def main():
    """Create a benchmark, run it with a timeout, save and print results."""
    # Pre-bind so the finally-block cleanup is safe even when
    # AgentBenchmark(...) itself raises (the original code hit a
    # NameError in that case, masking the real failure).
    benchmark = None
    try:
        # Create and run benchmark.
        benchmark = AgentBenchmark(num_iterations=1)

        # Run benchmark with timeout.
        results = await benchmark.run_with_timeout(timeout=300)

        # Save results.
        benchmark.save_results(results)

        # Print summary.
        benchmark.print_summary(results)

    except Exception as e:
        logger.error(f"Benchmark failed: {e}")
    finally:
        # Cleanup resources only if construction succeeded.
        if benchmark is not None:
            benchmark.cleanup()


if __name__ == "__main__":
    # Run the async main function.
    asyncio.run(main())
Check failure
Code scanning / Bearer
Unsanitized dynamic input in file path Error