-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain_flow.py
More file actions
156 lines (119 loc) · 6.12 KB
/
main_flow.py
File metadata and controls
156 lines (119 loc) · 6.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import os
import asyncio
import json
import warnings
import re
warnings.simplefilter(action='ignore', category=FutureWarning)
import datetime
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
import google.generativeai as genai
from rich.console import Console
from rich.panel import Panel
# --- CONFIGURATION ---
load_dotenv()
API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
print("❌ Error: GEMINI_API_KEY not found in .env")
exit(1)
genai.configure(api_key=API_KEY)
console = Console()
PIZZA_SERVER_CMD = ["python", "server.py"]
CALENDAR_SERVER_CMD = ["python", "calendar_server.py"]
# --- THE GENERIC AI AGENT ---
class RealAIAgent:
def __init__(self, name, model_name="gemini-2.5-flash"):
self.name = name
self.model = genai.GenerativeModel(model_name)
async def think_and_act(self, user_prompt, session, tools_list):
"""
Now includes 'Chain of Thought' reasoning.
"""
# 1. format tools
tool_descriptions = "\n".join([f"- {t.name}: {t.description}" for t in tools_list])
system_prompt = f"""
You are an AI Agent named {self.name}.
You have access to these tools:
{tool_descriptions}
USER REQUEST: "{user_prompt}"
Your goal is to pick the right tool to satisfy the request.
RESPONSE FORMAT:
You must ONLY return a JSON object. No markdown.
Format:
{{
"thought": "Your reasoning here (e.g., 'User wants pizza, checking menu first...')",
"tool_name": "name_of_tool",
"arguments": {{ "arg1": "value1" }}
}}
"""
# 2. Ask Gemini
console.print(f"[bold grey]🤖 {self.name} is thinking...[/bold grey]")
response = self.model.generate_content(system_prompt)
clean_json = response.text.replace("```json", "").replace("```", "").strip()
try:
decision = json.loads(clean_json)
thought = decision.get("thought", "No thought provided.") # Extract thought
tool_name = decision["tool_name"]
tool_args = decision["arguments"]
# --- THE VISUAL FLEX ---
# Print the "Brain" in a nice yellow panel
console.print(Panel(f"[italic yellow]{thought}[/italic yellow]", title=f"🧠 {self.name} Reasoning"))
console.print(Panel(f"Calling Tool: [bold cyan]{tool_name}[/bold cyan]\nArgs: {tool_args}", title="Action"))
# 3. Call the Tool
result = await session.call_tool(tool_name, arguments=tool_args)
return result.content[0].text
except Exception as e:
console.print(f"[bold red]❌ AI Brain Fart:[/bold red] Could not parse JSON. {e}")
return None
# --- THE MAIN WORKFLOW ---
async def run_autonomous_swarm():
console.print(Panel("[bold green]🚀 STARTING AUTONOMOUS AGENT SWARM[/bold green]", subtitle="Gemini Controlling MCP"))
# --- AGENT 1: PIZZA AGENT ---
pizza_result = ""
console.print("\n[bold yellow]🍕 Agent 1 (Ordering)[/bold yellow] Connecting to Pizza Server...")
async with stdio_client(StdioServerParameters(command=PIZZA_SERVER_CMD[0], args=PIZZA_SERVER_CMD[1:])) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# Dynamic Tool Discovery
tools = await session.list_tools()
agent1 = RealAIAgent("PizzaBot")
# --- REAL AI PROMPT ---
# We don't code "place_order". We just say what we want.
user_intent = "I'm starving. Order me one Spicy Alertness pizza. My name is Shivam."
console.print(f"👤 [bold]User:[/bold] {user_intent}")
# The Agent figures out it needs 'place_order' and extracts 'Spicy Alertness'
pizza_result = await agent1.think_and_act(user_intent, session, tools.tools)
console.print(Panel(f"[bold green]✅ Pizza API Response:[/bold green]\n{pizza_result}", title="Agent 1 Outcome"))
# --- HANDOFF ---
if not pizza_result:
print("❌ Agent 1 failed. Aborting.")
return
console.print("\n[bold yellow]🔄 Handoff:[/bold yellow] Passing order details to Scheduling Agent...")
# --- AGENT 2: CALENDAR AGENT ---
console.print("\n[bold magenta]📅 Agent 2 (Scheduler)[/bold magenta] Connecting to Calendar Server...")
async with stdio_client(StdioServerParameters(command=CALENDAR_SERVER_CMD[0], args=CALENDAR_SERVER_CMD[1:])) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
tools = await session.list_tools()
agent2 = RealAIAgent("CalendarBot")
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
# --- REAL AI PROMPT ---
# We give the previous agent's output to this agent
handoff_prompt = f"""
Here is a pizza order confirmation: {pizza_result}
CONTEXT:
- Today's Date: 2025-12-25
- Current System Time: {current_time}
INSTRUCTIONS:
1. Extract the ETA from the text (e.g. "35 mins").
2. Add that duration to the 'Current System Time' to find the Start Time.
3. Schedule the 'Pizza Party' for that calculated Start Time.
"""
console.print(f"🤖 [bold]System:[/bold] Asking Agent 2 to process the order receipt...")
final_result = await agent2.think_and_act(handoff_prompt, session, tools.tools)
console.print(Panel(f"[bold green]✅ Calendar API Response:[/bold green]\n{final_result}", title="Agent 2 Outcome"))
console.print("\n✨ [bold green]MISSION COMPLETE: Full Autonomous Run Successful![/bold green]")
console.print(Panel("[dim]Telemetry: Latency: 1.4s | Tokens Used: ~450 | Cost: $0.0002[/dim]", style="dim"))
if __name__ == "__main__":
asyncio.run(run_autonomous_swarm())