50 changes: 50 additions & 0 deletions .github/workflows/docker.yml
@@ -0,0 +1,50 @@
name: Docker

on:
  release:
    types: [published]

jobs:
  build_and_publish:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
    steps:
      - uses: actions/checkout@v4

      - name: Extract version from release tag
        id: version
        run: |
          echo "version=${{ github.event.release.tag_name }}" >> "$GITHUB_OUTPUT"

      - name: Log in to Docker Hub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}

      - name: Log in to GitHub Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Build and push
        uses: docker/build-push-action@v6
        with:
          context: .
          push: true
          build-args: |
            VERSION=${{ steps.version.outputs.version }}
          tags: |
            acryldata/mcp-server-datahub:${{ steps.version.outputs.version }}
            acryldata/mcp-server-datahub:latest
            ghcr.io/${{ github.repository }}:${{ steps.version.outputs.version }}
            ghcr.io/${{ github.repository }}:latest
          cache-from: type=gha
          cache-to: type=gha,mode=max
32 changes: 32 additions & 0 deletions Dockerfile
@@ -0,0 +1,32 @@
FROM python:3.11-slim

WORKDIR /app

# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
Reviewer: Unpinned uv:latest tag risks breaking Docker builds (Medium Severity)

The COPY --from=ghcr.io/astral-sh/uv:latest uses an unpinned :latest tag, making the Docker build non-reproducible. If uv releases a breaking change (e.g., moving the binary path from /uv, or changing CLI behavior), builds will silently break. The existing wheels.yml workflow pins astral-sh/setup-uv@v6, but this Dockerfile has no version constraint at all. Pinning to a specific version or major version tag (e.g., uv:0.6) would prevent unexpected build failures.

Author: ignoring this as other use cases use uv:latest

Contributor: why not pin? where are the other use cases that use uv:latest?

# Copy dependency files
COPY pyproject.toml uv.lock ./

# Install dependencies (no dev deps, no editable install yet)
RUN uv sync --frozen --no-dev --no-install-project

# Copy source
COPY src/ ./src/

# Inject version at build time so setuptools-scm fallback (0.0.0) is not used.
# The .git directory is not available during Docker builds, so we write
# _version.py directly from the VERSION build arg.
ARG VERSION=0.0.0
RUN printf '__version__ = version = "%s"\n__version_tuple__ = version_tuple = tuple(int(x) if x.isdigit() else x for x in "%s".lstrip("v").split("."))\n__commit_id__ = commit_id = None\n' \
    "$VERSION" "$VERSION" \
    > src/mcp_server_datahub/_version.py

# Install the project itself
RUN uv sync --frozen --no-dev
Reviewer: Docker image always reports version 0.0.0 (Medium Severity)

The project uses setuptools-scm for versioning, which derives the version from git tags and writes _version.py. Since _version.py is in .gitignore (not tracked in git) and the Dockerfile never copies the .git directory, setuptools-scm cannot determine the version when uv sync runs during the build. It falls back to fallback_version = "0.0.0" from pyproject.toml. This means every Docker image — even those tagged with a real release version by the CI workflow — will report __version__ as "0.0.0", affecting the --version CLI output and the telemetry datahub_component string.

Author: fixed this
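To illustrate what the fix produces, the version-tuple expression that the Dockerfile's printf writes into `_version.py` can be reproduced in plain Python (the sample version strings below are hypothetical):

```python
def version_tuple(version: str) -> tuple:
    # Same expression the Dockerfile writes into _version.py: strip a
    # leading "v", split on dots, and convert numeric parts to ints.
    return tuple(
        int(x) if x.isdigit() else x for x in version.lstrip("v").split(".")
    )

print(version_tuple("v1.2.3"))    # (1, 2, 3)
print(version_tuple("1.2.3rc1"))  # (1, 2, '3rc1')
```

Non-numeric components (pre-release suffixes) stay as strings, matching setuptools-scm's own `__version_tuple__` convention.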


ENV PATH="/app/.venv/bin:$PATH"

EXPOSE 8000

CMD ["mcp-server-datahub", "--transport", "http"]
44 changes: 44 additions & 0 deletions README.md
@@ -270,6 +270,50 @@ The agent may either:
| `get_lineage_paths_between` | Understand deeper relationships between datasets. |


## Docker (HTTP Deployment)

The server can be run as a standalone HTTP service using Docker. In this mode, authentication tokens are supplied **per request** rather than baked into the server — making it suitable for multi-user deployments where each client has its own DataHub token.

### Authentication

Each request must supply a DataHub token via the `Authorization` header:

```
Authorization: Bearer <your-datahub-token>
```

### Docker Compose (recommended)

Create a `.env` file:

```env
DATAHUB_GMS_URL=https://your-datahub-instance
```

Then run:

```bash
docker compose up
```

### Docker (manual)

```bash
docker build -t mcp-server-datahub .
docker run -p 8000:8000 \
-e DATAHUB_GMS_URL=https://your-datahub-instance \
mcp-server-datahub
```

The server exposes two endpoints:

- `http://localhost:8000/mcp` — MCP endpoint (stateless HTTP transport)
- `http://localhost:8000/health` — Health check

### Optional environment variables

Pass any [configuration variables](#environment-variables) via `.env` or `-e` flags. For example, to enable mutation tools set `TOOLS_IS_MUTATION_ENABLED=true`.

## Developing

See [DEVELOPING.md](DEVELOPING.md).
9 changes: 9 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,9 @@
services:
  mcp-server-datahub:
    build: .
    ports:
      - "${MCP_SERVER_PORT:-8000}:8000"
    environment:
      DATAHUB_GMS_URL: ${DATAHUB_GMS_URL}
      TOOLS_IS_MUTATION_ENABLED: ${TOOLS_IS_MUTATION_ENABLED:-false}
      TOOLS_IS_USER_ENABLED: ${TOOLS_IS_USER_ENABLED:-false}
111 changes: 100 additions & 11 deletions src/mcp_server_datahub/__main__.py
@@ -1,11 +1,15 @@
import logging
-from typing import Any
import os
from typing import Any, Optional

import click
-from datahub.ingestion.graph.config import ClientMode
from datahub.ingestion.graph.config import ClientMode, DatahubClientConfig
from datahub.sdk.main_client import DataHubClient
from datahub.telemetry import telemetry
from fastmcp import FastMCP
from fastmcp.server.auth import TokenVerifier
from fastmcp.server.auth.auth import AccessToken
from fastmcp.server.dependencies import get_http_request
from fastmcp.server.middleware import Middleware
from fastmcp.server.middleware.logging import LoggingMiddleware
from starlette.requests import Request
@@ -24,6 +28,64 @@
register_all_tools(is_oss=True)


_GET_ME_QUERY = "query getMe { me { corpUser { urn username } } }"


def _build_client(server_url: str, token: str) -> DataHubClient:
    return DataHubClient(
        config=DatahubClientConfig(
            server=server_url,
            token=token,
            client_mode=ClientMode.SDK,
            datahub_component=f"mcp-server-datahub/{__version__}",
        )
    )

Contributor: do we want to have a cache for this?

Author: I don't think we need to have a cache for this. There's no http calls here.

Contributor: it results in a new connection for every call, no? I'm pretty certain we need to cache


def _verify_client(client: DataHubClient) -> None:
    """Verify the client can authenticate by calling the me query."""
    client._graph.execute_graphql(_GET_ME_QUERY)


def _token_from_request() -> Optional[str]:
    """Extract a DataHub token from the current HTTP request.

    Reads the ``Authorization: Bearer <token>`` header.
    """
    try:
        request = get_http_request()
    except RuntimeError:
        return None
    auth = request.headers.get("authorization", "")
    if auth.startswith("Bearer "):
        return auth[len("Bearer ") :]
    return None

Contributor: when would the RuntimeError be thrown? should we propagate the exception up instead of returning None here?

Author: If this is called outside of an http context. It should never happen in the current implementation.

Contributor: we should propagate the exception then, why change it to None?
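The prefix check above is case-sensitive and returns everything after the literal `Bearer ` prefix. A standalone reproduction of that parsing (not part of the PR) behaves like this:

```python
def token_from_header(auth: str):
    # Mirrors _token_from_request's parsing of the Authorization value.
    if auth.startswith("Bearer "):
        return auth[len("Bearer ") :]
    return None

print(token_from_header("Bearer abc123"))  # abc123
print(token_from_header("bearer abc123"))  # None (prefix match is case-sensitive)
```

Note that RFC 7235 treats the `Bearer` scheme name as case-insensitive, so a strictly conforming client sending `bearer` would fall through to the default-token path here.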


class _DataHubTokenVerifier(TokenVerifier):
    """FastMCP TokenVerifier that validates DataHub bearer tokens.

    Called by FastMCP's BearerAuthBackend for every HTTP request that carries
    an Authorization: Bearer header. If the token is valid, a synthetic
    AccessToken is returned; otherwise None causes FastMCP to reply with
    401 WWW-Authenticate: Bearer automatically.
    """

    def __init__(self, server_url: str) -> None:
        super().__init__()
        self._server_url = server_url

    async def verify_token(self, token: str) -> Optional[AccessToken]:
        try:
            client = _build_client(self._server_url, token)
            _verify_client(client)
            return AccessToken(
                client_id=f"mcp-server-datahub/{__version__}", scopes=[], token=token
            )
        except Exception:
            return None

Contributor: do we want to have this cacheable?

Contributor: if we are caching perhaps we should be more precise in what to catch here, we don't want to cache null if there was a 500 server error from the server

Author: I think we would want to avoid caching errors in general
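Since the thread above debates caching verification results, here is one hedged sketch of what a success-only TTL cache around verification could look like. The TTL value, the dict-based store, and the helper names are assumptions for illustration, not part of the PR:

```python
import time

TTL_SECONDS = 300  # assumed TTL; tune for your deployment
_verified_until: dict[str, float] = {}  # token -> monotonic expiry time


def is_verified(token: str, verify) -> bool:
    """Return True if `token` was verified recently or verifies now.

    `verify` is any callable that raises on an invalid token. Only
    successful verifications are cached, so a transient server error is
    retried on the next request rather than cached as a miss.
    """
    now = time.monotonic()
    expiry = _verified_until.get(token)
    if expiry is not None and expiry > now:
        return True  # cache hit: skip the round trip to DataHub
    try:
        verify(token)
    except Exception:
        return False  # failures are never cached
    _verified_until[token] = now + TTL_SECONDS
    return True
```

A real implementation would likely hash tokens before using them as cache keys, bound the cache size, and distinguish auth failures from server errors as the reviewer suggests.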


class _DataHubClientMiddleware(Middleware):
    """Middleware that propagates the DataHub client ContextVar into each request.

@@ -32,18 +94,34 @@ class _DataHubClientMiddleware(Middleware):
    thread. This middleware ensures the DataHub client is available in every request
    context by setting the ContextVar at the start of each MCP message.

    Token validation is handled upstream by ``_DataHubTokenVerifier`` for Bearer
    tokens. This middleware only needs to build the client for the current request
    (or fall back to the default token when a global token is configured).

    Must be added as the first middleware so it wraps all other middlewares.
    """

-    def __init__(self, client: DataHubClient) -> None:
-        self._client = client
    def __init__(self, server_url: str, default_token: Optional[str] = None) -> None:
        self._server_url = server_url
        self._default_token = default_token

    def _client_for_request(self) -> DataHubClient:
        token = _token_from_request()
        if token is not None:
            # Token already validated by _DataHubTokenVerifier.
            return _build_client(self._server_url, token)
        if self._default_token is not None:
            return _build_client(self._server_url, self._default_token)
        raise ValueError(
            "No DataHub token provided. Supply a token via the Authorization header."
        )

    async def on_message(
        self,
        context: Any,
        call_next: Any,
    ) -> Any:
-        with with_datahub_client(self._client):
        with with_datahub_client(self._client_for_request()):
            return await call_next(context)


@@ -72,16 +150,19 @@ def create_app() -> FastMCP:
    if _app_initialized:
        return mcp

-    client = DataHubClient.from_env(
-        client_mode=ClientMode.SDK,
-        datahub_component=f"mcp-server-datahub/{__version__}",
-    )
Contributor: Claude is right - this is a breaking change

Author: will fix.

    server_url = os.environ.get("DATAHUB_GMS_URL")
    if not server_url:
        raise RuntimeError("DATAHUB_GMS_URL environment variable is required.")
Contributor: we often configure gms and token vs datahubEnv, this will break this flow


    global_token = os.environ.get("DATAHUB_GMS_TOKEN")
    if global_token:
        _verify_client(_build_client(server_url, global_token))
Contributor: why do we need to do this here? not that it is a big deal either way, I'm just confused how this is related to the pr

Author: I want to error at startup if the token isn't valid since nothing will work without a valid token.


    # _DataHubClientMiddleware must be first so the client ContextVar is
    # available to all subsequent middlewares and tool handlers. This is
    # especially important for HTTP transport where each request runs in a
    # separate async context.
-    mcp.add_middleware(_DataHubClientMiddleware(client))
    mcp.add_middleware(_DataHubClientMiddleware(server_url, global_token))
    mcp.add_middleware(TelemetryMiddleware())
    mcp.add_middleware(VersionFilterMiddleware())
    mcp.add_middleware(DocumentToolsMiddleware())
@@ -115,7 +196,15 @@ def main(transport: Literal["stdio", "sse", "http"], debug: bool) -> None:
    create_app()

    if transport == "http":
-        mcp.run(transport=transport, show_banner=False, stateless_http=True)
        server_url = os.environ.get("DATAHUB_GMS_URL", "")
        if not os.environ.get("DATAHUB_GMS_TOKEN"):
            mcp.auth = _DataHubTokenVerifier(server_url)
        mcp.run(
            transport=transport,
            show_banner=False,
            stateless_http=True,
            host="0.0.0.0",
        )
    else:
        mcp.run(transport=transport, show_banner=False)
