diff --git a/contributing/samples/bigquery_agent_analytics_demo/__init__.py b/contributing/samples/bigquery_agent_analytics_demo/__init__.py
new file mode 100644
index 0000000000..adc2a026a4
--- /dev/null
+++ b/contributing/samples/bigquery_agent_analytics_demo/__init__.py
@@ -0,0 +1,17 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""BigQuery agent analytics demo agent package."""
+
+from . import agent
diff --git a/contributing/samples/bigquery_agent_analytics_demo/agent.py b/contributing/samples/bigquery_agent_analytics_demo/agent.py
new file mode 100644
index 0000000000..b3042a1007
--- /dev/null
+++ b/contributing/samples/bigquery_agent_analytics_demo/agent.py
@@ -0,0 +1,131 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Minimal agent wired to the BigQuery Agent Analytics plugin."""
+
+from __future__ import annotations
+
+import os
+from typing import Dict
+from typing import List
+
+from google.adk import Agent
+from google.adk.apps import App
+from google.adk.models.google_llm import Gemini
+from google.adk.plugins.bigquery_agent_analytics_plugin import (
+    BigQueryAgentAnalyticsPlugin,
+    BigQueryLoggerConfig,
+)
+from google.adk.tools.function_tool import FunctionTool
+from google.adk.tools.google_search_agent_tool import (
+    GoogleSearchAgentTool,
+    create_google_search_agent,
+)
+
+PROJECT_ID = os.getenv("BQ_AGENT_ANALYTICS_PROJECT")
+DATASET_ID = os.getenv("BQ_AGENT_ANALYTICS_DATASET")
+# Default table name matches the dashboard and simulation script in dashboard/.
+TABLE_ID = os.getenv("BQ_AGENT_ANALYTICS_TABLE", "agent_events_v2")
+
+# Default Vertex AI settings if env vars are not provided; fall back to the
+# analytics project instead of hardcoding one.
+os.environ.setdefault("VERTEXAI_PROJECT", PROJECT_ID or "")
+os.environ.setdefault("VERTEXAI_LOCATION", "us-central1")
+# google.genai expects these env vars to auto-switch to Vertex AI with ADC.
+os.environ.setdefault("GOOGLE_GENAI_USE_VERTEXAI", "true")
+os.environ.setdefault("GOOGLE_CLOUD_PROJECT", PROJECT_ID or "")
+os.environ.setdefault("GOOGLE_CLOUD_LOCATION", "us-central1")
+
+if not PROJECT_ID or not DATASET_ID:
+  raise ValueError(
+      "Set BQ_AGENT_ANALYTICS_PROJECT and BQ_AGENT_ANALYTICS_DATASET before "
+      "running this agent to enable BigQuery analytics logging."
+  )
+
+# Align the allowlist with the event types the dashboard queries
+# (see dashboard/app.py and dashboard/simulate_agent_data.py).
+analytics_plugin = BigQueryAgentAnalyticsPlugin(
+    project_id=PROJECT_ID,
+    dataset_id=DATASET_ID,
+    table_id=TABLE_ID,
+    config=BigQueryLoggerConfig(
+        event_allowlist=[
+            "USER_MESSAGE_RECEIVED",
+            "INVOCATION_STARTING",
+            "INVOCATION_COMPLETED",
+            "AGENT_STARTING",
+            "AGENT_COMPLETED",
+            "LLM_REQUEST",
+            "LLM_RESPONSE",
+            "LLM_ERROR",
+            "TOOL_STARTING",
+            "TOOL_COMPLETED",
+            "TOOL_ERROR",
+        ],
+        max_content_length=400,
+    ),
+)
+
+
+def pick_city(top_n: int = 3) -> List[str]:
+  """Return a short list of recommended cities."""
+  cities = ["Tokyo", "Paris", "New York", "Sydney", "Singapore", "Toronto"]
+  return cities[:top_n]
+
+
+def city_highlights(city: str) -> Dict[str, str]:
+  """Return quick highlights for a city."""
+  highlights = {
+      "Tokyo": "Sushi, Akihabara tech, efficient transit.",
+      "Paris": "Museums, pastries, walkable boulevards.",
+      "New York": "Broadway, diverse food, skyline views.",
+      "Sydney": "Harbour, beaches, outdoor cafes.",
+      "Singapore": "Hawker food, gardens, clean and safe.",
+      "Toronto": "CN Tower, neighbourhood food, waterfront.",
+  }
+  return {"city": city, "highlights": highlights.get(city, "Explore freely.")}
+
+
+def estimate_trip_budget(
+    city: str, days: int, budget_per_day: float
+) -> Dict[str, str]:
+  """Rough budget calculator."""
+  total = days * budget_per_day
+  return {
+      "city": city,
+      "days": str(days),
+      "budget_per_day": f"${budget_per_day:,.0f}",
+      "estimated_total": f"${total:,.0f}",
+      "note": "Assumes lodging+food+local transit; adjust for flights.",
+  }
+
+
+city_tool = FunctionTool(pick_city)
+city_highlights_tool = FunctionTool(city_highlights)
+budget_tool = FunctionTool(estimate_trip_budget)
+
+gemini = Gemini(model="gemini-2.5-flash")
+search_agent = create_google_search_agent(model=gemini)
+google_search_tool = GoogleSearchAgentTool(agent=search_agent)
+
+root_agent = Agent(
+    name="bq_agent_analytics_demo",
+    model=gemini,
+    instruction=(
+        "You are a concise assistant. Prefer to use tools when asked for trip "
+        "ideas, highlights, or cost estimates. Keep answers short and "
+        "actionable."
+    ),
+    description="A minimal agent that logs events to BigQuery.",
+    tools=[
+        city_tool,
+        city_highlights_tool,
+        budget_tool,
+        google_search_tool,
+    ],
+)
+
+app = App(
+    name="bq_agent_analytics_demo",
+    root_agent=root_agent,
+    plugins=[analytics_plugin],
+)
diff --git a/contributing/samples/bigquery_agent_analytics_demo/dashboard/Dockerfile b/contributing/samples/bigquery_agent_analytics_demo/dashboard/Dockerfile
new file mode 100644
index 0000000000..f2774427e7
--- /dev/null
+++ b/contributing/samples/bigquery_agent_analytics_demo/dashboard/Dockerfile
@@ -0,0 +1,45 @@
+# Agent Analytics Dashboard Docker Image
+#
+# Build:
+#   docker build -t agent-analytics-dashboard .
+#
+# Run (the container runs as `appuser`, so mount gcloud config under its home):
+#   docker run -p 8080:8080 \
+#     -e BQ_AGENT_ANALYTICS_PROJECT=your-project \
+#     -e BQ_AGENT_ANALYTICS_DATASET=your-dataset \
+#     -v ~/.config/gcloud:/home/appuser/.config/gcloud:ro \
+#     agent-analytics-dashboard
+
+FROM python:3.11-slim
+
+# Set working directory
+WORKDIR /app
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    curl \
+    && rm -rf /var/lib/apt/lists/*
+
+# Copy requirements first for better caching
+COPY requirements.txt .
+
+# Install Python dependencies
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy application code
+COPY app.py .
+COPY __init__.py .
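+
+# Also copy the simulation script so the optional `simulate` service in
+# docker-compose.yml (which reuses this image) can run it.
+COPY simulate_agent_data.py .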
+ +# Create non-root user for security +RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app +USER appuser + +# Expose port +EXPOSE 8080 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8080/api/health || exit 1 + +# Run the application +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/contributing/samples/bigquery_agent_analytics_demo/dashboard/README.md b/contributing/samples/bigquery_agent_analytics_demo/dashboard/README.md new file mode 100644 index 0000000000..b1a482182e --- /dev/null +++ b/contributing/samples/bigquery_agent_analytics_demo/dashboard/README.md @@ -0,0 +1,234 @@ +# Agent Analytics Dashboard + +A real-time monitoring dashboard for ADK agent behavior analytics stored in BigQuery. This dashboard provides comprehensive insights into agent performance, tool usage, error tracking, and session analysis. + +## Features + +- **Overview Dashboard**: Key metrics including sessions, events, error rates, latency, and token usage +- **Session Explorer**: Browse and inspect individual conversation sessions +- **Tool Performance**: Analyze tool invocation rates, success/failure metrics, and latency +- **LLM Metrics**: Track LLM requests, tokens, time-to-first-token, and latency trends +- **Error Tracking**: Aggregate error summaries with affected session counts +- **Real-time Updates**: Auto-refresh every 30 seconds + +## Architecture + +``` +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ ADK Agent │────▶│ BigQuery │◀────│ Dashboard │ +│ + Plugin │ │ Analytics │ │ (FastAPI) │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ + │ │ │ + │ │ │ + ▼ ▼ ▼ + Agent Events Time-Partitioned Interactive UI + (LLM, Tools) Clustered Table with Charts +``` + +## Prerequisites + +1. **Google Cloud Project** with BigQuery API enabled +2. **BigQuery Dataset** (the plugin auto-creates tables) +3. **Authentication**: Application Default Credentials (ADC) configured +4. **Python 3.11+** + +## Quick Start + +### 1. Set Environment Variables + +```bash +export BQ_AGENT_ANALYTICS_PROJECT="your-gcp-project-id" +export BQ_AGENT_ANALYTICS_DATASET="your-bigquery-dataset" +export BQ_AGENT_ANALYTICS_TABLE="agent_events_v2" # optional, default + +# For Vertex AI (if using Gemini models) +export VERTEXAI_PROJECT="your-gcp-project-id" +export VERTEXAI_LOCATION="us-central1" +export GOOGLE_GENAI_USE_VERTEXAI="true" +``` + +### 2. Install Dependencies + +```bash +cd contributing/samples/bigquery_agent_analytics_demo/dashboard +pip install -r requirements.txt +``` + +### 3. Run the Dashboard + +```bash +# Option 1: Using uvicorn directly +uvicorn app:app --reload --port 8080 + +# Option 2: Using Python +python app.py +``` + +### 4. 
Open in Browser
+
+Navigate to: http://localhost:8080
+
+## Generating Sample Data
+
+Use the simulation script to populate your BigQuery table with realistic agent data:
+
+```bash
+# Simulate 10 conversation sessions
+python simulate_agent_data.py --num-sessions 10
+
+# Simulate with more prompts per session
+python simulate_agent_data.py --num-sessions 20 --min-prompts 3 --max-prompts 8
+
+# Generate historical data (multiple days' worth)
+python simulate_agent_data.py --historical --days 7 --sessions-per-day 50
+
+# Disable error simulation
+python simulate_agent_data.py --num-sessions 10 --no-errors
+```
+
+## API Endpoints
+
+| Endpoint | Description |
+|----------|-------------|
+| `GET /` | Dashboard HTML UI |
+| `GET /api/health` | Health check |
+| `GET /api/config` | Current configuration |
+| `GET /api/overview?hours=24` | Overview metrics |
+| `GET /api/events/types?hours=24` | Event type distribution |
+| `GET /api/events/timeline?hours=24` | Events over time |
+| `GET /api/sessions?hours=24&limit=50` | Session list |
+| `GET /api/sessions/{session_id}/events` | Session event details |
+| `GET /api/tools/metrics?hours=24` | Tool performance |
+| `GET /api/llm/metrics?hours=24` | LLM performance |
+| `GET /api/llm/latency-timeline?hours=24` | LLM latency over time |
+| `GET /api/errors?hours=24` | Error summary |
+| `GET /api/agents?hours=24` | Per-agent metrics |
+| `GET /api/trace/{trace_id}` | Trace event details |
+
+## BigQuery Table Schema
+
+The dashboard reads from a table with this schema (auto-created by the plugin):
+
+```sql
+CREATE TABLE `project.dataset.agent_events_v2` (
+  timestamp TIMESTAMP NOT NULL,
+  event_type STRING,
+  agent STRING,
+  session_id STRING,
+  invocation_id STRING,
+  user_id STRING,
+  trace_id STRING,
+  span_id STRING,
+  parent_span_id STRING,
+  content JSON,
+  content_parts ARRAY<STRUCT<
+    text STRING,
+    part_index INT64,
+    part_attributes STRING,
+    storage_mode STRING
+  >>,
+  attributes JSON,
+  latency_ms JSON,
+  status STRING,
+  error_message STRING,
+  is_truncated BOOL
+)
+PARTITION BY DATE(timestamp)
+CLUSTER BY event_type, agent, user_id;
+```
+
+## Deployment Options
+
+### Local Development
+
+```bash
+uvicorn app:app --reload --port 8080
+```
+
+### Docker
+
+```dockerfile
+FROM python:3.11-slim
+WORKDIR /app
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+COPY app.py .
+EXPOSE 8080
+CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080"]
+```
+
+```bash
+docker build -t agent-analytics-dashboard .
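+
+# Once `docker run` (below) has the container up, /api/health should answer:
+#   curl -s http://localhost:8080/api/health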
+docker run -p 8080:8080 \ + -e BQ_AGENT_ANALYTICS_PROJECT=your-project \ + -e BQ_AGENT_ANALYTICS_DATASET=your-dataset \ + -e GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json \ + -v /path/to/credentials.json:/path/to/credentials.json \ + agent-analytics-dashboard +``` + +### Cloud Run + +```bash +# Build and deploy +gcloud builds submit --tag gcr.io/YOUR_PROJECT/agent-analytics-dashboard + +gcloud run deploy agent-analytics-dashboard \ + --image gcr.io/YOUR_PROJECT/agent-analytics-dashboard \ + --platform managed \ + --region us-central1 \ + --set-env-vars BQ_AGENT_ANALYTICS_PROJECT=your-project,BQ_AGENT_ANALYTICS_DATASET=your-dataset \ + --allow-unauthenticated +``` + +## Customization + +### Adding Custom Metrics + +Edit `app.py` to add new endpoints: + +```python +@app.get("/api/custom/metric") +async def get_custom_metric(hours: int = 24): + client = get_bq_client() + query = f""" + SELECT + -- Your custom aggregation + FROM {get_full_table_id()} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + """ + result = client.query(query).result() + return [dict(row) for row in result] +``` + +### Modifying the UI + +The dashboard UI is embedded in `get_dashboard_html()`. Modify the HTML/JavaScript to: +- Add new charts (Chart.js) +- Change layouts (Tailwind CSS) +- Add new tabs or panels + +## Troubleshooting + +### "BigQuery configuration not set" +Ensure `BQ_AGENT_ANALYTICS_PROJECT` and `BQ_AGENT_ANALYTICS_DATASET` environment variables are set. + +### "Permission denied" +Ensure your credentials have `roles/bigquery.dataViewer` and `roles/bigquery.jobUser` on the project/dataset. + +### No data showing +1. Check that the table exists in BigQuery +2. Verify events are being logged (run the simulation script) +3. Adjust the time range filter in the UI + +### Slow queries +- Ensure the table is partitioned by timestamp +- Add clustering on frequently filtered columns +- Consider materializing views for complex aggregations + +## License + +Apache 2.0 - See LICENSE file diff --git a/contributing/samples/bigquery_agent_analytics_demo/dashboard/__init__.py b/contributing/samples/bigquery_agent_analytics_demo/dashboard/__init__.py new file mode 100644 index 0000000000..4a73cec680 --- /dev/null +++ b/contributing/samples/bigquery_agent_analytics_demo/dashboard/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Agent Analytics Dashboard Package.""" diff --git a/contributing/samples/bigquery_agent_analytics_demo/dashboard/app.py b/contributing/samples/bigquery_agent_analytics_demo/dashboard/app.py new file mode 100644 index 0000000000..fe0d895790 --- /dev/null +++ b/contributing/samples/bigquery_agent_analytics_demo/dashboard/app.py @@ -0,0 +1,2116 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""FastAPI Agent Analytics Dashboard. + +A comprehensive monitoring dashboard for agent behavior analytics stored in +BigQuery. Provides real-time insights into agent performance, tool usage, +error tracking, and session analysis. + +Usage: + uvicorn dashboard.app:app --reload --port 8080 + +Environment Variables: + BQ_AGENT_ANALYTICS_PROJECT: Google Cloud project ID + BQ_AGENT_ANALYTICS_DATASET: BigQuery dataset ID + BQ_AGENT_ANALYTICS_TABLE: BigQuery table name (default: agent_events_v2) +""" + +from __future__ import annotations + +import json +import os +from datetime import datetime +from datetime import timedelta +from datetime import timezone +from typing import Any +from typing import Optional + +from fastapi import FastAPI +from fastapi import HTTPException +from fastapi import Query +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import HTMLResponse +from fastapi.staticfiles import StaticFiles +from google.cloud import bigquery +from pydantic import BaseModel +from pydantic import Field + + +# Configuration from environment +PROJECT_ID = os.getenv("BQ_AGENT_ANALYTICS_PROJECT") +DATASET_ID = os.getenv("BQ_AGENT_ANALYTICS_DATASET") +TABLE_ID = os.getenv("BQ_AGENT_ANALYTICS_TABLE", "agent_events_v2") + +app = FastAPI( + title="Agent Analytics Dashboard", + description="Real-time monitoring dashboard for ADK agent behavior", + version="1.0.0", +) + +# Enable CORS for frontend development +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +def get_bq_client() -> bigquery.Client: + """Get or create BigQuery client.""" + if not PROJECT_ID: + raise HTTPException( + status_code=500, + detail="BQ_AGENT_ANALYTICS_PROJECT environment variable not set", + ) + return bigquery.Client(project=PROJECT_ID) + + +def get_full_table_id() -> str: + """Get full BigQuery table ID.""" + if not PROJECT_ID or not DATASET_ID: + raise HTTPException( + status_code=500, + detail="BigQuery configuration not set. 
Please set " + "BQ_AGENT_ANALYTICS_PROJECT and BQ_AGENT_ANALYTICS_DATASET", + ) + return f"`{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`" + + +# ============================================================================= +# Pydantic Models +# ============================================================================= + + +class TimeRange(BaseModel): + """Time range for queries.""" + + start: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc) - timedelta(hours=24) + ) + end: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +class OverviewMetrics(BaseModel): + """Overview dashboard metrics.""" + + total_sessions: int + total_invocations: int + total_events: int + error_rate: float + avg_latency_ms: float + total_tokens: int + unique_users: int + unique_agents: int + + +class EventTypeCount(BaseModel): + """Event type count.""" + + event_type: Optional[str] + count: int + percentage: float + + +class ToolMetrics(BaseModel): + """Tool usage metrics.""" + + tool_name: str + invocations: int + success_count: int + error_count: int + success_rate: float + avg_latency_ms: float + + +class SessionSummary(BaseModel): + """Session summary.""" + + session_id: Optional[str] + user_id: Optional[str] + agent: Optional[str] + start_time: datetime + end_time: Optional[datetime] + duration_seconds: Optional[float] + event_count: int + error_count: int + status: str + + +class EventDetail(BaseModel): + """Detailed event information.""" + + timestamp: datetime + event_type: Optional[str] + agent: Optional[str] + session_id: Optional[str] + invocation_id: Optional[str] + span_id: Optional[str] + parent_span_id: Optional[str] + content: Optional[dict] + latency_ms: Optional[dict] + status: str + error_message: Optional[str] + + +class ErrorSummary(BaseModel): + """Error summary.""" + + event_type: str + error_message: str + count: int + last_occurrence: datetime + affected_sessions: int + + +class LLMMetrics(BaseModel): + """LLM performance metrics.""" + + total_requests: int + total_responses: int + error_count: int + avg_latency_ms: float + avg_time_to_first_token_ms: Optional[float] + total_prompt_tokens: int + total_completion_tokens: int + total_tokens: int + avg_tokens_per_request: float + + +class TimeSeriesPoint(BaseModel): + """Time series data point.""" + + timestamp: datetime + value: float + label: Optional[str] = None + + +class LatencyPercentiles(BaseModel): + """Latency percentile metrics.""" + + min_ms: float + max_ms: float + avg_ms: float + median_ms: float + p90_ms: float + p95_ms: float + p99_ms: float + std_dev_ms: float + sample_count: int + + +class LatencyByCategory(BaseModel): + """Latency breakdown by category.""" + + category: str + count: int + avg_ms: float + min_ms: float + max_ms: float + p50_ms: float + p90_ms: float + p95_ms: float + + +class SlowestRequest(BaseModel): + """Details of a slow request.""" + + timestamp: datetime + event_type: Optional[str] + agent: Optional[str] + session_id: Optional[str] + latency_ms: float + content_preview: Optional[str] + + +class LatencyDistributionBucket(BaseModel): + """Latency distribution histogram bucket.""" + + bucket_start_ms: int + bucket_end_ms: int + count: int + percentage: float + + +# ============================================================================= +# API Endpoints +# ============================================================================= + + +@app.get("/api/health") +async def health_check(): + """Health check endpoint.""" + return {"status": "healthy", "timestamp": 
datetime.now(timezone.utc)} + + +@app.get("/api/config") +async def get_config(): + """Get current configuration.""" + return { + "project_id": PROJECT_ID, + "dataset_id": DATASET_ID, + "table_id": TABLE_ID, + "configured": bool(PROJECT_ID and DATASET_ID), + } + + +@app.get("/api/overview", response_model=OverviewMetrics) +async def get_overview_metrics( + hours: int = Query(default=24, ge=1, le=720), +) -> OverviewMetrics: + """Get overview dashboard metrics.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + WITH base_data AS ( + SELECT + session_id, + invocation_id, + event_type, + user_id, + agent, + status, + JSON_VALUE(latency_ms, '$.total_ms') AS latency, + COALESCE( + SAFE_CAST(JSON_VALUE(attributes, '$.usage_metadata.total_token_count') AS INT64), + SAFE_CAST(JSON_VALUE(content, '$.usage.total') AS INT64) + ) AS tokens + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND event_type IN ( + 'USER_MESSAGE_RECEIVED', 'INVOCATION_STARTING', 'INVOCATION_COMPLETED', + 'AGENT_STARTING', 'AGENT_COMPLETED', 'LLM_REQUEST', 'LLM_RESPONSE', + 'LLM_ERROR', 'TOOL_STARTING', 'TOOL_COMPLETED', 'TOOL_ERROR' + ) + ) + SELECT + COUNT(DISTINCT session_id) AS total_sessions, + COUNT(DISTINCT invocation_id) AS total_invocations, + COUNT(*) AS total_events, + SAFE_DIVIDE( + COUNTIF(status = 'ERROR'), + COUNT(*) + ) * 100 AS error_rate, + AVG(SAFE_CAST(latency AS FLOAT64)) AS avg_latency_ms, + SUM(tokens) AS total_tokens, + COUNT(DISTINCT user_id) AS unique_users, + COUNT(DISTINCT agent) AS unique_agents + FROM base_data + """ + + result = client.query(query).result() + row = list(result)[0] + + return OverviewMetrics( + total_sessions=row.total_sessions or 0, + total_invocations=row.total_invocations or 0, + total_events=row.total_events or 0, + error_rate=row.error_rate or 0.0, + avg_latency_ms=row.avg_latency_ms or 0.0, + total_tokens=row.total_tokens or 0, + unique_users=row.unique_users or 0, + unique_agents=row.unique_agents or 0, + ) + + +@app.get("/api/events/types", response_model=list[EventTypeCount]) +async def get_event_type_counts( + hours: int = Query(default=24, ge=1, le=720), +) -> list[EventTypeCount]: + """Get event counts by type.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + WITH counts AS ( + SELECT + event_type, + COUNT(*) AS count + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + GROUP BY event_type + ), + total AS ( + SELECT SUM(count) AS total FROM counts + ) + SELECT + c.event_type, + c.count, + SAFE_DIVIDE(c.count, t.total) * 100 AS percentage + FROM counts c, total t + ORDER BY c.count DESC + """ + + result = client.query(query).result() + return [ + EventTypeCount( + event_type=row.event_type, + count=row.count, + percentage=row.percentage or 0.0, + ) + for row in result + ] + + +@app.get("/api/events/timeline", response_model=list[TimeSeriesPoint]) +async def get_event_timeline( + hours: int = Query(default=24, ge=1, le=720), + interval_minutes: int = Query(default=60, ge=5, le=1440), +) -> list[TimeSeriesPoint]: + """Get event count timeline.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + SELECT + TIMESTAMP_TRUNC(timestamp, MINUTE) AS bucket, + COUNT(*) AS count + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + GROUP BY bucket + ORDER BY bucket + """ + + result = client.query(query).result() + return [ + TimeSeriesPoint(timestamp=row.bucket, 
value=float(row.count)) + for row in result + ] + + +@app.get("/api/tools/metrics", response_model=list[ToolMetrics]) +async def get_tool_metrics( + hours: int = Query(default=24, ge=1, le=720), +) -> list[ToolMetrics]: + """Get tool usage metrics.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + WITH tool_events AS ( + SELECT + JSON_VALUE(content, '$.tool') AS tool_name, + event_type, + status, + SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64) AS latency + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND event_type IN ('TOOL_STARTING', 'TOOL_COMPLETED', 'TOOL_ERROR') + ) + SELECT + tool_name, + COUNTIF(event_type = 'TOOL_STARTING') AS invocations, + COUNTIF(event_type = 'TOOL_COMPLETED') AS success_count, + COUNTIF(event_type = 'TOOL_ERROR') AS error_count, + SAFE_DIVIDE( + COUNTIF(event_type = 'TOOL_COMPLETED'), + COUNTIF(event_type IN ('TOOL_COMPLETED', 'TOOL_ERROR')) + ) * 100 AS success_rate, + AVG(CASE WHEN event_type = 'TOOL_COMPLETED' THEN latency END) AS avg_latency_ms + FROM tool_events + WHERE tool_name IS NOT NULL + GROUP BY tool_name + ORDER BY invocations DESC + """ + + result = client.query(query).result() + return [ + ToolMetrics( + tool_name=row.tool_name, + invocations=row.invocations or 0, + success_count=row.success_count or 0, + error_count=row.error_count or 0, + success_rate=row.success_rate or 0.0, + avg_latency_ms=row.avg_latency_ms or 0.0, + ) + for row in result + ] + + +@app.get("/api/sessions", response_model=list[SessionSummary]) +async def get_sessions( + hours: int = Query(default=24, ge=1, le=720), + limit: int = Query(default=50, ge=1, le=500), + status_filter: Optional[str] = Query(default=None), +) -> list[SessionSummary]: + """Get session summaries.""" + client = get_bq_client() + table = get_full_table_id() + + status_clause = "" + if status_filter: + status_clause = f"HAVING MAX(CASE WHEN status = 'ERROR' THEN 1 ELSE 0 END) = {'1' if status_filter == 'ERROR' else '0'}" + + query = f""" + SELECT + session_id, + ANY_VALUE(user_id) AS user_id, + ANY_VALUE(agent) AS agent, + MIN(timestamp) AS start_time, + MAX(timestamp) AS end_time, + TIMESTAMP_DIFF(MAX(timestamp), MIN(timestamp), SECOND) AS duration_seconds, + COUNT(*) AS event_count, + COUNTIF(status = 'ERROR') AS error_count, + CASE + WHEN MAX(CASE WHEN status = 'ERROR' THEN 1 ELSE 0 END) = 1 THEN 'ERROR' + ELSE 'OK' + END AS status + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND session_id IS NOT NULL + AND event_type IN ( + 'USER_MESSAGE_RECEIVED', 'INVOCATION_STARTING', 'INVOCATION_COMPLETED', + 'AGENT_STARTING', 'AGENT_COMPLETED', 'LLM_REQUEST', 'LLM_RESPONSE', + 'LLM_ERROR', 'TOOL_STARTING', 'TOOL_COMPLETED', 'TOOL_ERROR' + ) + GROUP BY session_id + {status_clause} + ORDER BY start_time DESC + LIMIT {limit} + """ + + result = client.query(query).result() + return [ + SessionSummary( + session_id=row.session_id, + user_id=row.user_id, + agent=row.agent, + start_time=row.start_time, + end_time=row.end_time, + duration_seconds=row.duration_seconds, + event_count=row.event_count, + error_count=row.error_count, + status=row.status, + ) + for row in result + ] + + +@app.get("/api/sessions/{session_id}/events", response_model=list[EventDetail]) +async def get_session_events(session_id: str) -> list[EventDetail]: + """Get all events for a specific session.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + SELECT + timestamp, + 
event_type, + agent, + session_id, + invocation_id, + span_id, + parent_span_id, + content, + latency_ms, + status, + error_message + FROM {table} + WHERE session_id = @session_id + ORDER BY timestamp ASC + """ + + job_config = bigquery.QueryJobConfig( + query_parameters=[ + bigquery.ScalarQueryParameter("session_id", "STRING", session_id), + ] + ) + + result = client.query(query, job_config=job_config).result() + events = [] + for row in result: + content = None + if row.content: + try: + content = ( + json.loads(row.content) + if isinstance(row.content, str) + else row.content + ) + except (json.JSONDecodeError, TypeError): + content = {"raw": str(row.content)} + + latency = None + if row.latency_ms: + try: + latency = ( + json.loads(row.latency_ms) + if isinstance(row.latency_ms, str) + else row.latency_ms + ) + except (json.JSONDecodeError, TypeError): + latency = None + + events.append( + EventDetail( + timestamp=row.timestamp, + event_type=row.event_type, + agent=row.agent, + session_id=row.session_id, + invocation_id=row.invocation_id, + span_id=row.span_id, + parent_span_id=row.parent_span_id, + content=content, + latency_ms=latency, + status=row.status or "OK", + error_message=row.error_message, + ) + ) + return events + + +@app.get("/api/errors", response_model=list[ErrorSummary]) +async def get_error_summary( + hours: int = Query(default=24, ge=1, le=720), + limit: int = Query(default=50, ge=1, le=200), +) -> list[ErrorSummary]: + """Get error summary.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + SELECT + event_type, + error_message, + COUNT(*) AS count, + MAX(timestamp) AS last_occurrence, + COUNT(DISTINCT session_id) AS affected_sessions + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND status = 'ERROR' + AND error_message IS NOT NULL + GROUP BY event_type, error_message + ORDER BY count DESC + LIMIT {limit} + """ + + result = client.query(query).result() + return [ + ErrorSummary( + event_type=row.event_type, + error_message=row.error_message, + count=row.count, + last_occurrence=row.last_occurrence, + affected_sessions=row.affected_sessions, + ) + for row in result + ] + + +@app.get("/api/llm/metrics", response_model=LLMMetrics) +async def get_llm_metrics( + hours: int = Query(default=24, ge=1, le=720), +) -> LLMMetrics: + """Get LLM performance metrics.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + WITH llm_events AS ( + SELECT + event_type, + status, + SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64) AS latency, + SAFE_CAST(JSON_VALUE(latency_ms, '$.time_to_first_token_ms') AS FLOAT64) AS ttft, + COALESCE( + SAFE_CAST(JSON_VALUE(attributes, '$.usage_metadata.prompt_token_count') AS INT64), + SAFE_CAST(JSON_VALUE(content, '$.usage.prompt') AS INT64) + ) AS prompt_tokens, + COALESCE( + SAFE_CAST(JSON_VALUE(attributes, '$.usage_metadata.candidates_token_count') AS INT64), + SAFE_CAST(JSON_VALUE(content, '$.usage.completion') AS INT64) + ) AS completion_tokens, + COALESCE( + SAFE_CAST(JSON_VALUE(attributes, '$.usage_metadata.total_token_count') AS INT64), + SAFE_CAST(JSON_VALUE(content, '$.usage.total') AS INT64) + ) AS total_tokens_per_event + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND event_type IN ('LLM_REQUEST', 'LLM_RESPONSE', 'LLM_ERROR') + ) + SELECT + COUNTIF(event_type = 'LLM_REQUEST') AS total_requests, + COUNTIF(event_type = 'LLM_RESPONSE') AS total_responses, + COUNTIF(event_type = 
'LLM_ERROR') AS error_count, + AVG(CASE WHEN event_type = 'LLM_RESPONSE' THEN latency END) AS avg_latency_ms, + AVG(CASE WHEN event_type = 'LLM_RESPONSE' THEN ttft END) AS avg_ttft_ms, + SUM(CASE WHEN event_type = 'LLM_RESPONSE' THEN prompt_tokens ELSE 0 END) AS total_prompt_tokens, + SUM(CASE WHEN event_type = 'LLM_RESPONSE' THEN completion_tokens ELSE 0 END) AS total_completion_tokens, + SUM(CASE WHEN event_type = 'LLM_RESPONSE' THEN total_tokens_per_event ELSE 0 END) AS total_tokens + FROM llm_events + """ + + result = client.query(query).result() + row = list(result)[0] + + total_requests = row.total_requests or 0 + total_tokens = row.total_tokens or 0 + + return LLMMetrics( + total_requests=total_requests, + total_responses=row.total_responses or 0, + error_count=row.error_count or 0, + avg_latency_ms=row.avg_latency_ms or 0.0, + avg_time_to_first_token_ms=row.avg_ttft_ms, + total_prompt_tokens=row.total_prompt_tokens or 0, + total_completion_tokens=row.total_completion_tokens or 0, + total_tokens=total_tokens, + avg_tokens_per_request=( + total_tokens / total_requests if total_requests > 0 else 0.0 + ), + ) + + +@app.get("/api/llm/latency-timeline", response_model=list[TimeSeriesPoint]) +async def get_llm_latency_timeline( + hours: int = Query(default=24, ge=1, le=720), +) -> list[TimeSeriesPoint]: + """Get LLM latency timeline.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + SELECT + TIMESTAMP_TRUNC(timestamp, HOUR) AS bucket, + AVG(SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND event_type = 'LLM_RESPONSE' + GROUP BY bucket + ORDER BY bucket + """ + + result = client.query(query).result() + return [ + TimeSeriesPoint( + timestamp=row.bucket, + value=row.avg_latency or 0.0, + label="Avg Latency (ms)", + ) + for row in result + ] + + +@app.get("/api/agents", response_model=list[dict]) +async def get_agent_summary( + hours: int = Query(default=24, ge=1, le=720), +) -> list[dict]: + """Get summary metrics per agent.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + SELECT + agent, + COUNT(*) AS total_events, + COUNT(DISTINCT session_id) AS total_sessions, + COUNTIF(status = 'ERROR') AS error_count, + SAFE_DIVIDE(COUNTIF(status = 'ERROR'), COUNT(*)) * 100 AS error_rate, + AVG(SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency_ms + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + GROUP BY agent + ORDER BY total_events DESC + """ + + result = client.query(query).result() + return [ + { + "agent": row.agent, + "total_events": row.total_events, + "total_sessions": row.total_sessions, + "error_count": row.error_count, + "error_rate": row.error_rate or 0.0, + "avg_latency_ms": row.avg_latency_ms or 0.0, + } + for row in result + ] + + +@app.get("/api/latency/percentiles", response_model=LatencyPercentiles) +async def get_latency_percentiles( + hours: int = Query(default=24, ge=1, le=720), + event_type: Optional[str] = Query(default=None), +) -> LatencyPercentiles: + """Get latency percentile metrics.""" + client = get_bq_client() + table = get_full_table_id() + + event_filter = "" + if event_type: + event_filter = f"AND event_type = '{event_type}'" + + query = f""" + WITH latencies AS ( + SELECT + SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64) AS latency + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), 
INTERVAL {hours} HOUR) + AND JSON_VALUE(latency_ms, '$.total_ms') IS NOT NULL + {event_filter} + ) + SELECT + MIN(latency) AS min_ms, + MAX(latency) AS max_ms, + AVG(latency) AS avg_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(50)] AS median_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(90)] AS p90_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(95)] AS p95_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(99)] AS p99_ms, + STDDEV(latency) AS std_dev_ms, + COUNT(*) AS sample_count + FROM latencies + WHERE latency IS NOT NULL + """ + + result = client.query(query).result() + row = list(result)[0] + + return LatencyPercentiles( + min_ms=row.min_ms or 0.0, + max_ms=row.max_ms or 0.0, + avg_ms=row.avg_ms or 0.0, + median_ms=row.median_ms or 0.0, + p90_ms=row.p90_ms or 0.0, + p95_ms=row.p95_ms or 0.0, + p99_ms=row.p99_ms or 0.0, + std_dev_ms=row.std_dev_ms or 0.0, + sample_count=row.sample_count or 0, + ) + + +@app.get("/api/latency/by-event-type", response_model=list[LatencyByCategory]) +async def get_latency_by_event_type( + hours: int = Query(default=24, ge=1, le=720), +) -> list[LatencyByCategory]: + """Get latency breakdown by event type.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + WITH latencies AS ( + SELECT + event_type, + SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64) AS latency + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND JSON_VALUE(latency_ms, '$.total_ms') IS NOT NULL + ) + SELECT + event_type AS category, + COUNT(*) AS count, + AVG(latency) AS avg_ms, + MIN(latency) AS min_ms, + MAX(latency) AS max_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(50)] AS p50_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(90)] AS p90_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(95)] AS p95_ms + FROM latencies + WHERE latency IS NOT NULL AND event_type IS NOT NULL + GROUP BY event_type + ORDER BY avg_ms DESC + """ + + result = client.query(query).result() + return [ + LatencyByCategory( + category=row.category, + count=row.count, + avg_ms=row.avg_ms or 0.0, + min_ms=row.min_ms or 0.0, + max_ms=row.max_ms or 0.0, + p50_ms=row.p50_ms or 0.0, + p90_ms=row.p90_ms or 0.0, + p95_ms=row.p95_ms or 0.0, + ) + for row in result + ] + + +@app.get("/api/latency/by-agent", response_model=list[LatencyByCategory]) +async def get_latency_by_agent( + hours: int = Query(default=24, ge=1, le=720), +) -> list[LatencyByCategory]: + """Get latency breakdown by agent.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + WITH latencies AS ( + SELECT + agent, + SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64) AS latency + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND JSON_VALUE(latency_ms, '$.total_ms') IS NOT NULL + ) + SELECT + agent AS category, + COUNT(*) AS count, + AVG(latency) AS avg_ms, + MIN(latency) AS min_ms, + MAX(latency) AS max_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(50)] AS p50_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(90)] AS p90_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(95)] AS p95_ms + FROM latencies + WHERE latency IS NOT NULL AND agent IS NOT NULL + GROUP BY agent + ORDER BY avg_ms DESC + """ + + result = client.query(query).result() + return [ + LatencyByCategory( + category=row.category, + count=row.count, + avg_ms=row.avg_ms or 0.0, + min_ms=row.min_ms or 0.0, + max_ms=row.max_ms or 0.0, + p50_ms=row.p50_ms or 0.0, + p90_ms=row.p90_ms or 0.0, + p95_ms=row.p95_ms or 0.0, + ) + for row in result + ] + + 
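+# NOTE (sketch, not wired in): user-supplied filters such as the optional
+# `event_type` on /api/latency/percentiles are safer passed as BigQuery query
+# parameters than interpolated into the SQL string, e.g.:
+#
+#   job_config = bigquery.QueryJobConfig(query_parameters=[
+#       bigquery.ScalarQueryParameter("event_type", "STRING", event_type),
+#   ])
+#   ... WHERE (@event_type IS NULL OR event_type = @event_type)
+
+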
+@app.get("/api/latency/by-tool", response_model=list[LatencyByCategory]) +async def get_latency_by_tool( + hours: int = Query(default=24, ge=1, le=720), +) -> list[LatencyByCategory]: + """Get latency breakdown by tool.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + WITH latencies AS ( + SELECT + JSON_VALUE(content, '$.tool') AS tool_name, + SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64) AS latency + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND event_type IN ('TOOL_COMPLETED', 'TOOL_ERROR') + AND JSON_VALUE(latency_ms, '$.total_ms') IS NOT NULL + ) + SELECT + tool_name AS category, + COUNT(*) AS count, + AVG(latency) AS avg_ms, + MIN(latency) AS min_ms, + MAX(latency) AS max_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(50)] AS p50_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(90)] AS p90_ms, + APPROX_QUANTILES(latency, 100)[OFFSET(95)] AS p95_ms + FROM latencies + WHERE latency IS NOT NULL AND tool_name IS NOT NULL + GROUP BY tool_name + ORDER BY avg_ms DESC + """ + + result = client.query(query).result() + return [ + LatencyByCategory( + category=row.category, + count=row.count, + avg_ms=row.avg_ms or 0.0, + min_ms=row.min_ms or 0.0, + max_ms=row.max_ms or 0.0, + p50_ms=row.p50_ms or 0.0, + p90_ms=row.p90_ms or 0.0, + p95_ms=row.p95_ms or 0.0, + ) + for row in result + ] + + +@app.get("/api/latency/distribution", response_model=list[LatencyDistributionBucket]) +async def get_latency_distribution( + hours: int = Query(default=24, ge=1, le=720), + bucket_size_ms: int = Query(default=1000, ge=100, le=10000), +) -> list[LatencyDistributionBucket]: + """Get latency distribution histogram.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + WITH latencies AS ( + SELECT + SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64) AS latency + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND JSON_VALUE(latency_ms, '$.total_ms') IS NOT NULL + ), + bucketed AS ( + SELECT + CAST(FLOOR(latency / {bucket_size_ms}) * {bucket_size_ms} AS INT64) AS bucket_start + FROM latencies + WHERE latency IS NOT NULL + ), + counts AS ( + SELECT bucket_start, COUNT(*) AS count + FROM bucketed + GROUP BY bucket_start + ), + total AS ( + SELECT SUM(count) AS total FROM counts + ) + SELECT + c.bucket_start AS bucket_start_ms, + c.bucket_start + {bucket_size_ms} AS bucket_end_ms, + c.count, + SAFE_DIVIDE(c.count, t.total) * 100 AS percentage + FROM counts c, total t + ORDER BY bucket_start_ms + LIMIT 50 + """ + + result = client.query(query).result() + return [ + LatencyDistributionBucket( + bucket_start_ms=row.bucket_start_ms, + bucket_end_ms=row.bucket_end_ms, + count=row.count, + percentage=row.percentage or 0.0, + ) + for row in result + ] + + +@app.get("/api/latency/slowest", response_model=list[SlowestRequest]) +async def get_slowest_requests( + hours: int = Query(default=24, ge=1, le=720), + limit: int = Query(default=20, ge=1, le=100), +) -> list[SlowestRequest]: + """Get slowest requests.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + SELECT + timestamp, + event_type, + agent, + session_id, + SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64) AS latency_ms, + LEFT(COALESCE( + JSON_VALUE(content, '$.summary'), + JSON_VALUE(content, '$.tool'), + TO_JSON_STRING(content) + ), 100) AS content_preview + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND 
JSON_VALUE(latency_ms, '$.total_ms') IS NOT NULL + ORDER BY latency_ms DESC + LIMIT {limit} + """ + + result = client.query(query).result() + return [ + SlowestRequest( + timestamp=row.timestamp, + event_type=row.event_type, + agent=row.agent, + session_id=row.session_id, + latency_ms=row.latency_ms or 0.0, + content_preview=row.content_preview, + ) + for row in result + ] + + +@app.get("/api/latency/timeline", response_model=list[TimeSeriesPoint]) +async def get_latency_timeline( + hours: int = Query(default=24, ge=1, le=720), + metric: str = Query(default="avg", regex="^(avg|p50|p90|p95|p99)$"), +) -> list[TimeSeriesPoint]: + """Get latency timeline with different percentile options.""" + client = get_bq_client() + table = get_full_table_id() + + metric_expr = { + "avg": "AVG(latency)", + "p50": "APPROX_QUANTILES(latency, 100)[OFFSET(50)]", + "p90": "APPROX_QUANTILES(latency, 100)[OFFSET(90)]", + "p95": "APPROX_QUANTILES(latency, 100)[OFFSET(95)]", + "p99": "APPROX_QUANTILES(latency, 100)[OFFSET(99)]", + } + + query = f""" + WITH latencies AS ( + SELECT + TIMESTAMP_TRUNC(timestamp, HOUR) AS bucket, + SAFE_CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64) AS latency + FROM {table} + WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR) + AND JSON_VALUE(latency_ms, '$.total_ms') IS NOT NULL + ) + SELECT + bucket, + {metric_expr[metric]} AS value + FROM latencies + WHERE latency IS NOT NULL + GROUP BY bucket + ORDER BY bucket + """ + + result = client.query(query).result() + return [ + TimeSeriesPoint( + timestamp=row.bucket, + value=row.value or 0.0, + label=f"{metric.upper()} Latency (ms)", + ) + for row in result + ] + + +@app.get("/api/trace/{trace_id}", response_model=list[EventDetail]) +async def get_trace_events(trace_id: str) -> list[EventDetail]: + """Get all events for a specific trace.""" + client = get_bq_client() + table = get_full_table_id() + + query = f""" + SELECT + timestamp, + event_type, + agent, + session_id, + invocation_id, + span_id, + parent_span_id, + content, + latency_ms, + status, + error_message + FROM {table} + WHERE trace_id = @trace_id + ORDER BY timestamp ASC + """ + + job_config = bigquery.QueryJobConfig( + query_parameters=[ + bigquery.ScalarQueryParameter("trace_id", "STRING", trace_id), + ] + ) + + result = client.query(query, job_config=job_config).result() + events = [] + for row in result: + content = None + if row.content: + try: + content = ( + json.loads(row.content) + if isinstance(row.content, str) + else row.content + ) + except (json.JSONDecodeError, TypeError): + content = {"raw": str(row.content)} + + latency = None + if row.latency_ms: + try: + latency = ( + json.loads(row.latency_ms) + if isinstance(row.latency_ms, str) + else row.latency_ms + ) + except (json.JSONDecodeError, TypeError): + latency = None + + events.append( + EventDetail( + timestamp=row.timestamp, + event_type=row.event_type, + agent=row.agent, + session_id=row.session_id, + invocation_id=row.invocation_id, + span_id=row.span_id, + parent_span_id=row.parent_span_id, + content=content, + latency_ms=latency, + status=row.status or "OK", + error_message=row.error_message, + ) + ) + return events + + +# ============================================================================= +# Frontend HTML Dashboard +# ============================================================================= + + +@app.get("/", response_class=HTMLResponse) +async def dashboard(): + """Serve the main dashboard HTML.""" + return get_dashboard_html() + + +def 
get_dashboard_html() -> str:
+  """Return the dashboard HTML with embedded JavaScript."""
+  return """
+[Dashboard HTML/JS omitted from this listing: a single-page UI titled
+"Agent Analytics Dashboard" with auto-refresh controls; overview cards for
+Total Sessions, Total Events, Error Rate, Avg Latency, Total Tokens,
+Unique Users, Unique Agents, and Invocations; and Chart.js panels for
+"Event Types Distribution" and "Events Timeline", populated from the
+/api endpoints defined above.]
+ + + +""" + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8080) diff --git a/contributing/samples/bigquery_agent_analytics_demo/dashboard/docker-compose.yml b/contributing/samples/bigquery_agent_analytics_demo/dashboard/docker-compose.yml new file mode 100644 index 0000000000..3f1b152d03 --- /dev/null +++ b/contributing/samples/bigquery_agent_analytics_demo/dashboard/docker-compose.yml @@ -0,0 +1,68 @@ +# Docker Compose for Agent Analytics Dashboard +# +# Usage: +# # Set environment variables first +# export BQ_AGENT_ANALYTICS_PROJECT=your-project +# export BQ_AGENT_ANALYTICS_DATASET=your-dataset +# +# # Start the dashboard +# docker-compose up -d +# +# # View logs +# docker-compose logs -f +# +# # Stop +# docker-compose down + +version: '3.8' + +services: + dashboard: + build: + context: . + dockerfile: Dockerfile + container_name: agent-analytics-dashboard + ports: + - "8080:8080" + environment: + - BQ_AGENT_ANALYTICS_PROJECT=${BQ_AGENT_ANALYTICS_PROJECT} + - BQ_AGENT_ANALYTICS_DATASET=${BQ_AGENT_ANALYTICS_DATASET} + - BQ_AGENT_ANALYTICS_TABLE=${BQ_AGENT_ANALYTICS_TABLE:-agent_events_v2} + volumes: + # Mount Google Cloud credentials for authentication + # Option 1: Mount the entire gcloud config directory + - ${HOME}/.config/gcloud:/home/appuser/.config/gcloud:ro + # Option 2: Mount a specific service account key (uncomment if using) + # - ${GOOGLE_APPLICATION_CREDENTIALS}:/app/credentials.json:ro + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/api/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 10s + + # Optional: Run simulation to populate sample data + # Uncomment to run simulation on startup + # simulate: + # build: + # context: . + # dockerfile: Dockerfile + # container_name: agent-analytics-simulator + # command: ["python", "simulate_agent_data.py", "--num-sessions", "10"] + # environment: + # - BQ_AGENT_ANALYTICS_PROJECT=${BQ_AGENT_ANALYTICS_PROJECT} + # - BQ_AGENT_ANALYTICS_DATASET=${BQ_AGENT_ANALYTICS_DATASET} + # - BQ_AGENT_ANALYTICS_TABLE=${BQ_AGENT_ANALYTICS_TABLE:-agent_events_v2} + # - VERTEXAI_PROJECT=${BQ_AGENT_ANALYTICS_PROJECT} + # - VERTEXAI_LOCATION=us-central1 + # - GOOGLE_GENAI_USE_VERTEXAI=true + # volumes: + # - ${HOME}/.config/gcloud:/home/appuser/.config/gcloud:ro + # depends_on: + # dashboard: + # condition: service_healthy + +networks: + default: + name: agent-analytics-network diff --git a/contributing/samples/bigquery_agent_analytics_demo/dashboard/requirements.txt b/contributing/samples/bigquery_agent_analytics_demo/dashboard/requirements.txt new file mode 100644 index 0000000000..ba108efc2f --- /dev/null +++ b/contributing/samples/bigquery_agent_analytics_demo/dashboard/requirements.txt @@ -0,0 +1,12 @@ +# Dashboard dependencies +fastapi>=0.104.0 +uvicorn[standard]>=0.24.0 +google-cloud-bigquery>=3.13.0 +pydantic>=2.0.0 + +# For simulation script (if using ADK agent features) +# Note: Install google-adk separately if needed +# pip install google-adk + +# Optional: for development +python-multipart>=0.0.6 diff --git a/contributing/samples/bigquery_agent_analytics_demo/dashboard/run_dashboard.sh b/contributing/samples/bigquery_agent_analytics_demo/dashboard/run_dashboard.sh new file mode 100755 index 0000000000..1a9b279629 --- /dev/null +++ b/contributing/samples/bigquery_agent_analytics_demo/dashboard/run_dashboard.sh @@ -0,0 +1,108 @@ +#!/bin/bash +# Run the Agent Analytics Dashboard +# +# Usage: +# ./run_dashboard.sh [--simulate] [--port PORT] +# 
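+# Example:
+#   ./run_dashboard.sh --simulate --port 9090
+#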
+# Options: +# --simulate Run simulation to populate sample data first +# --port PORT Port to run on (default: 8080) +# --help Show this help message + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PORT=8080 +SIMULATE=false + +# Parse arguments +while [[ $# -gt 0 ]]; do + case $1 in + --simulate) + SIMULATE=true + shift + ;; + --port) + PORT="$2" + shift 2 + ;; + --help) + echo "Usage: $0 [--simulate] [--port PORT]" + echo "" + echo "Options:" + echo " --simulate Run simulation to populate sample data first" + echo " --port PORT Port to run on (default: 8080)" + echo " --help Show this help message" + echo "" + echo "Environment Variables (required):" + echo " BQ_AGENT_ANALYTICS_PROJECT Google Cloud project ID" + echo " BQ_AGENT_ANALYTICS_DATASET BigQuery dataset ID" + echo "" + echo "Environment Variables (optional):" + echo " BQ_AGENT_ANALYTICS_TABLE BigQuery table name (default: agent_events_v2)" + echo " VERTEXAI_PROJECT Vertex AI project ID" + echo " VERTEXAI_LOCATION Vertex AI location (default: us-central1)" + exit 0 + ;; + *) + echo "Unknown option: $1" + exit 1 + ;; + esac +done + +# Check required environment variables +if [ -z "$BQ_AGENT_ANALYTICS_PROJECT" ]; then + echo "Error: BQ_AGENT_ANALYTICS_PROJECT environment variable is not set" + echo "Please set it with: export BQ_AGENT_ANALYTICS_PROJECT=your-project-id" + exit 1 +fi + +if [ -z "$BQ_AGENT_ANALYTICS_DATASET" ]; then + echo "Error: BQ_AGENT_ANALYTICS_DATASET environment variable is not set" + echo "Please set it with: export BQ_AGENT_ANALYTICS_DATASET=your-dataset-id" + exit 1 +fi + +# Set defaults for optional variables +export BQ_AGENT_ANALYTICS_TABLE="${BQ_AGENT_ANALYTICS_TABLE:-agent_events_v2}" +export VERTEXAI_PROJECT="${VERTEXAI_PROJECT:-$BQ_AGENT_ANALYTICS_PROJECT}" +export VERTEXAI_LOCATION="${VERTEXAI_LOCATION:-us-central1}" +export GOOGLE_GENAI_USE_VERTEXAI="${GOOGLE_GENAI_USE_VERTEXAI:-true}" +export GOOGLE_CLOUD_PROJECT="${GOOGLE_CLOUD_PROJECT:-$BQ_AGENT_ANALYTICS_PROJECT}" +export GOOGLE_CLOUD_LOCATION="${GOOGLE_CLOUD_LOCATION:-us-central1}" + +echo "===================================" +echo "Agent Analytics Dashboard" +echo "===================================" +echo "Project: $BQ_AGENT_ANALYTICS_PROJECT" +echo "Dataset: $BQ_AGENT_ANALYTICS_DATASET" +echo "Table: $BQ_AGENT_ANALYTICS_TABLE" +echo "Port: $PORT" +echo "===================================" +echo "" + +# Change to script directory +cd "$SCRIPT_DIR" + +# Install dependencies if needed +if ! python -c "import fastapi" 2>/dev/null; then + echo "Installing dependencies..." + pip install -r requirements.txt +fi + +# Run simulation if requested +if [ "$SIMULATE" = true ]; then + echo "Running simulation to generate sample data..." + python simulate_agent_data.py --num-sessions 10 + echo "" + echo "Simulation complete. Starting dashboard..." 
+ echo "" +fi + +# Start the dashboard +echo "Starting dashboard at http://localhost:$PORT" +echo "Press Ctrl+C to stop" +echo "" + +uvicorn app:app --host 0.0.0.0 --port "$PORT" --reload diff --git a/contributing/samples/bigquery_agent_analytics_demo/dashboard/simulate_agent_data.py b/contributing/samples/bigquery_agent_analytics_demo/dashboard/simulate_agent_data.py new file mode 100644 index 0000000000..14ee6ba5cf --- /dev/null +++ b/contributing/samples/bigquery_agent_analytics_demo/dashboard/simulate_agent_data.py @@ -0,0 +1,429 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Simulate agent interactions to populate BigQuery analytics data. + +This script runs a series of simulated conversations with the demo agent +to generate realistic analytics data for the dashboard. + +Usage: + python simulate_agent_data.py --num-sessions 10 + +Environment Variables: + BQ_AGENT_ANALYTICS_PROJECT: Google Cloud project ID + BQ_AGENT_ANALYTICS_DATASET: BigQuery dataset ID + BQ_AGENT_ANALYTICS_TABLE: BigQuery table name (default: agent_events_v2) + VERTEXAI_PROJECT: Vertex AI project ID + VERTEXAI_LOCATION: Vertex AI location +""" + +from __future__ import annotations + +import argparse +import asyncio +import os +import random +import sys +import uuid +from typing import Any + +# Ensure the parent directory is in the path for imports +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from google.adk import Agent +from google.adk.apps import App +from google.adk.models.google_llm import Gemini +from google.adk.plugins.bigquery_agent_analytics_plugin import ( + BigQueryAgentAnalyticsPlugin, + BigQueryLoggerConfig, +) +from google.adk.runners import Runner +from google.adk.sessions import InMemorySessionService +from google.adk.tools.function_tool import FunctionTool +from google.genai import types + + +# Configuration from environment +PROJECT_ID = os.getenv("BQ_AGENT_ANALYTICS_PROJECT") +DATASET_ID = os.getenv("BQ_AGENT_ANALYTICS_DATASET") +TABLE_ID = os.getenv("BQ_AGENT_ANALYTICS_TABLE", "agent_events_v2") + +# Default Vertex AI settings +os.environ.setdefault("VERTEXAI_PROJECT", PROJECT_ID or "") +os.environ.setdefault("VERTEXAI_LOCATION", "us-central1") +os.environ.setdefault("GOOGLE_GENAI_USE_VERTEXAI", "true") +os.environ.setdefault("GOOGLE_CLOUD_PROJECT", PROJECT_ID or "") +os.environ.setdefault("GOOGLE_CLOUD_LOCATION", "us-central1") + + +# Sample conversation prompts for simulation +SAMPLE_PROMPTS = [ + # Trip planning conversations + "What are the top 3 cities to visit in Asia?", + "Give me highlights for Tokyo", + "What's the estimated budget for 5 days in Paris with $200 per day?", + "I want to plan a trip to New York, what should I know?", + "Compare Tokyo and Singapore for a tech enthusiast", + # Budget inquiries + "How much would a week in Sydney cost at $150/day?", + "What's a reasonable budget for Toronto?", + "Estimate trip cost for 3 days in Paris", + # City highlights + "Tell me about Singapore's food scene", + "What 
makes Paris special?", + "What are the must-see places in New York?", + # General travel questions + "Which city has the best food?", + "Where should I go for a tech conference?", + "Recommend a city for first-time international travelers", + # Multi-turn simulation prompts + "Pick 3 cities for me", + "Now tell me about the first one", + "What about the budget for that city?", +] + +# Sample user IDs for realistic data +SAMPLE_USER_IDS = [ + "user_alice_001", + "user_bob_002", + "user_charlie_003", + "user_diana_004", + "user_eve_005", + "user_frank_006", + "user_grace_007", + "user_henry_008", + "user_ivy_009", + "user_jack_010", +] + +# Sample agents for multi-agent simulation +AGENT_CONFIGS = [ + { + "name": "travel_advisor", + "instruction": ( + "You are a helpful travel advisor. Recommend cities and provide " + "travel tips. Use tools to get city lists, highlights, and budgets." + ), + }, + { + "name": "budget_planner", + "instruction": ( + "You are a budget travel planner. Focus on cost-effective travel. " + "Always calculate budgets when users ask about trips." + ), + }, + { + "name": "city_guide", + "instruction": ( + "You are an expert city guide. Provide detailed highlights and " + "recommendations for cities around the world." + ), + }, +] + + +# Tool functions +def pick_city(top_n: int = 3) -> list[str]: + """Return a short list of recommended cities.""" + cities = ["Tokyo", "Paris", "New York", "Sydney", "Singapore", "Toronto"] + return random.sample(cities, min(top_n, len(cities))) + + +def city_highlights(city: str) -> dict[str, str]: + """Return quick highlights for a city.""" + highlights = { + "Tokyo": "Sushi, Akihabara tech district, efficient transit system.", + "Paris": "World-class museums, pastries, walkable boulevards.", + "New York": "Broadway shows, diverse food scene, iconic skyline views.", + "Sydney": "Beautiful harbour, beaches, outdoor cafes.", + "Singapore": "Amazing hawker food, stunning gardens, clean and safe.", + "Toronto": "CN Tower, diverse neighbourhoods, waterfront activities.", + } + return { + "city": city, + "highlights": highlights.get(city, "A wonderful place to explore!"), + } + + +def estimate_trip_budget( + city: str, days: int, budget_per_day: float +) -> dict[str, Any]: + """Calculate rough trip budget.""" + total = days * budget_per_day + return { + "city": city, + "days": days, + "budget_per_day": f"${budget_per_day:,.0f}", + "estimated_total": f"${total:,.0f}", + "note": "Includes lodging, food, and local transit. 
Adjust for flights.", + } + + +def failing_tool(query: str) -> str: + """A tool that sometimes fails to simulate error scenarios.""" + if random.random() < 0.3: # 30% failure rate + raise ValueError(f"Failed to process query: {query}") + return f"Processed: {query}" + + +def create_agent(config: dict, include_failing_tool: bool = False) -> Agent: + """Create an agent with the given configuration.""" + gemini = Gemini(model="gemini-2.5-flash") + + tools = [ + FunctionTool(pick_city), + FunctionTool(city_highlights), + FunctionTool(estimate_trip_budget), + ] + + if include_failing_tool: + tools.append(FunctionTool(failing_tool)) + + return Agent( + name=config["name"], + model=gemini, + instruction=config["instruction"], + description=f"Demo agent: {config['name']}", + tools=tools, + ) + + +def create_analytics_plugin() -> BigQueryAgentAnalyticsPlugin: + """Create the BigQuery analytics plugin.""" + if not PROJECT_ID or not DATASET_ID: + raise ValueError( + "Set BQ_AGENT_ANALYTICS_PROJECT and BQ_AGENT_ANALYTICS_DATASET " + "environment variables" + ) + + return BigQueryAgentAnalyticsPlugin( + project_id=PROJECT_ID, + dataset_id=DATASET_ID, + table_id=TABLE_ID, + config=BigQueryLoggerConfig( + event_allowlist=[ + "USER_MESSAGE_RECEIVED", + "INVOCATION_STARTING", + "INVOCATION_COMPLETED", + "AGENT_STARTING", + "AGENT_COMPLETED", + "LLM_REQUEST", + "LLM_RESPONSE", + "LLM_ERROR", + "TOOL_STARTING", + "TOOL_COMPLETED", + "TOOL_ERROR", + ], + max_content_length=10000, + batch_size=1, + ), + ) + + +async def run_single_session( + agent_config: dict, + prompts: list[str], + user_id: str, + plugin: BigQueryAgentAnalyticsPlugin, + include_failing_tool: bool = False, +) -> None: + """Run a single conversation session.""" + agent = create_agent(agent_config, include_failing_tool) + app = App( + name=agent_config["name"], + root_agent=agent, + plugins=[plugin], + ) + + session_service = InMemorySessionService() + runner = Runner( + app=app, + session_service=session_service, + ) + + session = await session_service.create_session( + app_name=app.name, + user_id=user_id, + ) + + print(f" Session {session.id[:8]}... started for user {user_id}") + + for prompt in prompts: + print(f" User: {prompt[:50]}...") + try: + content = types.Content( + role="user", parts=[types.Part.from_text(text=prompt)] + ) + + async for event in runner.run_async( + session_id=session.id, + user_id=user_id, + new_message=content, + ): + # Process events silently + pass + + # Small delay between messages to simulate realistic timing + await asyncio.sleep(random.uniform(0.5, 2.0)) + + except Exception as e: + print(f" Error during conversation: {e}") + + # Ensure logs are flushed + await plugin.flush() + print(f" Session {session.id[:8]}... 
completed") + + +async def simulate_conversations( + num_sessions: int = 10, + prompts_per_session: tuple[int, int] = (2, 5), + include_errors: bool = True, +) -> None: + """Simulate multiple conversation sessions.""" + print(f"Starting simulation of {num_sessions} sessions...") + print(f"Project: {PROJECT_ID}") + print(f"Dataset: {DATASET_ID}") + print(f"Table: {TABLE_ID}") + print() + + plugin = create_analytics_plugin() + + for i in range(num_sessions): + # Randomly select agent configuration + agent_config = random.choice(AGENT_CONFIGS) + + # Randomly select user + user_id = random.choice(SAMPLE_USER_IDS) + + # Randomly select prompts + num_prompts = random.randint(*prompts_per_session) + prompts = random.sample(SAMPLE_PROMPTS, min(num_prompts, len(SAMPLE_PROMPTS))) + + # Sometimes include failing tool to simulate errors + include_failing_tool = include_errors and random.random() < 0.2 + + print(f"Session {i + 1}/{num_sessions}:") + print(f" Agent: {agent_config['name']}") + print(f" User: {user_id}") + print(f" Prompts: {num_prompts}") + + try: + await run_single_session( + agent_config=agent_config, + prompts=prompts, + user_id=user_id, + plugin=plugin, + include_failing_tool=include_failing_tool, + ) + except Exception as e: + print(f" Session failed: {e}") + + print() + + # Small delay between sessions + await asyncio.sleep(random.uniform(1.0, 3.0)) + + # Final flush and shutdown + await plugin.shutdown() + print("Simulation complete!") + + +async def inject_historical_data( + days_back: int = 7, + sessions_per_day: int = 20, +) -> None: + """Inject historical data by running simulations with backdated timestamps. + + Note: This creates real conversations but the timestamps will be current. + For true historical data, you would need to manually insert rows into BigQuery. + """ + total_sessions = days_back * sessions_per_day + print(f"Injecting data: ~{total_sessions} sessions over {days_back} days") + print("Note: Timestamps will reflect actual execution time.") + print() + + await simulate_conversations( + num_sessions=total_sessions, + prompts_per_session=(1, 4), + include_errors=True, + ) + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description="Simulate agent interactions for analytics" + ) + parser.add_argument( + "--num-sessions", + type=int, + default=10, + help="Number of sessions to simulate (default: 10)", + ) + parser.add_argument( + "--min-prompts", + type=int, + default=2, + help="Minimum prompts per session (default: 2)", + ) + parser.add_argument( + "--max-prompts", + type=int, + default=5, + help="Maximum prompts per session (default: 5)", + ) + parser.add_argument( + "--no-errors", + action="store_true", + help="Disable error simulation", + ) + parser.add_argument( + "--historical", + action="store_true", + help="Run extended simulation for historical data", + ) + parser.add_argument( + "--days", + type=int, + default=7, + help="Days of historical data to generate (default: 7)", + ) + parser.add_argument( + "--sessions-per-day", + type=int, + default=20, + help="Sessions per day for historical mode (default: 20)", + ) + + args = parser.parse_args() + + if args.historical: + asyncio.run( + inject_historical_data( + days_back=args.days, + sessions_per_day=args.sessions_per_day, + ) + ) + else: + asyncio.run( + simulate_conversations( + num_sessions=args.num_sessions, + prompts_per_session=(args.min_prompts, args.max_prompts), + include_errors=not args.no_errors, + ) + ) + + +if __name__ == "__main__": + main()