Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Dremio MCP Server

## Project Overview

An MCP (Model Context Protocol) server that enables LLM integration with Dremio. It allows LLMs like Claude to query and interact with Dremio data sources via the MCP protocol. Supports local (stdio) and remote (streaming HTTP) deployment modes.

## Tech Stack

- **Language**: Python 3.11+
- **Package Manager**: `uv` (not pip)
- **Build System**: Hatchling
- **Framework**: FastMCP / FastAPI / Starlette
- **Key Libraries**: mcp, pydantic, structlog, typer, PyJWT, LaunchDarkly SDK
- **Testing**: pytest with pytest-asyncio (strict mode)

## Project Structure

```
src/dremioai/
├── api/ # API clients (Dremio REST, Prometheus, CLI)
│ ├── dremio/ # Dremio API client
│ ├── prometheus/ # Prometheus API client
│ └── cli/ # CLI helpers
├── config/ # Configuration management (YAML-based)
├── servers/ # MCP server implementation
│ ├── mcp.py # Main MCP server entry point (CLI via typer)
│ ├── jwks_verifier.py # JWT/JWKS auth verification
│ └── frameworks/ # Framework integrations (langchain, beeai)
├── tools/ # MCP tool definitions
│ └── tools.py # Base Tools class
├── metrics/ # Prometheus metrics
└── resources/ # MCP resources
```

## Common Commands

```bash
# Install dependencies
uv sync

# Run the MCP server
uv run dremio-mcp-server run

# Run with custom config
uv run dremio-mcp-server run --config-file <path>

# Run all tests
uv run pytest tests

# Run a specific test file
uv run pytest tests/test_chart.py

# Manage config
uv run dremio-mcp-server config create dremioai --uri <uri> --pat <pat>
uv run dremio-mcp-server config list --type dremioai

# Build Docker image
docker build -t dremio-mcp:0.1.0 .
```

## Development Guidelines

- Follow PEP 8 style guidelines
- Use type hints for function arguments and return values
- Async-first: tools and server handlers are async (`asyncio_mode = strict`)
- New tools must inherit from the `Tools` base class in `dremioai.tools.tools`
- Tools are categorized by `ToolType`: `FOR_DATA_PATTERNS`, `FOR_SELF`, `FOR_PROMETHEUS`
- Config is YAML-based, located at `~/.config/dremioai/config.yaml` by default
- Commit messages start with a JIRA ticket ID (e.g., `DX-XXXXX: description`)
- Branch from `main` for all changes

## Testing

- Test files live in `tests/` mirroring the `src/` structure
- pytest config is in `pytest.ini` with `-v --showlocals -x` defaults
- Tests use strict asyncio mode — use `@pytest.mark.asyncio` for async tests
- E2E tests are in `tests/e2e/`

## Deployment

- **Local**: stdio mode via `uv run dremio-mcp-server run`
- **Remote/K8s**: Helm chart in `helm/dremio-mcp/` with streaming HTTP mode
- Auth: PAT (dev/local) or OAuth + External Token Provider (production)
36 changes: 25 additions & 11 deletions src/dremioai/api/dremio/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio

from pydantic import (
BaseModel,
Expand All @@ -21,18 +22,22 @@
field_validator,
)
from typing import (
Any,
List,
Union,
Optional,
Optional, Dict,
)
from dremioai.api.util import UStrEnum
from datetime import datetime
from enum import auto
from dremioai.config import settings
from dremioai.api.transport import DremioAsyncHttpClient as AsyncHttpClient
from dremioai.api.dremio.catalog import get_schemas
from dremioai.api.dremio.catalog import get_schemas, get_schema
from dremioai import log
import pandas as pd

logger = log.logger(__name__)


class QueryType(UStrEnum):
UI_RUN = auto()
Expand Down Expand Up @@ -155,6 +160,16 @@ class EnterpriseSearchCatalogObject(BaseModel):
modified_at: Optional[datetime] = Field(default=None, alias="lastModifiedAt")
func_sql: Optional[str] = Field(default=None, alias="functionSql")
owner: Optional[EnterpriseSearchUserOrRoleObject] = None
schema: Optional[Dict[str, Any]] = None

async def populate_schemas(self):
if self.path:
try:
data = await get_schema(dataset_path_or_id=self.path, include_tags=True, flatten=True)
self.schema = data.get('schema') if data else None
except Exception as e:
logger.error("Schema not found for search",
path=self.path, reason=str(e))

def as_df_dict(self):
return {
Expand All @@ -163,6 +178,7 @@ def as_df_dict(self):
"type": self.type,
"tags": ",".join(self.labels),
"description": self.wiki,
"schema": self.schema,
}


Expand Down Expand Up @@ -247,16 +263,14 @@ async def get_search_results(
deser=EnterpriseSearchResults,
params=params,
)
result = [r for r in result if r.category in (Category.TABLE, Category.VIEW)]
tasks = [r.catalog.populate_schemas() for r in result]
await asyncio.gather(*tasks, return_exceptions=True)

