Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
354 changes: 354 additions & 0 deletions agents/gemini_data_analytics/a2a_http_sample.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,354 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github"
},
"source": [
"<a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/agents/gemini_data_analytics/a2a_http_sample.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "copyright"
},
"outputs": [],
"source": [
"# Copyright 2026 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "header"
},
"source": [
"# Gemini Data Analytics: A2A HTTP API Sample\n",
"\n",
"This notebook demonstrates how to interact with the **DataA2AService** using standard HTTP requests. This is useful for environments where a high-level SDK is not available or when you want to minimize dependencies."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "background-and-overview"
},
"source": [
"# Background and Overview\n",
"The **Conversational Analytics API** (also known as Gemini Data Analytics) lets you chat with your BigQuery or Looker data anywhere. This notebook demonstrates how to use the **A2A (Agent-to-Agent)** interface via standard HTTP requests. This is useful for environments where a high-level SDK is not available or when you want to minimize dependencies.\n",
"\n",
"This is a **Pre-GA** product. See [documentation](https://cloud.google.com/gemini/docs/conversational-analytics-api/overview) for more details.\n",
"\n",
"Please provide feedback to conversational-analytics-api-feedback@google.com\n",
"<br>\n",
"### This notebook will help you\n",
"1. Authenticate to Google Cloud\n",
"2. Retrieve the Agent Card\n",
"3. Send asynchronous messages and poll for results\n",
"4. Process agent outputs (Artifacts)\n",
"5. Cancel active tasks\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "setup"
},
"outputs": [],
"source": [
"# @title Setup and Authentication\n",
"import json\n",
"import time\n",
"import uuid\n",
"from google.auth import default\n",
"from google.auth.transport.requests import Request\n",
"from google.colab import auth\n",
"import requests\n",
"\n",
"# Authenticate the user\n",
"auth.authenticate_user()\n",
"\n",
"# Get credentials and project ID\n",
"creds, project_id = default()\n",
"creds.refresh(Request())\n",
"access_token = creds.token\n",
"\n",
"ENDPOINT = \"https://geminidataanalytics.googleapis.com\"\n",
"LOCATION = \"global\" # @param {type:\"string\"}\n",
"AGENT_ID = \"bigquery-ca\" # @param {type:\"string\"}\n",
"\n",
"TENANT = f\"projects/{project_id}/locations/{LOCATION}/dataAgents/{AGENT_ID}\"\n",
"HEADERS = {\n",
" \"Authorization\": f\"Bearer {access_token}\",\n",
" \"Content-Type\": \"application/json\",\n",
"}\n",
"print(f\"Target Tenant: {TENANT}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "get-agent-card-header"
},
"source": [
"## 1. Get Agent Card\n",
"\n",
"First, let's retrieve the Agent Card to verify connectivity and see what the agent can do."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "get-agent-card-code"
},
"outputs": [],
"source": [
"url = f\"{ENDPOINT}/v1/card\"\n",
"params = {\"tenant\": TENANT}\n",
"\n",
"try:\n",
" response = requests.get(url, headers=HEADERS, params=params, timeout=30)\n",
" response.raise_for_status()\n",
" print(\"Agent Card:\")\n",
" print(json.dumps(response.json(), indent=2))\n",
"except Exception as e:\n",
" print(f\"Error fetching agent card: {e}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "send-message-async-header"
},
"source": [
"## 2. Send Message (Asynchronous + Polling)\n",
"\n",
"For long-running tasks, use `blocking=False`. This returns a `Task` object immediately, which you can poll for status."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "send-message-async-code"
},
"outputs": [],
"source": [
"def send_async_message(query):\n",
" url = f\"{ENDPOINT}/v1/message:send\"\n",
" payload = {\n",
" \"tenant\": TENANT,\n",
" \"message\": {\n",
" \"message_id\": f\"msg-{uuid.uuid4()}\",\n",
" \"context_id\": \"sample-context\",\n",
" \"role\": \"ROLE_USER\",\n",
" \"content\": [{\"text\": query}],\n",
" },\n",
" \"configuration\": {\"blocking\": False},\n",
" }\n",
"\n",
" response = requests.post(url, headers=HEADERS, json=payload, timeout=30)\n",
" response.raise_for_status()\n",
"\n",
" task = response.json().get(\"task\")\n",
" print(f\"Task created: {task.get('id')}\")\n",
" return task.get(\"id\")\n",
"\n",
"\n",
"def poll_task(task_id, max_retries=15):\n",
" url = f\"{ENDPOINT}/v1/tasks/{task_id}\"\n",
" retry_count = 0\n",
" wait_time = 2\n",
"\n",
" while retry_count < max_retries:\n",
" response = requests.get(url, headers=HEADERS, timeout=30)\n",
" response.raise_for_status()\n",
"\n",
" task = response.json()\n",
" state = task.get(\"status\", {}).get(\"state\")\n",
" print(f\"Current State: {state}\")\n",
"\n",
" if state in [\n",
" \"TASK_STATE_COMPLETED\",\n",
" \"TASK_STATE_FAILED\",\n",
" \"TASK_STATE_CANCELLED\",\n",
" ]:\n",
" return task\n",
"\n",
" time.sleep(wait_time)\n",
" # Simple exponential backoff capped at 10s\n",
" wait_time = min(wait_time * 1.5, 10)\n",
" retry_count += 1\n",
"\n",
" print(\"Polling timed out.\")\n",
" return None\n",
"\n",
"\n",
"try:\n",
" task_id = send_async_message(\"Generate a summary of sales trends\")\n",
" if task_id:\n",
" final_task = poll_task(task_id)\n",
" if final_task:\n",
" print(\"Final Task Result:\")\n",
" print(json.dumps(final_task, indent=2))\n",
"except Exception as e:\n",
" print(f\"Error during messaging/polling: {e}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "artifact-parsing-header"
},
"source": [
"## 3. Processing Agent Outputs (Artifacts)\n",
"\n",
"Agents often produce **Artifacts** (structured data, files, or references). Here is how to parse them from a completed task."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "artifact-parsing-code"
},
"outputs": [],
"source": [
"def process_artifacts(task):\n",
" artifacts = task.get(\"artifacts\", [])\n",
" if not artifacts:\n",
" print(\"No artifacts found.\")\n",
" return\n",
"\n",
" print(f\"Found {len(artifacts)} artifacts:\")\n",
" for art in artifacts:\n",
" name = art.get(\"name\", \"Unnamed\")\n",
" art_type = (\n",
" art.get(\"metadata\", {})\n",
" .get(\"fields\", {})\n",
" .get(\"type\", {})\n",
" .get(\"stringValue\", \"Unknown\")\n",
" )\n",
" print(f\"- [{art_type}] {name}: {art.get('description')}\")\n",
"\n",
" # Example: If it is a GCS path\n",
" if \"gcs_path\" in art.get(\"metadata\", {}).get(\"fields\", {}):\n",
" path = art[\"metadata\"][\"fields\"][\"gcs_path\"][\"stringValue\"]\n",
" print(\n",
" \" GCS Link:\"\n",
" f\" https://console.cloud.google.com/storage/browser/{path.replace('gs://', '')}\"\n",
" )\n",
"\n",
"\n",
"# Use the final_task from the previous step\n",
"if \"final_task\" in locals():\n",
" process_artifacts(final_task)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "cancel-task-header"
},
"source": [
"## 4. Cancel an Active Task\n",
"\n",
"If a task is taking too long or was sent in error, you can cancel it."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "cancel-task-code"
},
"outputs": [],
"source": [
"def cancel_task(task_id):\n",
" url = f\"{ENDPOINT}/v1/tasks/{task_id}:cancel\"\n",
" response = requests.post(url, headers=HEADERS, timeout=30)\n",
" response.raise_for_status()\n",
"\n",
" print(f\"Task {task_id} cancellation requested.\")\n",
" return response.json()\n",
"\n",
"\n",
"# Example usage (start a task and cancel it immediately)\n",
"try:\n",
" new_task_id = send_async_message(\"A long running query\")\n",
" if new_task_id:\n",
" cancel_task(new_task_id)\n",
"except Exception as e:\n",
" print(f\"Error during cancellation demo: {e}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "cleanup-header"
},
"source": [
"## 5. Cleanup\n",
"\n",
"It is good practice to clean up any temporary resources or state created during your session."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "cleanup-code"
},
"outputs": [],
"source": [
"# @title Resource Cleanup\n",
"print(\n",
" \"No specific cloud resources were created in this demo that require manual\"\n",
" \" deletion (e.g., storage buckets).\"\n",
")\n",
"print(\n",
" \"However, you can use this section to reset any local session state if\"\n",
" \" needed.\"\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Loading
Loading