-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
97 lines (83 loc) · 3.35 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# dune_analytics_mcp_httpx.py
from mcp.server.fastmcp import FastMCP
import httpx
import os
from dotenv import load_dotenv
import pandas as pd
import time
# Load environment variables
load_dotenv()
# Initialize MCP server
mcp = FastMCP(
name="Dune Analytics MCP Server",
description="Dune Analytics tools",
dependencies=["httpx", "pandas", "python-dotenv"]
)
# Configuration
DUNE_API_KEY = os.getenv("DUNE_API_KEY")
BASE_URL = "https://api.dune.com/api/v1"
HEADERS = {"X-Dune-API-Key": DUNE_API_KEY}
@mcp.tool()
def get_latest_result(query_id: int) -> str:
"""Get the latest results for a specific query ID as a CSV string on dune analytics"""
try:
# Fetch latest results
url = f"{BASE_URL}/query/{query_id}/results"
with httpx.Client() as client:
response = client.get(url, headers=HEADERS, timeout=300)
response.raise_for_status()
data = response.json()
# Convert results to DataFrame
result_data = data.get("result", {}).get("rows", [])
if not result_data:
return "No data available"
df = pd.DataFrame(result_data)
return df.to_csv(index=False)
except httpx.HTTPError as e:
return f"HTTP error fetching query results: {str(e)}"
except Exception as e:
return f"Error processing query results: {str(e)}"
@mcp.tool()
def run_query(query_id: int) -> str:
"""Run a query by ID and return results as a CSV string on dune analytics"""
try:
# Execute the query
url = f"{BASE_URL}/query/execute/{query_id}"
with httpx.Client() as client:
execute_response = client.post(url, headers=HEADERS, timeout=300)
execute_response.raise_for_status()
execution_data = execute_response.json()
execution_id = execution_data.get("execution_id")
if not execution_id:
return "Failed to start query execution"
# Poll for status until complete
status_url = f"{BASE_URL}/execution/{execution_id}/status"
while True:
status_response = client.get(status_url, headers=HEADERS)
status_response.raise_for_status()
status_data = status_response.json()
state = status_data.get("state")
if state == "EXECUTING" or state == "PENDING":
time.sleep(5) # Wait before polling again
elif state == "COMPLETED":
break
else:
return f"Query execution failed with state: {state}"
# Fetch results
results_url = f"{BASE_URL}/execution/{execution_id}/results"
results_response = client.get(results_url, headers=HEADERS)
results_response.raise_for_status()
results_data = results_response.json()
# Convert results to DataFrame
result_data = results_data.get("result", {}).get("rows", [])
if not result_data:
return "No data available"
df = pd.DataFrame(result_data)
return df.to_csv(index=False)
except httpx.HTTPError as e:
return f"HTTP error running query: {str(e)}"
except Exception as e:
return f"Error processing query: {str(e)}"
# Run the server
if __name__ == "__main__":
mcp.run()