if use_df:
result = [r for r in result if r.category in (Category.TABLE, Category.VIEW)]

data = [r.catalog.as_df_dict() for r in result]
paths = [p["path"] for p in data]
if schemas := await get_schemas(paths, include_tags=True, flatten=True):
for ix, schema in enumerate(schemas):
data[ix]["schema"] = schema.get("schema")

return pd.DataFrame(data=data)

df = pd.DataFrame(data=data)
Comment thread
ssaumitra marked this conversation as resolved.
if not df.empty:
df["not_found"] = df.schema.isna()
return df
return EnterpriseSearchResultsWrapper(results=result)
132 changes: 132 additions & 0 deletions tests/api/dremio/test_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#
# Copyright (C) 2017-2025 Dremio Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from unittest.mock import patch
from types import SimpleNamespace

import pandas as pd
import pytest
from aiohttp import ClientResponseError

from dremioai.api.dremio import catalog, search
from dremioai.tools import tools as tools_mod


def _client_response_error(status: int, message: str) -> ClientResponseError:
request_info = SimpleNamespace(
real_url="http://test/catalog/by-path/x", method="GET", headers={}, url="http://test"
)
return ClientResponseError(
request_info=request_info, history=(), status=status, message=message
)


@pytest.mark.asyncio
async def test_get_schemas_all_success():
async def fake_get_schema(p, *_a, **_kw):
return {"schema": {"col": "VARCHAR"}, "path": p}

with patch.object(catalog, "get_schema", side_effect=fake_get_schema):
result = await catalog.get_schemas([["a", "b"], ["c"]])

assert len(result) == 2
assert result[0]["path"] == ["a", "b"]
assert result[1]["path"] == ["c"]


@pytest.mark.asyncio
async def test_get_schemas_propagates_http_exception():
"""get_schemas does not swallow errors — exceptions bubble up to the caller (DX-118395)."""

async def fake_get_schema(*_a, **_kw):
raise _client_response_error(400, "Bad Request")

with patch.object(catalog, "get_schema", side_effect=fake_get_schema):
with pytest.raises(ClientResponseError):
await catalog.get_schemas([["ok", "one"], ["bad", "view"]])


@pytest.mark.asyncio
async def test_get_schemas_propagates_non_http_exception():
async def fake_get_schema(*_a, **_kw):
raise ValueError("kapow")

with patch.object(catalog, "get_schema", side_effect=fake_get_schema):
with pytest.raises(ValueError, match="kapow"):
await catalog.get_schemas([["boom"]])


@pytest.mark.asyncio
async def test_populate_schemas_marks_not_found_on_failure():
"""One broken catalog entry must not fail the whole search (DX-118395).
Error is embedded per-row via schema_not_found on EnterpriseSearchCatalogObject."""

async def fake_get_schema(dataset_path_or_id, *_a, **_kw):
if dataset_path_or_id == ["bad", "view"]:
raise _client_response_error(400, "Bad Request")
return {"schema": {"col": "VARCHAR"}}

ok = search.EnterpriseSearchCatalogObject(path=["ok", "one"], labels=[])
bad = search.EnterpriseSearchCatalogObject(path=["bad", "view"], labels=[])

with patch.object(search, "get_schema", side_effect=fake_get_schema):
await ok.populate_schemas()
await bad.populate_schemas()

assert ok.schema == {"col": "VARCHAR"}
assert bad.schema is None


@pytest.mark.asyncio
async def test_search_table_and_views_drops_broken_entries_and_returns_healthy_ones():
"""DX-118395: one broken catalog entry must not fail the whole tool call.

The tool silently drops entries whose schema could not be fetched and
returns the healthy ones.
"""

ok_df = pd.DataFrame(
[{"path": ["ok", "tbl"], "name": "ok.tbl", "schema": {"a": "INT"}}]
)
bad_df = pd.DataFrame(
[{"path": ["bad", "view"], "name": "bad.view", "schema": None}]
)

async def fake_search(search_obj, use_df=False):
if search_obj.filter == 'category in ["TABLE"]':
return ok_df
return bad_df

with patch.object(tools_mod.search, "get_search_results", side_effect=fake_search):
result = await tools_mod.SearchTableAndViews().invoke("NYC bike trips")

assert set(result.keys()) == {"results"}
names = {row["name"] for row in result["results"]}
assert "ok.tbl" in names


@pytest.mark.asyncio
async def test_get_descriptions_raises_on_schema_fetch_error():
"""get_descriptions must remain fail-fast so GetDescriptionOfTableOrSchema
surfaces an error instead of silently returning partial data."""

async def fake_get_schema(p, *_a, **_kw):
raise _client_response_error(400, "Bad Request")

with patch.object(catalog, "get_schema", side_effect=fake_get_schema):
with pytest.raises(ClientResponseError):
await catalog.get_descriptions([["a", "b"]])

Loading