Skip to content

Commit d6094d8

Browse files
authored
Merge pull request #12 from deepset-ai/feat/deepset_client
feat: add AsyncDeepsetClient
2 parents 696eef7 + 23ac9e7 commit d6094d8

6 files changed

Lines changed: 295 additions & 17 deletions

File tree

pyproject.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ dependencies = [
99
"mcp[cli]>=1.6.0",
1010
"requests>=2.32.3",
1111
"uvicorn>=0.34.2",
12+
"httpx",
1213
]
1314

1415
[project.scripts]
@@ -23,7 +24,8 @@ packages = ["src/deepset_mcp"]
2324

2425
[dependency-groups]
2526
dev = [
26-
"pytest"
27+
"pytest",
28+
"pytest-asyncio",
2729
]
2830
lint = [
2931
"ruff",
@@ -60,6 +62,9 @@ ignore = [
6062
]
6163
isort = { combine-as-imports = true, known-first-party = ["deepset_mcp"] }
6264

65+
[tool.ruff.lint.per-file-ignores]
66+
"test/*" = ["D"]
67+
6368
[tool.ruff.lint.pydocstyle]
6469
convention = "pep257"
6570

src/deepset_mcp/client.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import os
2+
from types import TracebackType
3+
from typing import Any
4+
5+
from deepset_mcp.transport import AsyncTransport, TransportProtocol
6+
7+
8+
class AsyncDeepsetClient:
9+
"""Async Client for interacting with the deepset API."""
10+
11+
def __init__(
12+
self,
13+
api_key: str | None = None,
14+
base_url: str = "https://api.cloud.deepset.ai/api",
15+
transport: TransportProtocol | None = None,
16+
transport_config: dict[str, Any] | None = None,
17+
) -> None:
18+
"""
19+
Initialize an instance of the AsyncDeepsetClient.
20+
21+
Parameters
22+
----------
23+
api_key : str, optional
24+
API key or token. Falls back to DEEPSET_API_KEY env var.
25+
base_url : str, optional
26+
Base URL for the deepset API.
27+
transport : TransportProtocol, optional
28+
Custom transport implementation.
29+
transport_config : dict, optional
30+
Configuration for default transport (e.g. timeout).
31+
"""
32+
self.api_key = api_key or os.environ.get("DEEPSET_API_KEY")
33+
if not self.api_key:
34+
raise ValueError("API key not provided and DEEPSET_API_KEY environment variable not set")
35+
self.base_url = base_url
36+
if transport is not None:
37+
self._transport = transport
38+
else:
39+
self._transport = AsyncTransport(
40+
base_url=self.base_url,
41+
api_key=self.api_key,
42+
config=transport_config,
43+
)
44+
45+
async def request(
46+
self,
47+
endpoint: str,
48+
method: str = "GET",
49+
data: dict[str, Any] | None = None,
50+
headers: dict[str, str] | None = None,
51+
) -> Any:
52+
"""Make a request to the deepset API."""
53+
if not endpoint.startswith("/"):
54+
endpoint = f"/{endpoint}"
55+
url = self.base_url + endpoint
56+
57+
# Default headers
58+
request_headers: dict[str, str] = {
59+
"Authorization": f"Bearer {self.api_key}",
60+
"Accept": "application/json,text/plain,*/*",
61+
}
62+
if data is not None:
63+
request_headers["Content-Type"] = "application/json"
64+
# Merge custom headers
65+
if headers:
66+
headers.setdefault("Authorization", request_headers["Authorization"])
67+
request_headers.update(headers)
68+
69+
return await self._transport.request(
70+
method,
71+
url,
72+
json=data,
73+
headers=request_headers,
74+
)
75+
76+
async def close(self) -> None:
77+
"""Close underlying transport resources."""
78+
await self._transport.close()
79+
80+
async def __aenter__(self) -> "AsyncDeepsetClient":
81+
"""Enter the AsyncContextManager."""
82+
return self
83+
84+
async def __aexit__(
85+
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
86+
) -> bool:
87+
"""Exit the AsyncContextmanager and clean up resources."""
88+
await self.close()
89+
return False

src/deepset_mcp/main.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from mcp.server.fastmcp import FastMCP
66
from requests import HTTPError
77

8+
from deepset_mcp.client import AsyncDeepsetClient
9+
810
# Initialize MCP Server
911
mcp = FastMCP("Deepset Cloud MCP")
1012

@@ -69,20 +71,18 @@ def deepset_api_request(endpoint: str, method: str = "GET", data: dict[str, Any]
6971

7072

7173
@mcp.tool()
72-
def list_pipelines() -> dict[str, Any]:
74+
async def list_pipelines() -> Any:
7375
"""Retrieves a list of all pipelines available within the currently configured deepset workspace.
7476
7577
Use this when you need to know the names or IDs of existing pipelines.
7678
"""
7779
workspace = get_workspace()
78-
try:
79-
return deepset_api_request(f"/workspaces/{workspace}/pipelines")
80-
except Exception as e:
81-
return {"error": str(e)}
80+
async with AsyncDeepsetClient() as client:
81+
return await client.request(endpoint=f"v1/workspaces/{workspace}/pipelines", method="GET")
8282

8383

8484
@mcp.tool()
85-
def get_pipeline(pipeline_id: str) -> dict[str, Any]:
85+
async def get_pipeline(pipeline_id: str) -> Any:
8686
"""Fetches detailed configuration information for a specific pipeline, identified by its unique `pipeline_id`.
8787
8888
This includes its components, connections, and metadata.
@@ -91,23 +91,19 @@ def get_pipeline(pipeline_id: str) -> dict[str, Any]:
9191
:param pipeline_id: ID of the pipeline to retrieve.
9292
"""
9393
workspace = get_workspace()
94-
try:
95-
return deepset_api_request(f"/workspaces/{workspace}/pipelines/{pipeline_id}")
96-
except Exception as e:
97-
return {"error": str(e)}
94+
async with AsyncDeepsetClient() as client:
95+
return await client.request(endpoint=f"v1/workspaces/{workspace}/pipelines/{pipeline_id}", method="GET")
9896

9997

10098
@mcp.tool()
101-
def get_component_schemas() -> dict[str, Any]:
99+
async def get_component_schemas() -> Any:
102100
"""Retrieves the schemas for all available Haystack components from the deepset API.
103101
104102
These schemas define the expected input and output parameters for each component type, which is useful for
105103
constructing or validating componets in a pipeline YAML.
106104
"""
107-
try:
108-
return deepset_api_request("/haystack/components")
109-
except Exception as e:
110-
return {"error": str(e)}
105+
async with AsyncDeepsetClient() as client:
106+
return await client.request(endpoint="v1/haystack/components", method="GET")
111107

112108

113109
@mcp.tool()

src/deepset_mcp/transport.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from typing import Any, Protocol
2+
3+
import httpx
4+
5+
6+
class TransportProtocol(Protocol):
7+
"""Protocol for HTTP transport."""
8+
9+
async def request(self, method: str, url: str, **kwargs: Any) -> Any:
10+
"""Send an HTTP request and return the parsed JSON response."""
11+
...
12+
13+
async def close(self) -> None:
14+
"""Clean up any resources (e.g., close connections)."""
15+
...
16+
17+
18+
class AsyncTransport(TransportProtocol):
19+
"""Asynchronous HTTP transport using httpx.AsyncClient."""
20+
21+
def __init__(
22+
self,
23+
base_url: str,
24+
api_key: str,
25+
config: dict[str, Any] | None = None,
26+
):
27+
"""
28+
Initialize an instance of AsyncTransport.
29+
30+
Parameters
31+
----------
32+
base_url : str
33+
Base URL for the API
34+
api_key : str
35+
Bearer token for authentication
36+
config : dict, optional
37+
Configuration for httpx.AsyncClient, e.g., {'timeout': 10.0}
38+
"""
39+
config = config or {}
40+
# Ensure auth header
41+
headers = config.pop("headers", {})
42+
headers.setdefault("Authorization", f"Bearer {api_key}")
43+
# Build client kwargs
44+
client_kwargs = {
45+
"base_url": base_url,
46+
"headers": headers,
47+
**config,
48+
}
49+
self._client = httpx.AsyncClient(**client_kwargs)
50+
51+
async def request(self, method: str, url: str, **kwargs: Any) -> Any:
52+
"""Send an HTTP request and return the parsed JSON response."""
53+
resp = await self._client.request(method, url, **kwargs)
54+
resp.raise_for_status()
55+
56+
return resp.json()
57+
58+
async def close(self) -> None:
59+
"""Clean up any resources (e.g., close connections)."""
60+
await self._client.aclose()

test/test_async_deepset_client.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
from typing import Any
2+
3+
import pytest
4+
import pytest_asyncio
5+
6+
from deepset_mcp.client import AsyncDeepsetClient
7+
from deepset_mcp.transport import AsyncTransport, TransportProtocol
8+
9+
10+
class DummyProtocol(TransportProtocol):
11+
def __init__(self) -> None:
12+
self.requests: list[dict[str, Any]] = []
13+
self.closed: bool = False
14+
15+
async def request(self, method: str, url: str, **kwargs: Any) -> Any:
16+
# Record the request and return a dummy response
17+
record: dict[str, Any] = {"method": method, "url": url, **kwargs}
18+
self.requests.append(record)
19+
return {"dummy": "response"}
20+
21+
async def close(self) -> None:
22+
self.closed = True
23+
24+
25+
@pytest_asyncio.fixture(autouse=True) # type: ignore
26+
def clear_env(monkeypatch: pytest.MonkeyPatch) -> None:
27+
# Ensure DEEPSET_API_KEY is unset by default unless explicitly set
28+
monkeypatch.delenv("DEEPSET_API_KEY", raising=False)
29+
30+
31+
@pytest.mark.asyncio
32+
async def test_init_with_api_key_and_transport() -> None:
33+
dummy: DummyProtocol = DummyProtocol()
34+
client: AsyncDeepsetClient = AsyncDeepsetClient(api_key="testkey", transport=dummy)
35+
assert client.api_key == "testkey"
36+
assert client._transport is dummy
37+
38+
39+
@pytest.mark.asyncio
40+
async def test_init_without_api_key_raises() -> None:
41+
# No API key in args or env
42+
with pytest.raises(ValueError):
43+
AsyncDeepsetClient()
44+
45+
46+
@pytest.mark.asyncio
47+
async def test_init_with_env_api_key(monkeypatch: pytest.MonkeyPatch) -> None:
48+
monkeypatch.setenv("DEEPSET_API_KEY", "envkey")
49+
client: AsyncDeepsetClient = AsyncDeepsetClient()
50+
assert client.api_key == "envkey"
51+
# Default transport is AsyncTransport
52+
assert isinstance(client._transport, AsyncTransport)
53+
54+
55+
@pytest.mark.asyncio
56+
async def test_request_default_headers_and_url() -> None:
57+
dummy: DummyProtocol = DummyProtocol()
58+
client: AsyncDeepsetClient = AsyncDeepsetClient(api_key="key", base_url="https://api.test", transport=dummy)
59+
60+
resp: Any = await client.request("endpoint")
61+
# Should normalize endpoint to '/endpoint'
62+
assert resp == {"dummy": "response"}
63+
assert len(dummy.requests) == 1
64+
65+
call: dict[str, Any] = dummy.requests[0]
66+
assert call["method"] == "GET"
67+
assert call["url"] == "https://api.test/endpoint"
68+
headers: dict[str, str] = call["headers"]
69+
assert headers["Authorization"] == "Bearer key"
70+
assert headers["Accept"] == "application/json,text/plain,*/*"
71+
assert "Content-Type" not in headers
72+
assert call.get("json") is None
73+
74+
75+
@pytest.mark.asyncio
76+
async def test_request_with_data_and_custom_headers() -> None:
77+
dummy: DummyProtocol = DummyProtocol()
78+
client: AsyncDeepsetClient = AsyncDeepsetClient(api_key="key", base_url="https://api.test", transport=dummy)
79+
80+
data: dict[str, Any] = {"foo": "bar"}
81+
custom: dict[str, str] = {"X-Custom": "value"}
82+
resp: Any = await client.request("/path", method="POST", data=data, headers=custom)
83+
assert resp == {"dummy": "response"}
84+
assert len(dummy.requests) == 1
85+
86+
call = dummy.requests[0]
87+
assert call["method"] == "POST"
88+
assert call["url"] == "https://api.test/path"
89+
headers = call["headers"]
90+
# Custom header merged
91+
assert headers["X-Custom"] == "value"
92+
assert headers["Content-Type"] == "application/json"
93+
# Authorization preserved
94+
assert headers["Authorization"] == "Bearer key"
95+
assert call.get("json") == data
96+
97+
98+
@pytest.mark.asyncio
99+
async def test_close_and_context_manager() -> None:
100+
dummy: DummyProtocol = DummyProtocol()
101+
client: AsyncDeepsetClient = AsyncDeepsetClient(api_key="key", transport=dummy)
102+
103+
# Test close
104+
await client.close()
105+
assert dummy.closed is True
106+
107+
# Test async context manager
108+
dummy2: DummyProtocol = DummyProtocol()
109+
async with AsyncDeepsetClient(api_key="key", transport=dummy2) as ctx:
110+
assert isinstance(ctx, AsyncDeepsetClient)
111+
# After exit, close should be called
112+
assert dummy2.closed is True

uv.lock

Lines changed: 17 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)