-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent.py
More file actions
100 lines (78 loc) · 3.82 KB
/
agent.py
File metadata and controls
100 lines (78 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import json
import os
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import google_search
from google.genai import types
import asyncio
import requests
from google.adk.agents.llm_agent import Agent
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams, StdioServerParameters
from google.adk.tools.mcp_tool import McpToolset, StreamableHTTPConnectionParams
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
APP_NAME = "ECOMMERCE_AGENT_API"
from bedrock_agentcore.runtime import BedrockAgentCoreApp
app = BedrockAgentCoreApp()
CLIENT_ID = <CLIENT_ID>
CLIENT_SECRET = <CLIENT_SECRET>
TOKEN_URL = <TOKEN_URL>
MCP_GATEWAY_URL = <MCP_GATEWAY_URL>
def fetch_access_token(client_id, client_secret, token_url):
"""Fetches the OAuth bearer token."""
response = requests.post(
token_url,
data="grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}".format(
client_id=client_id, client_secret=client_secret),
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
response.raise_for_status()
return response.json()['access_token']
MCP_BEARER_TOKEN = fetch_access_token(CLIENT_ID, CLIENT_SECRET, TOKEN_URL)
print(f"mcp bearer token is {MCP_BEARER_TOKEN}")
if not MCP_BEARER_TOKEN or MCP_BEARER_TOKEN.startswith("<PUT_"):
raise RuntimeError("Set MCP_BEARER_TOKEN to a valid token.")
root_agent = Agent(
model='gemini-2.5-flash',
name='root_agent',
description="The primary ecommerce assistant that handles product catalog, order management, and payment processing.",
instruction="You are an advanced Ecommerce AI assistant. Your role is to manage all customer requests for sales and order processing. "
"You have two distinct toolsets available through MCP: "
"1. Ecommerce Tools: Use these for managing products (getting, adding, listing) and orders (placing, retrieving). "
"2. Payment Tools (Stripe): Use these for generating payment links for new or existing products and also carryout other Stripe related activities. "
"Analyze the user's request and select the correct tool from the appropriate toolset. If placing an order, you might need to use the ecommerce tools first, then the payment tools to generate a link.",
tools=[
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=MCP_GATEWAY_URL,
headers={"Authorization": f"Bearer {MCP_BEARER_TOKEN}"}
),
)
],
)
# Session and Runner
async def setup_session_and_runner(user_id, session_id):
session_service = InMemorySessionService()
session = await session_service.create_session(app_name=APP_NAME, user_id=user_id, session_id=session_id)
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)
return session, runner
# Agent Interaction
async def call_agent_async(query, user_id, session_id):
content = types.Content(role='user', parts=[types.Part(text=query)])
session, runner = await setup_session_and_runner(user_id, session_id)
events = runner.run_async(user_id=user_id, session_id=session_id, new_message=content)
final_response = "" # Initialize final response
async for event in events:
if event.is_final_response():
final_response = event.content.parts[0].text
print("event content: ", event.content)
print("Agent Response: ", final_response)
return final_response
@app.entrypoint
async def agent_invocation(payload, context):
return await call_agent_async(
payload.get("prompt", "what is Bedrock Agentcore Runtime?"),
payload.get("user_id", "educloud academy"),
context.session_id
)
app.run()