diff --git a/cloud-governance-mcp/.dockerignore b/cloud-governance-mcp/.dockerignore new file mode 100644 index 00000000..cdea2c0e --- /dev/null +++ b/cloud-governance-mcp/.dockerignore @@ -0,0 +1,7 @@ +.venv +.env +conversations/ +streamlit.log +__pycache__ +*.pyc +.claude/ diff --git a/cloud-governance-mcp/.env.example b/cloud-governance-mcp/.env.example new file mode 100644 index 00000000..bc3500a1 --- /dev/null +++ b/cloud-governance-mcp/.env.example @@ -0,0 +1,13 @@ +# AI Model Configuration +# Get your API key from: https://ai.google.dev/ +GEMINI_API_KEY=your-gemini-api-key-here +MODEL_NAME=gemini-2.5-flash + +# OpenSearch Connection (used by mcp_server.py subprocess) +OPENSEARCH_HOSTS=http://your-opensearch-host:9200 +# Only needed if authentication is required: +# OPENSEARCH_USERNAME=your-username +# OPENSEARCH_PASSWORD=your-password + +# Default index to query (can be changed to any index you want) +ES_INDEX=cloud-governance-policy-es-index diff --git a/cloud-governance-mcp/.gitignore b/cloud-governance-mcp/.gitignore new file mode 100644 index 00000000..fca55b38 --- /dev/null +++ b/cloud-governance-mcp/.gitignore @@ -0,0 +1,29 @@ +# Environment variables +.env + +# Python virtual environment +.venv/ +venv/ +env/ + +# Python cache +__pycache__/ +*.py[cod] +*$py.class + +# Logs +*.log +streamlit.log + +# Conversation history +conversations/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db diff --git a/cloud-governance-mcp/Dockerfile b/cloud-governance-mcp/Dockerfile new file mode 100644 index 00000000..e69e5275 --- /dev/null +++ b/cloud-governance-mcp/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.11-slim + +WORKDIR /app + +RUN useradd -m -u 10001 appuser && \ + mkdir -p /home/appuser/.streamlit && \ + printf '[browser]\ngatherUsageStats = false\n[client]\nshowErrorDetails = false\n' > /home/appuser/.streamlit/config.toml + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app.py mcp_server.py ./ +RUN chown -R appuser:appuser /app + +USER appuser + +EXPOSE 8501 + +CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0", "--server.headless=true"] diff --git a/cloud-governance-mcp/README.md b/cloud-governance-mcp/README.md new file mode 100644 index 00000000..2bca2af0 --- /dev/null +++ b/cloud-governance-mcp/README.md @@ -0,0 +1,242 @@ +# Cloud Governance AI Agent + +AI-powered natural language interface for querying cloud-governance OpenSearch data using MCP (Model Context Protocol). + +## Overview + +This application allows you to ask questions about cloud costs, usage, and policy compliance in plain English, without needing to know OpenSearch query syntax or Kibana dashboard building. + +**Architecture:** +- **Streamlit Web UI**: Chat interface (port 8501) +- **Custom MCP Server**: Python subprocess with high-level query tools (stdio transport, no container) +- **Google Gemini AI**: Natural language understanding and tool calling +- **OpenSearch/Elasticsearch**: Your existing cloud-governance data + +## Prerequisites + +### System Requirements +- **OS**: Linux (RHEL, Fedora, CentOS) or macOS +- **Python**: 3.10+ + +### Access Requirements +1. **OpenSearch/Elasticsearch Cluster** + - Host URL (e.g., `http://your-host:9200`) + - Username and password (if authentication enabled) + - Cloud-governance indices with data + +2. **Google Gemini API Key** + - Get from: https://ai.google.dev/ + - Free tier available for development + +## Quick Start + +### 1. Configure Environment + +```bash +cd cloud-governance-mcp +cp .env.example .env +vi .env +``` + +Fill in your credentials: + +```bash +GEMINI_API_KEY=your-actual-gemini-api-key +OPENSEARCH_HOSTS=http://your-opensearch-host:9200 +# OPENSEARCH_USERNAME=your-username (only if auth required) +# OPENSEARCH_PASSWORD=your-password (only if auth required) +``` + +### 2. Start the Application + +```bash +./run_agent.sh +``` + +The MCP server starts automatically as a subprocess -- no separate container or process needed. + +**Expected output:** +``` +Streamlit started successfully +Access the UI at: http://localhost:8501 +``` + +### 3. Open in Browser + +Navigate to: http://localhost:8501 + +## Available Tools + +The AI agent has 7 query tools available: + +| Tool | Description | +|------|-------------| +| `list_indices` | List all cloud-governance indices with document counts | +| `get_fields` | Discover field names and types for an index | +| `search_documents` | Filtered search with simple field/value pairs (auto-handles `.keyword`) | +| `count_by_field` | Group documents by a field and count (terms aggregation) | +| `aggregate` | Compute sum/avg/max/min on numeric fields, grouped by another field | +| `date_range_search` | Search within a date range with optional filters | +| `raw_search` | Escape hatch for complex raw OpenSearch Query DSL | + +## Usage Examples + +### Example Questions + +**Discovery:** +``` +What fields are available in this index? +Show me 5 sample documents +What indices are available? +``` + +**Filtered Searches:** +``` +Show me all zombie_cluster_resource resources from PERF-DEPT +Find all unattached volumes in us-east-1 +List resources with skip_policy tag +``` + +**Aggregations:** +``` +Count documents by policy type +Top 10 accounts by resource count +Total yearly savings by account +Average cost per region +``` + +**Date Ranges:** +``` +Show resources created in the last 7 days +Find all policies that ran between 2026-01-01 and 2026-03-31 +``` + +### How It Works + +1. **You ask a question** in natural language +2. **AI selects the right tool** (search, count, aggregate, etc.) +3. **MCP server builds the query** automatically (handles `.keyword` suffixes, Query DSL construction) +4. **AI formats the results** as markdown tables +5. **You get the answer** with data summary + +## Troubleshooting + +### Streamlit Won't Start + +```bash +# Kill existing process +kill $(lsof -ti tcp:8501) + +# Try again +./run_agent.sh +``` + +### AI Not Calling Tools + +- Check GEMINI_API_KEY is valid +- Verify tools loaded (check sidebar: "Connected (X tools)") +- Try more specific questions +- Check streamlit.log for errors + +### OpenSearch Connection Failed + +```bash +# Test connection manually +curl http://your-opensearch-host:9200/_cat/indices + +# Check non-secret OpenSearch settings +grep -E '^(OPENSEARCH_HOSTS|OPENSEARCH_USERNAME)=' .env +``` + +## Management Commands + +```bash +# View logs +tail -f streamlit.log + +# Stop Streamlit +kill $(lsof -ti tcp:8501) + +# Restart +./run_agent.sh + +# Update dependencies +source .venv/bin/activate +pip install -r requirements.txt --upgrade +``` + +## Available OpenSearch Indices + +The cloud-governance repository populates these indices: + +| Index | Description | +|-------|-------------| +| `cloud-governance-policy-es-index` | Policy execution results | +| `cloud-governance-global-cost-billing-index` | Cost billing data | +| `cloud-governance-cost-explorer-perf` | Performance account costs | +| `cloud-governance-cost-explorer-psap` | PSAP account costs | +| `cloud-governance-cost-explorer-global` | Global cost explorer | +| `cloud-governance-resource-orchestration` | CRO tracking | +| `cloud-governance-yearly-saving` | Yearly savings analysis | + +## Security Notes + +- `.env` file is gitignored (never commit credentials) +- Uses read-only OpenSearch credentials (recommended) +- Streamlit has no built-in authentication (deploy behind VPN) +- GEMINI_API_KEY gives full access to your AI usage quota + +## Advanced Configuration + +### Use Different Gemini Model + +Edit `.env`: +```bash +MODEL_NAME=gemini-1.5-pro # More capable, slower +MODEL_NAME=gemini-2.5-flash # Experimental, fastest +``` + +### Change Streamlit Port + +Edit `run_agent.sh`: +```bash +streamlit run app.py --server.port 9001 +``` + +### Connect to Different OpenSearch Cluster + +Update `.env`: +```bash +OPENSEARCH_HOSTS=http://my-other-cluster.example.com:9200 +OPENSEARCH_USERNAME=different-user +OPENSEARCH_PASSWORD=different-password +``` + +Then restart: `./run_agent.sh` + +## Project Structure + +``` +cloud-governance-mcp/ +├── .env # Configuration (DO NOT COMMIT) +├── .env.example # Template for .env +├── .gitignore # Ignore sensitive files +├── app.py # Streamlit chat application +├── mcp_server.py # Custom MCP server with high-level query tools +├── requirements.txt # Python dependencies +├── run_agent.sh # Start Streamlit (MCP server auto-starts) +├── README.md # This file +├── .venv/ # Virtual environment (auto-created) +└── streamlit.log # Application logs +``` + +## Support + +- **Cloud Governance**: https://github.com/redhat-performance/cloud-governance +- **MCP Documentation**: https://modelcontextprotocol.io/ +- **Streamlit Docs**: https://docs.streamlit.io/ +- **Gemini API**: https://ai.google.dev/ + +## License + +Same as cloud-governance repository (Apache License 2.0) diff --git a/cloud-governance-mcp/app.py b/cloud-governance-mcp/app.py new file mode 100644 index 00000000..05c0f561 --- /dev/null +++ b/cloud-governance-mcp/app.py @@ -0,0 +1,778 @@ +""" +Cloud Governance AI Agent - Streamlit Application +Provides natural language interface to query OpenSearch data via MCP server +""" + +import asyncio +import os +import sys +import threading +from contextlib import AsyncExitStack +from typing import List + +import streamlit as st +from google import genai +from google.genai import types +from dotenv import load_dotenv +from mcp.client.stdio import stdio_client, StdioServerParameters +from mcp.client.session import ClientSession + +CONVERSATION_DIR = os.path.join(os.path.dirname(__file__), 'conversations') +CONVERSATION_FILE = os.path.join(CONVERSATION_DIR, 'chat_history.json') + + +def _reload_config(): + """Re-read .env so changes are picked up without restarting Streamlit.""" + load_dotenv(override=True) + return { + "MODEL_NAME": os.getenv('MODEL_NAME', 'gemini-2.0-flash-exp'), + "GEMINI_API_KEY": os.getenv('GEMINI_API_KEY'), + "ES_INDEX": os.getenv('ES_INDEX', 'cloud-governance-policy-es-index'), + "OPENSEARCH_HOSTS": os.getenv('OPENSEARCH_HOSTS', 'http://localhost:9200'), + } + + +_cfg = _reload_config() +MODEL_NAME = _cfg["MODEL_NAME"] +GEMINI_API_KEY = _cfg["GEMINI_API_KEY"] +ES_INDEX = _cfg["ES_INDEX"] +OPENSEARCH_HOSTS = _cfg["OPENSEARCH_HOSTS"] + + +@st.cache_data(ttl=300) +def _fetch_available_indices() -> list[str]: + """Fetch index names from OpenSearch for the sidebar dropdown.""" + try: + from opensearchpy import OpenSearch + username = os.getenv("OPENSEARCH_USERNAME", "") + password = os.getenv("OPENSEARCH_PASSWORD", "") + kwargs = { + "hosts": [OPENSEARCH_HOSTS], + "use_ssl": OPENSEARCH_HOSTS.startswith("https"), + "verify_certs": False, + "ssl_show_warn": False, + "timeout": 10, + } + if username and password: + kwargs["http_auth"] = (username, password) + client = OpenSearch(**kwargs) + indices = client.cat.indices(format="json") + names = sorted( + idx.get("index", "") for idx in indices + if idx.get("index", "").startswith("cloud-governance") + ) + return names + except Exception: + return [] + + +def _build_mcp_server_params() -> StdioServerParameters: + env = { + "OPENSEARCH_HOSTS": OPENSEARCH_HOSTS, + "PATH": os.environ.get("PATH", ""), + "HOME": os.environ.get("HOME", ""), + } + username = os.getenv("OPENSEARCH_USERNAME", "") + password = os.getenv("OPENSEARCH_PASSWORD", "") + if username: + env["OPENSEARCH_USERNAME"] = username + if password: + env["OPENSEARCH_PASSWORD"] = password + + return StdioServerParameters( + command=sys.executable, + args=[os.path.join(os.path.dirname(__file__), "mcp_server.py")], + env=env, + ) + +# System instruction for AI agent (will be formatted with ES_INDEX) +SYSTEM_INSTRUCTION_TEMPLATE = """ +You query OpenSearch index "{es_index}". You MUST call tools for every question. + +WORKFLOW: +1. If unsure about available fields, call get_fields(index="{es_index}") +2. Use search_documents for filtered lookups +3. Use count_by_field for grouping and counting +4. Use aggregate for sums, averages, max, min on numeric fields +5. Use date_range_search for time-based queries +6. Use raw_search only for complex queries that don't fit the above tools +7. Present results in clear markdown tables + +TIPS: +- You do NOT need to add .keyword suffix - tools handle this automatically +- Use filters as a list: [{{"field": "", "value": ""}}] +- Filters work on ANY field type including numeric fields +- Use date_range_search ONLY for actual date/timestamp fields, NOT for integer fields +- To filter aggregations by a field value, pass the filters parameter to aggregate or count_by_field +- Always call get_fields first to discover field names and types before querying +- Always pass index="{es_index}" to every tool call +""" + +def _get_system_instruction(): + return SYSTEM_INSTRUCTION_TEMPLATE.format(es_index=ES_INDEX) + + +# Conversation persistence functions +def load_conversation_history() -> list: + """Load chat history from file""" + import json + + # Create directory if it doesn't exist + os.makedirs(CONVERSATION_DIR, exist_ok=True) + + if os.path.exists(CONVERSATION_FILE): + try: + with open(CONVERSATION_FILE, 'r') as f: + return json.load(f) + except Exception as e: + st.warning(f"Could not load chat history: {e}") + return [] + return [] + + +def save_conversation_history(messages: list): + """Save chat history to file""" + import json + + try: + # Create directory if it doesn't exist + os.makedirs(CONVERSATION_DIR, exist_ok=True) + + with open(CONVERSATION_FILE, 'w') as f: + json.dump(messages, f, indent=2) + except Exception as e: + st.warning(f"Could not save chat history: {e}") + + +def clear_conversation_history(): + """Delete saved chat history""" + if os.path.exists(CONVERSATION_FILE): + try: + os.remove(CONVERSATION_FILE) + except Exception as e: + st.warning(f"Could not delete chat history: {e}") + + +def fix_gemini_schema(schema) -> dict: + """Normalize MCP schemas for Gemini compatibility + + MCP schemas may use array notation for types: {"type": ["string", "null"]} + Gemini expects single type: {"type": "string"} + """ + if not isinstance(schema, dict): + return {"type": str(schema)} + + cleaned = {} + + # Handle type as list or string + raw_type = schema.get("type") + if isinstance(raw_type, list): + # Take first non-null type + cleaned["type"] = next((t for t in raw_type if t != "null"), raw_type[0]) + else: + cleaned["type"] = raw_type or "object" + + # Preserve description + if "description" in schema: + cleaned["description"] = schema["description"] + + # Handle array items recursively + if cleaned["type"] == "array": + if "items" in schema: + cleaned["items"] = fix_gemini_schema(schema["items"]) + else: + cleaned["items"] = {"type": "string"} + + # Recursively clean nested properties + if "properties" in schema and isinstance(schema["properties"], dict): + cleaned["properties"] = { + k: fix_gemini_schema(v) + for k, v in schema["properties"].items() + } + + # Preserve required fields + if "required" in schema: + cleaned["required"] = schema["required"] + + return cleaned + + +async def fetch_mcp_tools_async() -> List[types.FunctionDeclaration]: + """Dynamically fetch available tools from MCP server (stdio subprocess)""" + st.info("Connecting to MCP server (stdio)...") + + server_params = _build_mcp_server_params() + + async with AsyncExitStack() as stack: + read, write = await stack.enter_async_context( + stdio_client(server_params) + ) + st.info("stdio connection established") + + session = await stack.enter_async_context( + ClientSession(read, write) + ) + st.info("Session created") + + await session.initialize() + st.info("Session initialized") + + mcp_data = await session.list_tools() + st.info(f"Found {len(mcp_data.tools)} tools") + + tools = [ + types.FunctionDeclaration( + name=tool.name, + description=tool.description or "No description", + parameters=fix_gemini_schema(tool.inputSchema) + ) + for tool in mcp_data.tools + ] + + st.success(f"Successfully loaded {len(tools)} tools!") + return tools + + +def fetch_mcp_tools() -> List[types.FunctionDeclaration]: + """Synchronous wrapper that runs async tool fetching in a separate thread""" + result = [] + error = None + + def run_in_thread(): + nonlocal result, error + try: + # Create a new event loop for this thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + result = loop.run_until_complete(fetch_mcp_tools_async()) + finally: + loop.close() + except Exception as e: + error = e + + thread = threading.Thread(target=run_in_thread) + thread.start() + thread.join() + + if error: + st.error(f"❌ Failed to connect to MCP server") + st.error(f"Error type: {type(error).__name__}") + st.error(f"Error message: {str(error)}") + st.info(f"OpenSearch: {OPENSEARCH_HOSTS}") + st.info("Check that opensearch-py is installed and OPENSEARCH_HOSTS is correct in .env") + + import traceback + st.code(traceback.format_exception(type(error), error, error.__traceback__)) + + return [] + + return result + + +async def execute_mcp_tool_async(name: str, arguments: dict) -> str: + """Execute a tool via MCP server (stdio subprocess)""" + try: + server_params = _build_mcp_server_params() + + async with AsyncExitStack() as stack: + read, write = await stack.enter_async_context( + stdio_client(server_params) + ) + session = await stack.enter_async_context( + ClientSession(read, write) + ) + await session.initialize() + + result = await session.call_tool(name, arguments) + + return "\n".join([ + content.text for content in result.content + if hasattr(content, 'text') + ]) + except Exception as e: + import json + error_msg = f"ERROR calling {name}:\n" + error_msg += f"Type: {type(e).__name__}\n" + error_msg += f"Message: {str(e)}\n" + error_msg += f"Arguments: {json.dumps(arguments, indent=2)}\n" + + if hasattr(e, '__cause__') and e.__cause__: + error_msg += f"Cause: {str(e.__cause__)}\n" + + return error_msg + + +def execute_mcp_tool(name: str, arguments: dict) -> str: + """Synchronous wrapper for executing MCP tool""" + import json + + result = None + error = None + + # Debug logging - show what tool is being called + debug_mode = st.session_state.get("debug_mode", True) + if debug_mode: + with st.expander(f"🔧 Tool Call: {name}", expanded=False): + st.code(json.dumps(arguments, indent=2), language="json") + + def run_in_thread(): + nonlocal result, error + try: + # Create a new event loop for this thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + result = loop.run_until_complete(execute_mcp_tool_async(name, arguments)) + finally: + loop.close() + except Exception as e: + error = e + + thread = threading.Thread(target=run_in_thread) + thread.start() + thread.join() + + if error: + error_msg = f"Error executing {name}: {str(error)}" + st.error(error_msg) + return error_msg + + # Debug logging - show tool response + if debug_mode: + with st.expander(f"✅ Tool Response: {name}", expanded=False): + try: + # Try to pretty-print JSON + parsed = json.loads(result) + st.code(json.dumps(parsed, indent=2), language="json") + except (json.JSONDecodeError, TypeError): + # If not JSON, show as text (truncate if too long) + if len(result) > 2000: + st.text(result[:2000] + f"\n\n... (truncated, total {len(result)} chars)") + else: + st.text(result) + + return result + + +def run_agent_loop_gemini(user_message: str, tools: List[types.FunctionDeclaration], previous_messages: list | None = None) -> str: + """Run agentic loop with Gemini""" + + if not GEMINI_API_KEY: + return "❌ GEMINI_API_KEY not set. Please configure .env file." + + try: + client = genai.Client(api_key=GEMINI_API_KEY) + except Exception as e: + return f"❌ Failed to initialize Gemini client: {e}" + + # Create tool wrapper + gemini_tool = types.Tool(function_declarations=tools) if tools else None + + # Debug: Verify tools are available + if hasattr(st, 'session_state') and st.session_state.get("debug_mode", False): + if tools: + st.info(f"🔍 Debug: {len(tools)} tools available to AI: {[t.name for t in tools]}") + else: + st.error("❌ Debug: NO TOOLS AVAILABLE - this will cause issues!") + + # Few-shot example: discover fields then search + history = [] + + history.append(types.Content( + role="user", + parts=[types.Part(text="Show me the data in this index")] + )) + history.append(types.Content( + role="model", + parts=[ + types.Part(function_call=types.FunctionCall( + name="get_fields", + args={"index": ES_INDEX} + )) + ] + )) + history.append(types.Content( + role="user", + parts=[ + types.Part(function_response=types.FunctionResponse( + name="get_fields", + response={"result": "| Field | Type | Aggregatable |\n| --- | --- | --- |\n| name | text | |\n| category | keyword | Yes (keyword) |\n| status | text | |\n| count | long | Yes (numeric) |\n| timestamp | date | |\n\nNumeric fields for aggregation (sum/avg/max/min): count"} + )) + ] + )) + history.append(types.Content( + role="model", + parts=[ + types.Part(function_call=types.FunctionCall( + name="search_documents", + args={"index": ES_INDEX, "size": 5} + )) + ] + )) + history.append(types.Content( + role="user", + parts=[ + types.Part(function_response=types.FunctionResponse( + name="search_documents", + response={"result": "Total matching documents: 100\nShowing 5 results:\n\n| name | category | status | count |\n| --- | --- | --- | --- |\n| item-1 | typeA | active | 10 |"} + )) + ] + )) + history.append(types.Content( + role="model", + parts=[types.Part(text=f"Here are 5 sample documents from `{ES_INDEX}`. The index contains fields like name, category, status, count, and timestamp.")] + )) + + # Second few-shot: count by field with filter + history.append(types.Content( + role="user", + parts=[types.Part(text="Count active items by category")] + )) + history.append(types.Content( + role="model", + parts=[ + types.Part(function_call=types.FunctionCall( + name="count_by_field", + args={ + "index": ES_INDEX, + "group_by": "category", + "filters": [{"field": "status", "value": "active"}] + } + )) + ] + )) + history.append(types.Content( + role="user", + parts=[ + types.Part(function_response=types.FunctionResponse( + name="count_by_field", + response={"result": "| category | Count |\n| --- | --- |\n| typeA | 45 |\n| typeB | 23 |\n| typeC | 12 |\nTotal matching documents: 80"} + )) + ] + )) + history.append(types.Content( + role="model", + parts=[types.Part(text="Here are active items grouped by category:\n\n| Category | Count |\n|---------|-------|\n| typeA | 45 |\n| typeB | 23 |\n| typeC | 12 |\n\nTotal: 80 active items found.")] + )) + + # Third few-shot: aggregate with filters (teaches filtering pattern) + history.append(types.Content( + role="user", + parts=[types.Part(text="Show me the sum of count grouped by category, but only for active items")] + )) + history.append(types.Content( + role="model", + parts=[ + types.Part(function_call=types.FunctionCall( + name="aggregate", + args={ + "index": ES_INDEX, + "group_by": "category", + "metric_field": "count", + "metric_type": "sum", + "filters": [{"field": "status", "value": "active"}] + } + )) + ] + )) + history.append(types.Content( + role="user", + parts=[ + types.Part(function_response=types.FunctionResponse( + name="aggregate", + response={"result": "| category | sum(count) | Count |\n| --- | --- | --- |\n| typeA | 1,250 | 45 |\n| typeB | 830 | 23 |\n| typeC | 410 | 12 |\nTotal matching documents: 80"} + )) + ] + )) + history.append(types.Content( + role="model", + parts=[types.Part(text="Here is the sum of count for active items by category:\n\n| Category | Total Count | Documents |\n|---------|------------|----------|\n| typeA | 1,250 | 45 |\n| typeB | 830 | 23 |\n| typeC | 410 | 12 |")] + )) + + # Add previous conversation messages if any + if previous_messages: + for msg in previous_messages: + role = "model" if msg["role"] == "assistant" else msg["role"] + history.append(types.Content( + role=role, + parts=[types.Part(text=msg["content"])] + )) + + # Add current user message + history.append(types.Content(role="user", parts=[types.Part(text=user_message)])) + + MAX_TURNS = 10 + final_answer = "" + tool_calls_made = [] + + for turn in range(MAX_TURNS): + try: + # Force tool calling on first turn, allow text on subsequent turns + if turn == 0 and gemini_tool: + tool_config = types.ToolConfig( + function_calling_config=types.FunctionCallingConfig( + mode="ANY" + ) + ) + else: + tool_config = types.ToolConfig( + function_calling_config=types.FunctionCallingConfig( + mode="AUTO" + ) + ) + + response = client.models.generate_content( + model=MODEL_NAME, + contents=history, + config=types.GenerateContentConfig( + tools=[gemini_tool] if gemini_tool else None, + tool_config=tool_config if gemini_tool else None, + system_instruction=_get_system_instruction() + ) + ) + + # Add assistant response to history + history.append(response.candidates[0].content) + + # Extract function calls + fcalls = [ + p.function_call for p in response.candidates[0].content.parts + if hasattr(p, 'function_call') and p.function_call + ] + + # If no function calls, we have final answer + if not fcalls: + final_answer = response.text + + if hasattr(st, 'session_state') and st.session_state.get("debug_mode", False): + st.info(f"🔍 Debug: Final answer on turn {turn + 1}. Tools called: {len(tool_calls_made)}") + + break + + # Execute all function calls + tool_responses = [] + for fc in fcalls: + tool_calls_made.append(fc.name) + result = execute_mcp_tool(fc.name, dict(fc.args)) + + tool_responses.append( + types.Part( + function_response=types.FunctionResponse( + name=fc.name, + response={"result": result} + ) + ) + ) + + # Add tool results to history + history.append(types.Content(role="user", parts=tool_responses)) + + except Exception as e: + return f"❌ Error during agent loop: {str(e)}" + + if final_answer and tool_calls_made: + citation = f"\n\n✅ *Data from OpenSearch via: {', '.join(set(tool_calls_made))}*" + return final_answer + citation + elif final_answer and not tool_calls_made: + return "❌ I was unable to query the database for this request. Please try rephrasing your question, for example:\n- *Show me 5 sample documents*\n- *What fields are in this index?*\n- *Count documents grouped by [field]*" + + return final_answer or "⚠️ Max reasoning turns reached. Please refine your question." + + +def main(): + """Main Streamlit application""" + + # Re-read .env on every Streamlit rerun so config changes are picked up + global MODEL_NAME, GEMINI_API_KEY, ES_INDEX, OPENSEARCH_HOSTS + cfg = _reload_config() + MODEL_NAME = cfg["MODEL_NAME"] + GEMINI_API_KEY = cfg["GEMINI_API_KEY"] + ES_INDEX = cfg["ES_INDEX"] + OPENSEARCH_HOSTS = cfg["OPENSEARCH_HOSTS"] + + st.set_page_config( + page_title="Cloud Governance AI Agent", + page_icon="☁️", + layout="wide" + ) + + st.title("☁️ Cloud Governance AI Agent") + st.caption("Ask questions about cloud costs, usage, and policy compliance in natural language") + + # Index will be set from sidebar selectbox; initialize from .env default + if "es_index" not in st.session_state: + st.session_state.es_index = ES_INDEX + + # Initialize session state - load from file if exists (do this before sidebar) + if "messages" not in st.session_state: + st.session_state.messages = load_conversation_history() + if st.session_state.messages: + st.info(f"📂 Loaded {len(st.session_state.messages)} previous messages from history") + + # Initialize MCP tools (only once) + if "mcp_tools" not in st.session_state: + with st.spinner("🔄 Connecting to MCP server and discovering tools..."): + # Run async function in separate thread with clean event loop + tools = fetch_mcp_tools() + st.session_state.mcp_tools = tools + if tools: + st.success(f"✅ Loaded {len(tools)} tools from MCP server") + else: + st.error("❌ No tools available. Check if MCP server is running.") + + # Sidebar + with st.sidebar: + st.header("⚙️ Configuration") + + # Index selector dropdown + available_indices = _fetch_available_indices() + if available_indices: + default_idx = available_indices.index(ES_INDEX) if ES_INDEX in available_indices else 0 + selected_index = st.selectbox( + "OpenSearch Index", + available_indices, + index=default_idx, + help="Select the index to query. Changing this resets the conversation.", + ) + if selected_index != st.session_state.es_index: + st.session_state.es_index = selected_index + ES_INDEX = selected_index + clear_conversation_history() + st.session_state.pop("messages", None) + st.session_state.pop("mcp_tools", None) + st.rerun() + ES_INDEX = st.session_state.es_index + else: + st.text_input("OpenSearch Index", value=ES_INDEX, disabled=True) + + st.caption(f"**Model:** {MODEL_NAME} | **OpenSearch:** {OPENSEARCH_HOSTS}") + + # Connection status + if "mcp_tools" in st.session_state and st.session_state.mcp_tools: + st.success(f"✅ Connected ({len(st.session_state.mcp_tools)} tools)") + else: + st.warning("⚠️ Not connected") + + st.divider() + + # Conversation info + if "messages" in st.session_state and st.session_state.messages: + st.info(f"💬 **Messages:** {len(st.session_state.messages)}") + st.caption("✅ Context preserved across queries") + if os.path.exists(CONVERSATION_FILE): + import datetime + mod_time = os.path.getmtime(CONVERSATION_FILE) + last_saved = datetime.datetime.fromtimestamp(mod_time).strftime("%Y-%m-%d %H:%M:%S") + st.caption(f"Last saved: {last_saved}") + + st.divider() + + # Debug mode toggle + if "debug_mode" not in st.session_state: + st.session_state.debug_mode = True # Default to True to see what's happening + + st.session_state.debug_mode = st.checkbox( + "🐛 Debug Mode (Show Tool Calls)", + value=st.session_state.debug_mode, + help="Show detailed tool calls and responses for debugging" + ) + + st.divider() + + # Reset button + if st.button("🧹 Reset Conversation", use_container_width=True): + clear_conversation_history() + st.session_state.clear() + st.rerun() + + # Export button + if "messages" in st.session_state and st.session_state.messages: + import json + import datetime + + # Create JSON export + export_data = { + "exported_at": datetime.datetime.now().isoformat(), + "model": MODEL_NAME, + "message_count": len(st.session_state.messages), + "messages": st.session_state.messages + } + + st.download_button( + label="📥 Export Chat History", + data=json.dumps(export_data, indent=2), + file_name=f"chat_history_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json", + mime="application/json", + use_container_width=True + ) + + st.divider() + + # Example queries + st.subheader("📝 Example Queries") + st.caption(f"Queries will search: `{ES_INDEX}`") + + st.markdown(""" + **Discovery Queries:** + - *What fields are available in this index?* + - *Show me 5 sample documents* + - *List all unique values for [field_name]* + + **Data Queries:** + - *Count documents by [field_name]* + - *Show documents from the last 7 days* + - *Filter by [field_name] = [value]* + + **Aggregation Queries:** + - *Total/average/max of [numeric_field]* + - *Group by [field] and sum [numeric_field]* + - *Top 10 [field] by [metric]* + """) + + st.divider() + + # Data integrity reminder + st.warning("⚠️ **Data Integrity**: Always check debug tool calls to verify data is from OpenSearch, not generated") + + # Display chat history + for msg in st.session_state.messages: + with st.chat_message(msg["role"]): + st.markdown(msg["content"]) + + # Chat input + if prompt := st.chat_input("Ask about cloud costs, usage, or policies..."): + # Add user message + st.session_state.messages.append({ + "role": "user", + "content": prompt + }) + + # Display user message + with st.chat_message("user"): + st.markdown(prompt) + + # Get AI response + with st.chat_message("assistant"): + with st.spinner("🤖 Investigating..."): + # Pass previous messages for context (exclude the just-added user message) + previous = st.session_state.messages[:-1] if len(st.session_state.messages) > 1 else [] + response = run_agent_loop_gemini( + prompt, + st.session_state.mcp_tools, + previous_messages=previous + ) + st.markdown(response) + + # Add assistant message to history + st.session_state.messages.append({ + "role": "assistant", + "content": response + }) + + # Save conversation to file + save_conversation_history(st.session_state.messages) + + +if __name__ == "__main__": + main() diff --git a/cloud-governance-mcp/mcp_server.py b/cloud-governance-mcp/mcp_server.py new file mode 100644 index 00000000..8ac6c96d --- /dev/null +++ b/cloud-governance-mcp/mcp_server.py @@ -0,0 +1,636 @@ +""" +Cloud Governance MCP Server +Exposes high-level OpenSearch query tools via Model Context Protocol (stdio transport). +""" + +import json +import os +import sys +from typing import Any + +from dotenv import load_dotenv +from mcp.server import Server +from mcp.server.stdio import stdio_server +from mcp.types import TextContent, Tool +from opensearchpy import OpenSearch + +load_dotenv() + +OPENSEARCH_HOSTS = os.getenv("OPENSEARCH_HOSTS", "http://localhost:9200") +OPENSEARCH_USERNAME = os.getenv("OPENSEARCH_USERNAME", "") +OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD", "") + +server = Server("cloud-governance-mcp") + +_client: OpenSearch | None = None +_mapping_cache: dict[str, dict[str, str]] = {} + + +def _get_client() -> OpenSearch: + global _client + if _client is not None: + return _client + + parsed = OPENSEARCH_HOSTS + use_ssl = parsed.startswith("https") + + kwargs: dict[str, Any] = { + "hosts": [OPENSEARCH_HOSTS], + "use_ssl": use_ssl, + "verify_certs": False, + "ssl_show_warn": False, + "timeout": 30, + } + if OPENSEARCH_USERNAME and OPENSEARCH_PASSWORD: + kwargs["http_auth"] = (OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD) + + _client = OpenSearch(**kwargs) + return _client + + +def _get_field_types(index: str) -> dict[str, str]: + """Return {field_name: es_type} for an index, with caching.""" + if index in _mapping_cache: + return _mapping_cache[index] + + client = _get_client() + mappings = client.indices.get_mapping(index=index) + + field_types: dict[str, str] = {} + index_key = list(mappings.keys())[0] + properties = mappings[index_key].get("mappings", {}).get("properties", {}) + + def _extract(props: dict, prefix: str = ""): + for name, meta in props.items(): + full = f"{prefix}{name}" if not prefix else f"{prefix}.{name}" + ftype = meta.get("type", "object") + field_types[full] = ftype + if "properties" in meta: + _extract(meta["properties"], full) + if "fields" in meta: + for sub_name, sub_meta in meta["fields"].items(): + sub_full = f"{full}.{sub_name}" + field_types[sub_full] = sub_meta.get("type", "keyword") + + _extract(properties) + _mapping_cache[index] = field_types + return field_types + + +def _find_field(index: str, field: str) -> str: + """Find the actual field name, handling case-insensitive matching.""" + field_types = _get_field_types(index) + if field in field_types: + return field + field_lower = field.lower() + candidates = [f for f in field_types if f.lower() == field_lower and ".keyword" not in f] + if not candidates: + return field + # Prefer the candidate that has a .keyword sub-field (more useful for queries) + for c in candidates: + if f"{c}.keyword" in field_types: + return c + return candidates[0] + + +def _resolve_field(index: str, field: str) -> str: + """Resolve field name (case-insensitive) and append .keyword for text fields.""" + field_types = _get_field_types(index) + actual = _find_field(index, field) + if actual in field_types and field_types[actual] == "text": + if f"{actual}.keyword" in field_types: + return f"{actual}.keyword" + return actual + + +def _coerce_value(index: str, field: str, value: Any) -> Any: + """Cast value to the correct type based on field mapping.""" + field_types = _get_field_types(index) + actual = _find_field(index, field) + ftype = field_types.get(actual, "") + if ftype in ("integer", "long", "short", "byte"): + try: + return int(value) + except (ValueError, TypeError): + return value + if ftype in ("float", "double", "half_float", "scaled_float"): + try: + return float(value) + except (ValueError, TypeError): + return value + if ftype == "boolean": + if isinstance(value, bool): + return value + return str(value).lower() in ("true", "1", "yes") + # For text/keyword fields, ensure string + if isinstance(value, (int, float)): + return str(value) + return value + + +def _build_bool_query(index: str, filters: list[dict] | None) -> dict: + """Build a bool/must query from a simple filter list.""" + if not filters: + return {"match_all": {}} + + must_clauses = [] + for f in filters: + field = f.get("field", "") + value = f.get("value", "") + resolved = _resolve_field(index, field) + coerced = _coerce_value(index, field, value) + must_clauses.append({"term": {resolved: coerced}}) + + return {"bool": {"must": must_clauses}} + + +def _format_hits(hits: list[dict], fields: list[str] | None = None) -> str: + """Format ES hits as a readable table.""" + if not hits: + return "No documents found." + + sources = [h.get("_source", {}) for h in hits] + + if fields: + display_fields = fields + else: + all_keys: list[str] = [] + for src in sources: + for k in src: + if k not in all_keys: + all_keys.append(k) + display_fields = all_keys[:15] + + header = " | ".join(display_fields) + separator = " | ".join(["---"] * len(display_fields)) + rows = [] + for src in sources: + row = " | ".join(str(src.get(f, "")) for f in display_fields) + rows.append(row) + + table = f"| {header} |\n| {separator} |\n" + table += "\n".join(f"| {r} |" for r in rows) + return table + + +@server.list_tools() +async def list_tools() -> list[Tool]: + return [ + Tool( + name="list_indices", + description="List all OpenSearch indices with document counts", + inputSchema={"type": "object", "properties": {}, "required": []}, + ), + Tool( + name="get_fields", + description="Get field names and types for an index. Call this first to discover available fields.", + inputSchema={ + "type": "object", + "properties": { + "index": {"type": "string", "description": "Index name to inspect"}, + }, + "required": ["index"], + }, + ), + Tool( + name="search_documents", + description=( + "Search documents with simple filters. Filters are AND-ed. " + "No need to use .keyword suffix — it is handled automatically." + ), + inputSchema={ + "type": "object", + "properties": { + "index": {"type": "string", "description": "Index name"}, + "filters": { + "type": "array", + "description": 'List of filters, e.g. [{"field": "status", "value": "active"}]', + "items": { + "type": "object", + "properties": { + "field": {"type": "string"}, + "value": {"type": "string"}, + }, + "required": ["field", "value"], + }, + }, + "fields": { + "type": "array", + "description": "Fields to include in results (optional, defaults to all)", + "items": {"type": "string"}, + }, + "size": { + "type": "integer", + "description": "Number of results to return (default 10)", + }, + "sort_by": { + "type": "object", + "description": 'Sort results, e.g. {"field": "timestamp", "order": "desc"}', + "properties": { + "field": {"type": "string"}, + "order": {"type": "string", "enum": ["asc", "desc"]}, + }, + }, + }, + "required": ["index"], + }, + ), + Tool( + name="count_by_field", + description=( + "Count documents grouped by a field (terms aggregation). " + "No need to use .keyword suffix — it is handled automatically." + ), + inputSchema={ + "type": "object", + "properties": { + "index": {"type": "string", "description": "Index name"}, + "group_by": {"type": "string", "description": "Field to group by"}, + "filters": { + "type": "array", + "description": "Optional filters to narrow results", + "items": { + "type": "object", + "properties": { + "field": {"type": "string"}, + "value": {"type": "string"}, + }, + "required": ["field", "value"], + }, + }, + "top_n": { + "type": "integer", + "description": "Number of top groups to return (default 20)", + }, + }, + "required": ["index", "group_by"], + }, + ), + Tool( + name="aggregate", + description=( + "Compute a metric (sum, avg, max, min) on a numeric field, grouped by another field. " + "No need to use .keyword suffix — it is handled automatically." + ), + inputSchema={ + "type": "object", + "properties": { + "index": {"type": "string", "description": "Index name"}, + "group_by": {"type": "string", "description": "Field to group by"}, + "metric_field": {"type": "string", "description": "Numeric field to aggregate"}, + "metric_type": { + "type": "string", + "enum": ["sum", "avg", "max", "min"], + "description": "Aggregation type", + }, + "filters": { + "type": "array", + "description": "Optional filters", + "items": { + "type": "object", + "properties": { + "field": {"type": "string"}, + "value": {"type": "string"}, + }, + "required": ["field", "value"], + }, + }, + "top_n": { + "type": "integer", + "description": "Number of top groups to return (default 20)", + }, + }, + "required": ["index", "group_by", "metric_field", "metric_type"], + }, + ), + Tool( + name="date_range_search", + description="Search documents within a date range, with optional filters.", + inputSchema={ + "type": "object", + "properties": { + "index": {"type": "string", "description": "Index name"}, + "date_field": {"type": "string", "description": "Date field name (e.g. timestamp, SnapshotDate)"}, + "gte": {"type": "string", "description": "Start date (yyyy-MM-dd)"}, + "lte": {"type": "string", "description": "End date (yyyy-MM-dd)"}, + "filters": { + "type": "array", + "description": "Optional additional filters", + "items": { + "type": "object", + "properties": { + "field": {"type": "string"}, + "value": {"type": "string"}, + }, + "required": ["field", "value"], + }, + }, + "size": { + "type": "integer", + "description": "Number of results (default 10)", + }, + }, + "required": ["index", "date_field", "gte", "lte"], + }, + ), + Tool( + name="raw_search", + description="Execute a raw OpenSearch Query DSL query. Use only for complex queries that don't fit the other tools.", + inputSchema={ + "type": "object", + "properties": { + "index": {"type": "string", "description": "Index name"}, + "query_body": { + "type": "object", + "description": "Full OpenSearch Query DSL body", + }, + }, + "required": ["index", "query_body"], + }, + ), + ] + + +@server.call_tool() +async def call_tool(name: str, arguments: dict) -> list[TextContent]: + try: + result = _dispatch(name, arguments) + return [TextContent(type="text", text=result)] + except Exception as e: + return [TextContent(type="text", text=f"Error in {name}: {type(e).__name__}: {e}")] + + +def _dispatch(name: str, args: dict) -> str: + if name == "list_indices": + return _tool_list_indices() + elif name == "get_fields": + return _tool_get_fields(args["index"]) + elif name == "search_documents": + return _tool_search_documents( + index=args["index"], + filters=args.get("filters"), + fields=args.get("fields"), + size=args.get("size", 10), + sort_by=args.get("sort_by"), + ) + elif name == "count_by_field": + return _tool_count_by_field( + index=args["index"], + group_by=args["group_by"], + filters=args.get("filters"), + top_n=args.get("top_n", 20), + ) + elif name == "aggregate": + return _tool_aggregate( + index=args["index"], + group_by=args["group_by"], + metric_field=args["metric_field"], + metric_type=args["metric_type"], + filters=args.get("filters"), + top_n=args.get("top_n", 20), + ) + elif name == "date_range_search": + return _tool_date_range_search( + index=args["index"], + date_field=args["date_field"], + gte=args["gte"], + lte=args["lte"], + filters=args.get("filters"), + size=args.get("size", 10), + ) + elif name == "raw_search": + return _tool_raw_search(args["index"], args["query_body"]) + else: + return f"Unknown tool: {name}" + + +def _tool_list_indices() -> str: + client = _get_client() + indices = client.cat.indices(format="json") + if not indices: + return "No indices found." + + lines = ["| Index | Docs | Size |", "| --- | --- | --- |"] + for idx in sorted(indices, key=lambda x: x.get("index", "")): + lines.append(f"| {idx.get('index', '')} | {idx.get('docs.count', '0')} | {idx.get('store.size', 'N/A')} |") + return "\n".join(lines) + + +def _tool_get_fields(index: str) -> str: + _mapping_cache.pop(index, None) + field_types = _get_field_types(index) + if not field_types: + return f"No fields found in index '{index}'." + + numeric_types = {"integer", "long", "short", "byte", "float", "double", "half_float", "scaled_float"} + # Skip .keyword sub-fields for cleaner output + top_level = {f: t for f, t in field_types.items() if ".keyword" not in f} + + lines = ["| Field | Type | Aggregatable |", "| --- | --- | --- |"] + for field, ftype in sorted(top_level.items()): + agg = "Yes (numeric)" if ftype in numeric_types else "Yes (keyword)" if ftype == "keyword" else "" + lines.append(f"| {field} | {ftype} | {agg} |") + + numeric_fields = [f for f, t in top_level.items() if t in numeric_types] + if numeric_fields: + lines.append(f"\nNumeric fields for aggregation (sum/avg/max/min): {', '.join(numeric_fields)}") + return "\n".join(lines) + + +def _tool_search_documents( + index: str, + filters: list[dict] | None = None, + fields: list[str] | None = None, + size: int = 10, + sort_by: dict | None = None, +) -> str: + client = _get_client() + query = _build_bool_query(index, filters) + + body: dict[str, Any] = {"query": query, "size": size} + if sort_by: + sort_field = _resolve_field(index, sort_by.get("field", "")) + body["sort"] = [{sort_field: {"order": sort_by.get("order", "desc")}}] + if fields: + body["_source"] = fields + + response = client.search(index=index, body=body) + total = response["hits"]["total"]["value"] + hits = response["hits"]["hits"] + + table = _format_hits(hits, fields) + result = f"Total matching documents: {total}\nShowing {len(hits)} results:\n\n{table}" + if total == 0 and filters: + result += f"\n\nDebug - query sent: {json.dumps(body, default=str)}" + return result + + +def _tool_count_by_field( + index: str, + group_by: str, + filters: list[dict] | None = None, + top_n: int = 20, +) -> str: + client = _get_client() + query = _build_bool_query(index, filters) + resolved_group = _resolve_field(index, group_by) + + body: dict[str, Any] = { + "size": 0, + "query": query, + "aggs": { + "group_count": { + "terms": {"field": resolved_group, "size": top_n} + } + }, + } + + response = client.search(index=index, body=body) + total = response["hits"]["total"]["value"] + buckets = response.get("aggregations", {}).get("group_count", {}).get("buckets", []) + + if not buckets: + field_types = _get_field_types(index) + actual_group = _find_field(index, group_by) + msg = f"No results found. Total documents matching filters: {total}" + if total > 0 and field_types.get(actual_group) == "text" and f"{actual_group}.keyword" not in field_types: + msg += f"\nWARNING: '{actual_group}' is a text field without a .keyword sub-field. Aggregations on analyzed text fields return empty results. Try a different field." + msg += f"\nDebug - query sent: {json.dumps(body, default=str)}" + return msg + + lines = [f"| {group_by} | Count |", "| --- | --- |"] + for b in buckets: + lines.append(f"| {b['key']} | {b['doc_count']} |") + + other = response.get("aggregations", {}).get("group_count", {}).get("sum_other_doc_count", 0) + footer = f"\nTotal matching documents: {total}" + if other > 0: + footer += f"\n(+{other} documents in other groups not shown)" + return "\n".join(lines) + footer + + +def _tool_aggregate( + index: str, + group_by: str, + metric_field: str, + metric_type: str, + filters: list[dict] | None = None, + top_n: int = 20, +) -> str: + client = _get_client() + query = _build_bool_query(index, filters) + resolved_group = _resolve_field(index, group_by) + resolved_metric = _find_field(index, metric_field) + + body: dict[str, Any] = { + "size": 0, + "query": query, + "aggs": { + "group_agg": { + "terms": {"field": resolved_group, "size": top_n}, + "aggs": { + "metric_value": {metric_type: {"field": resolved_metric}} + }, + } + }, + } + + response = client.search(index=index, body=body) + total = response["hits"]["total"]["value"] + buckets = response.get("aggregations", {}).get("group_agg", {}).get("buckets", []) + + if not buckets: + msg = f"No results found. Total documents matching filters: {total}" + msg += f"\nDebug - query sent: {json.dumps(body, default=str)}" + return msg + + lines = [f"| {group_by} | {metric_type}({metric_field}) | Count |", "| --- | --- | --- |"] + all_zero = True + for b in buckets: + val = b.get("metric_value", {}).get("value", 0) + if val and val != 0: + all_zero = False + formatted = f"{val:,.2f}" if isinstance(val, float) else str(val) + lines.append(f"| {b['key']} | {formatted} | {b['doc_count']} |") + + result = "\n".join(lines) + f"\nTotal matching documents: {total}" + + if all_zero and total > 0: + field_types = _get_field_types(index) + numeric_types = {"integer", "long", "short", "byte", "float", "double", "half_float", "scaled_float"} + numeric_fields = [f for f, t in field_types.items() if t in numeric_types and ".keyword" not in f] + result += ( + f"\n\nWARNING: All {metric_type} values are 0. " + f"The field '{metric_field}' may not exist or may not be numeric. " + f"Available numeric fields: {', '.join(numeric_fields)}" + f"\nDebug - query sent: {json.dumps(body, default=str)}" + ) + + return result + + +def _tool_date_range_search( + index: str, + date_field: str, + gte: str, + lte: str, + filters: list[dict] | None = None, + size: int = 10, +) -> str: + client = _get_client() + + must_clauses = [] + if filters: + for f in filters: + field = f.get("field", "") + resolved = _resolve_field(index, field) + coerced = _coerce_value(index, field, f.get("value", "")) + must_clauses.append({"term": {resolved: coerced}}) + + field_types = _get_field_types(index) + resolved_date = _find_field(index, date_field) + ftype = field_types.get(resolved_date, "") + + if ftype == "text": + kw = f"{resolved_date}.keyword" + range_field = kw if kw in field_types else resolved_date + range_clause = {"gte": gte, "lte": lte} + else: + range_field = resolved_date + range_clause = {"gte": gte, "lte": lte, "format": "yyyy-MM-dd"} + + must_clauses.append({"range": {range_field: range_clause}}) + + sort_field = range_field + body: dict[str, Any] = { + "query": {"bool": {"must": must_clauses}}, + "size": size, + "sort": [{sort_field: {"order": "desc"}}], + } + + response = client.search(index=index, body=body) + total = response["hits"]["total"]["value"] + hits = response["hits"]["hits"] + + table = _format_hits(hits) + return f"Total matching documents: {total}\nShowing {len(hits)} results:\n\n{table}" + + +def _tool_raw_search(index: str, query_body: dict) -> str: + client = _get_client() + if "size" not in query_body: + query_body["size"] = 100 + elif query_body.get("size", 0) > 1000: + query_body["size"] = 1000 + response = client.search(index=index, body=query_body, request_timeout=30) + return json.dumps(response, indent=2, default=str) + + +async def main(): + async with stdio_server() as (read_stream, write_stream): + await server.run(read_stream, write_stream, server.create_initialization_options()) + + +if __name__ == "__main__": + import asyncio + asyncio.run(main()) diff --git a/cloud-governance-mcp/requirements.txt b/cloud-governance-mcp/requirements.txt new file mode 100644 index 00000000..b6b2b901 --- /dev/null +++ b/cloud-governance-mcp/requirements.txt @@ -0,0 +1,8 @@ +elasticsearch>=8.10.0 +google-genai>=0.3.0 +mcp[all]>=1.0.0 +nest-asyncio>=1.6.0 +opensearch-py>=2.4.0 +python-dotenv>=1.0.0 +requests>=2.31.0 +streamlit>=1.31.0 diff --git a/cloud-governance-mcp/run_agent.sh b/cloud-governance-mcp/run_agent.sh new file mode 100755 index 00000000..a28e26e1 --- /dev/null +++ b/cloud-governance-mcp/run_agent.sh @@ -0,0 +1,73 @@ +#!/bin/bash + +set -e + +echo "Starting Cloud Governance AI Agent..." + +# Check if .env file exists +if [ ! -f .env ]; then + echo "❌ Error: .env file not found" + echo "Please copy .env.example to .env and configure it:" + echo " cp .env.example .env" + echo " vi .env" + exit 1 +fi + +# Setup Python virtual environment +if [ ! -d ".venv" ]; then + echo "Creating virtual environment..." + python3 -m venv .venv +fi + +# Activate virtual environment +echo "Activating virtual environment..." +source .venv/bin/activate + +# Install/update dependencies +echo "Installing dependencies..." +python -m pip install -U pip > /dev/null +python -m pip install -r requirements.txt + +# Stop existing Streamlit processes on port 8501 +echo "Stopping existing Streamlit processes..." +pids="" +for pid in $(lsof -ti tcp:8501 2>/dev/null); do + if ps -o cmd= -p "$pid" 2>/dev/null | grep -q "streamlit"; then + pids="$pids $pid" + fi +done +if [ -n "$pids" ]; then + kill $pids 2>/dev/null || true + sleep 1 + kill -9 $pids 2>/dev/null || true +fi + +# Configure Streamlit (skip telemetry prompt) +mkdir -p ~/.streamlit +cat > ~/.streamlit/config.toml < streamlit.log 2>&1 & + +# Wait for Streamlit to start +sleep 3 + +# Check if Streamlit is running +if lsof -i :8501 > /dev/null 2>&1; then + echo "✅ Streamlit started successfully" + echo "" + echo "🌐 Access the UI at: http://localhost:8501" + echo "📝 View logs: tail -f streamlit.log" + echo "🛑 Stop server: kill \$(lsof -ti tcp:8501)" +else + echo "❌ Failed to start Streamlit" + echo "Check logs: cat streamlit.log" + exit 1 +fi