Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies = [
"cachetools>=5.0.0",
"fastmcp==2.10.5",
"jmespath~=1.0.1",
"json-repair",
"loguru",
"pydantic>=2.0,<2.12",
]
Expand Down
46 changes: 37 additions & 9 deletions src/mcp_server_datahub/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import pathlib
import re
import string
import threading
from typing import (
Any,
Awaitable,
Expand All @@ -43,6 +44,7 @@
from datahub.sdk.search_filters import Filter, FilterDsl, load_filters
from datahub.utilities.ordered_set import OrderedSet
from fastmcp import FastMCP
from json_repair import repair_json
from loguru import logger
from pydantic import BaseModel

Expand Down Expand Up @@ -984,13 +986,34 @@ def get_entities(urns: List[str] | str) -> List[dict] | dict:
"""
client = get_datahub_client()

# Handle single URN for backward compatibility
# Handle JSON-stringified arrays (same issue as filters in search tool)
# Some MCP clients/LLMs pass arrays as JSON strings instead of proper lists
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to be a common problem. We had to handle this at some other places as well (e.g. compile_filters).

WDYT about having a helper written such as parse_as_form(input_structure:Any, expected_type:Type) that returns instance of Type. The implementation can use repair_json and/or pydantic validators. I think, this helper will be useful at multiple places and will make the code cleaner and maintainable.

Alternatively, even if not generic parser, for now, can you please separate this parsing into a separate method that takes care of all parsing urns input arg and returning expected List[str] . This method should then also be unit tested.

if isinstance(urns, str):
urns = [urns]
return_single = True
urns_str = urns.strip() # Remove leading/trailing whitespace

# Try to parse as JSON array first
if urns_str.startswith("["):
try:
# Use json_repair to handle malformed JSON from LLMs
urns = json.loads(repair_json(urns_str))
return_single = False
except (json.JSONDecodeError, Exception) as e:
logger.warning(
f"Failed to parse URNs as JSON array: {e}. Treating as single URN."
)
# Not valid JSON, treat as single URN string
urns = [urns_str]
return_single = True
else:
# Single URN string
urns = [urns_str]
return_single = True
else:
return_single = False

# Trim whitespace from each URN (defensive against string concatenation issues)
urns = [urn.strip() for urn in urns]

results = []
for urn in urns:
try:
Expand Down Expand Up @@ -2467,6 +2490,7 @@ def _find_upstream_lineage_path(

# Track if tools have been registered to prevent duplicate registration
_tools_registered = False
_tools_registration_lock = threading.Lock()


def register_all_tools(is_oss: bool = False) -> None:
Expand All @@ -2476,15 +2500,19 @@ def register_all_tools(is_oss: bool = False) -> None:
is_oss: If True, use OSS-compatible tool descriptions (limited sorting fields).
If False, use Cloud descriptions (full sorting features).

Note: Can only be called once. Subsequent calls are no-ops to prevent duplicate registration.
Note: Thread-safe. Can be called multiple times from different threads.
Only the first call will register tools, subsequent calls are no-ops.
"""
global _tools_registered
if _tools_registered:
logger.debug("Tools already registered, skipping duplicate registration")
return

_tools_registered = True
logger.info(f"Registering MCP tools (is_oss={is_oss})")
# Thread-safe check-and-set using lock
with _tools_registration_lock:
if _tools_registered:
logger.debug("Tools already registered, skipping duplicate registration")
return

_tools_registered = True
logger.info(f"Registering MCP tools (is_oss={is_oss})")

# Choose sorting documentation based on deployment type
if not is_oss:
Expand Down
11 changes: 11 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.