Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Environment file for Docker deployment

# Required Dremio configuration
DREMIO_URI=prodemea
DREMIO_PAT=4rwPHjnRQYu/y8WrQymIa9ayYhV9HR3pm8D9v7b4oceXOTdU5vtTurbWz1OCnQ==

# Optional Dremio configuration (for Dremio Cloud)
DREMIO_PROJECT_ID=be852b6f-4f3b-443a-8499-dbfb5adc5a3a

# MCP Server configuration
MCP_SERVER_MODE=FOR_DATA_PATTERNS,FOR_PROMETHEUS
MCP_LOG_TO_FILE=false
MCP_TRANSPORT=sse
MCP_PORT=8080
MCP_HOST=0.0.0.0

# Prometheus configuration (required for FOR_PROMETHEUS mode)
PROMETHEUS_URI=https://vmauth.ops.dremio.tools
PROMETHEUS_TOKEN=hudson-tenant-7
14 changes: 14 additions & 0 deletions .env.local
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Local environment configuration for Docker testing
# Based on your current ~/.config/dremioai/config.yaml

DREMIO_URI=prodemea
DREMIO_PAT=4rwPHjnRQYu/y8WrQymIa9ayYhV9HR3pm8D9v7b4oceXOTdU5vtTurbWz1OCnQ==
DREMIO_PROJECT_ID=be852b6f-4f3b-443a-8499-dbfb5adc5a3a
MCP_SERVER_MODE=FOR_DATA_PATTERNS,FOR_PROMETHEUS
MCP_LOG_TO_FILE=true
MCP_PORT=8080
MCP_HOST=0.0.0.0

# Prometheus configuration for FOR_PROMETHEUS mode
PROMETHEUS_URI=https://vmauth.ops.dremio.tools
PROMETHEUS_TOKEN=hudson-tenant-7
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,37 @@ There are 3 modes

Multiple modes can be specified with separated by `,`

#### SQL Protocol Options

The MCP server supports two protocols for SQL execution:

1. **REST** (default): Uses Dremio's REST API for SQL execution
- Compatible with all Dremio deployments
- Standard HTTP-based communication
- Suitable for most use cases

2. **FLIGHT_SQL**: Uses Apache Arrow Flight SQL for SQL execution
- More efficient for large result sets
- Direct binary data transfer
- Requires Flight SQL endpoint to be accessible (typically port 443 for Dremio Cloud)

To configure the SQL protocol, add the `sql_protocol` field to your Dremio configuration:

```yaml
dremio:
uri: https://api.eu.dremio.cloud
pat: your-personal-access-token
project_id: your-project-id
sql_protocol: FLIGHT_SQL # or REST (default)
```

Or use the environment variable:
```bash
export DREMIO_SQL_PROTOCOL=FLIGHT_SQL
```

**Note**: Flight SQL requires the `adbc-driver-flightsql` package, which is automatically installed with the MCP server.

### The LLM (Claude) config file

To setup the Claude config file (refer to [this as an example](https://modelcontextprotocol.io/quickstart/user#2-add-the-filesystem-mcp-server)) edit the Claude desktop config file
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ description = "Foundation for llm integration with Dremio"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"adbc-driver-flightsql>=1.7.0",
"aiohttp>=3.11.12",
"beeai-framework>=0.1.8",
"click>=8.1.8",
Expand All @@ -23,6 +24,7 @@ dependencies = [
"openai>=1.65.3",
"pandas>=2.2.3",
"prompt-toolkit>=3.0.50",
"pyarrow>=18.0.0",
"pydantic>=2.10.6",
"pydantic-settings>=2.8.1",
"pytest>=8.3.5",
Expand All @@ -31,6 +33,7 @@ dependencies = [
"requests>=2.32.3",
"rich>=13.9.4",
"sqlglot>=26.23.0",
"starlette>=0.41.0",
"structlog>=25.1.0",
"typer>=0.15.2",
"uvicorn>=0.34.0",
Expand Down
68 changes: 60 additions & 8 deletions src/dremioai/api/dremio/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from dremioai.api.transport import DremioAsyncHttpClient as AsyncHttpClient
from dremioai.config import settings
from dremioai.log import logger


class ArcticSourceType(UStrEnum):
Expand Down Expand Up @@ -228,16 +229,67 @@ async def get_results(
return jr


async def run_query(
async def run_query_flightsql(
query: Union[Query, str], use_df: bool = False
) -> Union[JobResultsWrapper, pd.DataFrame]:
client = AsyncHttpClient()
"""Run query using Flight SQL protocol."""
try:
from dremioai.api.transport_flightsql import FlightSQLTransport
except ImportError:
raise RuntimeError("Flight SQL support requires adbc-driver-flightsql. Install with: pip install adbc-driver-flightsql")

if not isinstance(query, Query):
query = Query(sql=query)

project_id = settings.instance().dremio.project_id
endpoint = f"/v0/projects/{project_id}" if project_id else "/api/v3"
qs: QuerySubmission = await client.post(
f"{endpoint}/sql", body=query.model_dump(), deser=QuerySubmission
)
return await get_results(project_id, qs, use_df=use_df, client=client)
config = settings.instance().dremio

try:
# Create Flight SQL transport
client = FlightSQLTransport(
uri=config.uri,
pat=config.pat,
project_id=config.project_id
)

# Execute query
df = await client.execute_query(query.sql)

if use_df:
return df
else:
# Convert DataFrame back to JobResultsWrapper format for compatibility
rows = df.to_dict('records')
schema = [ResultSchema(name=col, type=ResultSchemaType(name="VARCHAR")) for col in df.columns]
result = JobResults(rowCount=len(df), schema=schema, rows=rows)
return JobResultsWrapper([result])

except Exception as e:
logger().error(f"Flight SQL query execution failed: {e}")
raise RuntimeError(f"Flight SQL query execution failed: {e}")


async def run_query(
query: Union[Query, str], use_df: bool = False
) -> Union[JobResultsWrapper, pd.DataFrame]:
"""Run query using the configured SQL protocol (REST or Flight SQL)."""
config = settings.instance().dremio

# Check if Flight SQL is configured
sql_protocol = getattr(config, 'sql_protocol', 'REST').upper()

if sql_protocol == 'FLIGHT_SQL':
logger().info("Using Flight SQL protocol for query execution")
return await run_query_flightsql(query, use_df)
else:
logger().info("Using REST protocol for query execution")
# Original REST implementation
client = AsyncHttpClient()
if not isinstance(query, Query):
query = Query(sql=query)

project_id = settings.instance().dremio.project_id
endpoint = f"/v0/projects/{project_id}" if project_id else "/api/v3"
qs: QuerySubmission = await client.post(
f"{endpoint}/sql", body=query.model_dump(), deser=QuerySubmission
)
return await get_results(project_id, qs, use_df=use_df, client=client)
121 changes: 121 additions & 0 deletions src/dremioai/api/transport_flightsql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""
Flight SQL transport implementation for Dremio Cloud.

This module provides a transport layer that uses Apache Arrow Flight SQL
to connect to Dremio Cloud, which requires Flight SQL instead of basic Flight.
"""

import asyncio
from typing import Optional
import pandas as pd
import adbc_driver_flightsql.dbapi as flight_sql
from dremioai.log import logger


class FlightSQLTransport:
"""Transport implementation using Apache Arrow Flight SQL for Dremio Cloud."""

def __init__(self, uri: str, pat: str, project_id: Optional[str] = None):
"""
Initialize Flight SQL transport.

Args:
uri: Dremio URI (e.g., https://api.eu.dremio.cloud)
pat: Personal Access Token for authentication
project_id: Optional project ID for Dremio Cloud
"""
self.uri = uri
self.pat = pat
self.project_id = project_id

# Convert HTTP URI to Flight SQL URI
if uri.startswith("https://"):
self.flight_uri = uri.replace("https://", "grpc+tls://") + ":443"
elif uri.startswith("http://"):
self.flight_uri = uri.replace("http://", "grpc://")
else:
self.flight_uri = uri

self._connection = None

logger().info(f"Initialized Flight SQL transport for {self.flight_uri}")

async def _get_connection(self):
"""Get or create Flight SQL connection."""
if self._connection is None:
try:
# Prepare database kwargs for ADBC Flight SQL driver
db_kwargs = {
"adbc.flight.sql.authorization_header": f"Bearer {self.pat}",
}

# TODO: Add project ID support - need to find correct parameter name
# The 'adbc.flight.sql.catalog' parameter is not supported by this driver version
if self.project_id:
logger().warning(f"Project ID specified ({self.project_id}) but catalog parameter not yet supported in Flight SQL driver")

logger().info(f"Connecting to Flight SQL endpoint: {self.flight_uri}")

# Create connection using ADBC Flight SQL driver with correct API
self._connection = flight_sql.connect(uri=self.flight_uri, db_kwargs=db_kwargs)

logger().info("Flight SQL connection established successfully")

except Exception as e:
logger().error(f"Failed to establish Flight SQL connection: {e}")
raise RuntimeError(f"Flight SQL connection failed: {e}")

return self._connection

async def execute_query(self, sql: str) -> pd.DataFrame:
"""Execute SQL query using Flight SQL and return results as DataFrame"""
try:
connection = await self._get_connection()

logger().info(f"Executing Flight SQL query: {sql}")

# Execute query
cursor = connection.cursor()
cursor.execute(sql)

# Try different methods to fetch results, handling schema inconsistencies
try:
# First try fetch_df() which might handle schema inconsistencies better
df = cursor.fetch_df()
except Exception as df_error:
logger().warning(f"fetch_df() failed: {df_error}")
try:
# Try fetchall() which returns rows as tuples
logger().info("Trying fetchall() method...")
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]

# Convert to pandas DataFrame manually
import pandas as pd
df = pd.DataFrame(rows, columns=columns)
logger().info(f"Successfully fetched {len(df)} rows using fetchall()")

except Exception as fetchall_error:
logger().warning(f"fetchall() failed: {fetchall_error}")
# Last resort: try fetch_arrow_table()
logger().info("Trying fetch_arrow_table() as last resort...")
arrow_table = cursor.fetch_arrow_table()
df = arrow_table.to_pandas()

logger().info(f"Flight SQL query executed successfully, returned {len(df)} rows")
return df

except Exception as e:
logger().error(f"Flight SQL query execution failed: {e}")
raise RuntimeError(f"Flight SQL query execution failed: {e}")

async def close(self):
"""Close the Flight SQL connection."""
if self._connection:
try:
self._connection.close()
logger().info("Flight SQL connection closed")
except Exception as e:
logger().warning(f"Error closing Flight SQL connection: {e}")
finally:
self._connection = None
1 change: 1 addition & 0 deletions src/dremioai/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class Dremio(BaseModel):
)
oauth2: Optional[OAuth2] = None
allow_dml: Optional[bool] = False
sql_protocol: Optional[str] = Field(default="REST", description="SQL protocol to use: REST or FLIGHT_SQL")
model_config = ConfigDict(validate_assignment=True)

@field_serializer("raw_pat")
Expand Down
Loading