Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
5fed71b
feat: implement DeepsetClient with async context manager support
mathislucka May 6, 2025
00e3ae5
feat: add imports for DeepsetClient
mathislucka May 6, 2025
80408d8
feat: add helper to run async functions in sync context
mathislucka May 6, 2025
b0c99a7
feat: implement DeepsetClient-based API request functions
mathislucka May 6, 2025
5cf2a0f
refactor: update list_pipelines to use DeepsetClient
mathislucka May 6, 2025
515b8d8
refactor: update get_pipeline to use DeepsetClient
mathislucka May 6, 2025
98316d9
refactor: update get_component_schemas to use DeepsetClient
mathislucka May 6, 2025
51a9f0b
refactor: update validate_pipeline_yaml to be async
mathislucka May 6, 2025
3cf3bf5
refactor: update validate_pipeline_yaml to use DeepsetClient
mathislucka May 6, 2025
cc69078
refactor: update get_pipeline_yaml to be async
mathislucka May 6, 2025
be5992b
refactor: update get_pipeline_yaml to use DeepsetClient
mathislucka May 6, 2025
cd390a9
refactor: update update_pipeline_yaml to be async
mathislucka May 6, 2025
6cf1bac
refactor: update update_pipeline_yaml to use DeepsetClient
mathislucka May 6, 2025
fb51a61
refactor: simplify update_pipeline_yaml with DeepsetClient
mathislucka May 6, 2025
54e58ac
refactor: update list_pipeline_templates to be async
mathislucka May 6, 2025
694f90f
refactor: update list_pipeline_templates to use DeepsetClient
mathislucka May 6, 2025
43adb55
refactor: update get_pipeline_template to be async
mathislucka May 6, 2025
acb1ff4
refactor: update get_pipeline_template to use DeepsetClient
mathislucka May 6, 2025
c1d422d
refactor: update get_custom_components to be async
mathislucka May 6, 2025
ad3e662
refactor: update get_custom_components to use DeepsetClient
mathislucka May 6, 2025
58f52b9
refactor: update get_latest_custom_component_installation_logs to be …
mathislucka May 6, 2025
fdf403c
refactor: update get_latest_custom_component_installation_logs to use…
mathislucka May 6, 2025
12c3c9d
refactor: update list_custom_component_installations to be async
mathislucka May 6, 2025
16453c0
refactor: update list_custom_component_installations to use DeepsetCl…
mathislucka May 6, 2025
0dbf7e3
refactor: update user info fetching to use DeepsetClient
mathislucka May 6, 2025
3c8721c
chore: add httpx and typing-extensions dependencies
mathislucka May 6, 2025
a052e37
test: add unit tests for DeepsetClient
mathislucka May 6, 2025
684e568
test: update imports for test_main.py
mathislucka May 6, 2025
282ae79
test: update test_update_pipeline_yaml_success_json_response to use D…
mathislucka May 6, 2025
97ad8de
test: update test_update_pipeline_yaml_success_empty_response to use …
mathislucka May 6, 2025
efa2cb5
test: update test_update_pipeline_yaml_api_error_422_json to use Deep…
mathislucka May 6, 2025
5985e16
test: update test_update_pipeline_yaml_api_error_500_text to use Deep…
mathislucka May 6, 2025
9a721bb
test: update test_update_pipeline_yaml_request_exception to use Deeps…
mathislucka May 6, 2025
f8024f3
test: update test_update_pipeline_yaml_empty_content to use DeepsetCl…
mathislucka May 6, 2025
87cd2c9
test: update test_update_pipeline_yaml_invalid_structure to use Deeps…
mathislucka May 6, 2025
303d19c
refactor: remove resource-specific methods from DeepsetClient
mathislucka May 6, 2025
38fdfb0
test: update client tests to focus on core functionality
mathislucka May 6, 2025
6d104db
refactor: update list_pipelines to use generic request method
mathislucka May 6, 2025
137da48
refactor: update get_pipeline to use generic request method
mathislucka May 6, 2025
eb88284
refactor: update get_component_schemas to use generic request method
mathislucka May 6, 2025
54e44a1
refactor: update validate_pipeline_yaml to use generic request method
mathislucka May 6, 2025
5a72953
refactor: update get_pipeline_yaml to use generic request method
mathislucka May 6, 2025
1da51f3
refactor: update update_pipeline_yaml to use generic request method
mathislucka May 6, 2025
8ca604b
refactor: update list_pipeline_templates to use generic request method
mathislucka May 6, 2025
a46e553
refactor: update get_pipeline_template to use generic request method
mathislucka May 6, 2025
75fae43
feat: import MockHttpClient from client module
mathislucka May 6, 2025
d0614a8
refactor: update get_latest_custom_component_installation_logs to use…
mathislucka May 6, 2025
20c1070
refactor: update list_custom_component_installations to use generic r…
mathislucka May 6, 2025
43eb511
refactor: remove async_to_sync decorator since mcp.tool supports asyn…
mathislucka May 6, 2025
1c426e9
refactor: simplify API request function to async only
mathislucka May 6, 2025
beb6188
chore: remove unused import
mathislucka May 6, 2025
2804591
refactor: update MockHttpClient with comment to move to client.py
mathislucka May 6, 2025
2aac3fa
chore: add mock import to client module
mathislucka May 6, 2025
ff322fc
feat: add MockHttpClient to client module
mathislucka May 6, 2025
78a0685
test: update test_update_pipeline_yaml_success_json_response to use M…
mathislucka May 6, 2025
0005451
test: update test_update_pipeline_yaml_success_empty_response to use …
mathislucka May 6, 2025
ad0f71a
test: update test_update_pipeline_yaml_api_error_422_json to use mock…
mathislucka May 6, 2025
c1a6183
test: update test_update_pipeline_yaml_api_error_500_text to use mock…
mathislucka May 6, 2025
be9e113
test: update test_update_pipeline_yaml_request_exception to use mocke…
mathislucka May 6, 2025
e8fcb4c
test: update test_update_pipeline_yaml_empty_content to use mocked De…
mathislucka May 6, 2025
2cfc1f8
test: update test_update_pipeline_yaml_invalid_structure to use mocke…
mathislucka May 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ dependencies = [
"mcp[cli]>=1.6.0",
"requests>=2.32.3",
"uvicorn>=0.34.2",
"httpx>=0.24.1",
"typing-extensions>=4.7.0",
]

[project.scripts]
Expand Down
365 changes: 365 additions & 0 deletions src/deepset_mcp/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,365 @@
import os
import logging
from typing import Any, AsyncIterator, Protocol, TypeVar
from typing_extensions import Self # For python 3.11 compatibility
from unittest import mock

import httpx

# Types for dependency injection/protocol
T = TypeVar("T")


# Custom mock HTTP client for testing
class MockHttpClient(Protocol):
def __init__(self, responses: dict[str, Any]) -> None:
self.responses = responses
self.requests: list[dict[str, Any]] = []

async def get(
self, url: str, *, headers: dict[str, str] | None = None, params: dict[str, Any] | None = None
) -> httpx.Response:
self.requests.append({"method": "GET", "url": url, "headers": headers, "params": params})
key = f"GET {url}"
if key in self.responses:
return self.responses[key]
# Default mock response
mock_response = mock.Mock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.text = "{}"
mock_response.json.return_value = {}
return mock_response

async def post(
self,
url: str,
*,
headers: dict[str, str] | None = None,
json: dict[str, Any] | None = None,
data: dict[str, Any] | None = None,
) -> httpx.Response:
self.requests.append({"method": "POST", "url": url, "headers": headers, "json": json, "data": data})
key = f"POST {url}"
if key in self.responses:
return self.responses[key]
# Default mock response
mock_response = mock.Mock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.text = "{}"
mock_response.json.return_value = {}
return mock_response

async def put(
self,
url: str,
*,
headers: dict[str, str] | None = None,
json: dict[str, Any] | None = None,
data: dict[str, Any] | None = None,
) -> httpx.Response:
self.requests.append({"method": "PUT", "url": url, "headers": headers, "json": json, "data": data})
key = f"PUT {url}"
if key in self.responses:
return self.responses[key]
# Default mock response
mock_response = mock.Mock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.text = "{}"
mock_response.json.return_value = {}
return mock_response


class HttpClient(Protocol):
"""Protocol for HTTP clients to enable dependency injection and testing."""

async def get(
self, url: str, *, headers: dict[str, str] | None = None, params: dict[str, Any] | None = None
) -> httpx.Response:
"""Make a GET request."""
...

async def post(
self,
url: str,
*,
headers: dict[str, str] | None = None,
json: dict[str, Any] | None = None,
data: dict[str, Any] | None = None,
) -> httpx.Response:
"""Make a POST request."""
...

async def put(
self,
url: str,
*,
headers: dict[str, str] | None = None,
json: dict[str, Any] | None = None,
data: dict[str, Any] | None = None,
) -> httpx.Response:
"""Make a PUT request."""
...


class HttpxClient:
"""Implementation of HttpClient using httpx."""

def __init__(self, client: httpx.AsyncClient) -> None:
"""Initialize with an httpx client."""
self._client = client

async def get(
self, url: str, *, headers: dict[str, str] | None = None, params: dict[str, Any] | None = None
) -> httpx.Response:
"""Make a GET request using httpx."""
return await self._client.get(url, headers=headers, params=params)

async def post(
self,
url: str,
*,
headers: dict[str, str] | None = None,
json: dict[str, Any] | None = None,
data: dict[str, Any] | None = None,
) -> httpx.Response:
"""Make a POST request using httpx."""
return await self._client.post(url, headers=headers, json=json, data=data)

async def put(
self,
url: str,
*,
headers: dict[str, str] | None = None,
json: dict[str, Any] | None = None,
data: dict[str, Any] | None = None,
) -> httpx.Response:
"""Make a PUT request using httpx."""
return await self._client.put(url, headers=headers, json=json, data=data)


class DeepsetClient:
"""Client for interacting with the deepset API."""

def __init__(
self,
api_key: str | None = None,
workspace: str | None = None,
base_url: str = "https://api.cloud.deepset.ai/api/v1",
http_client: HttpClient | None = None,
) -> None:
"""Initialize the DeepsetClient.

Parameters
----------
api_key : str, optional
The API key to use for authentication. If not provided, it will be read from the DEEPSET_API_KEY
environment variable.
workspace : str, optional
The workspace to use. If not provided, it will be read from the DEEPSET_WORKSPACE environment variable.
base_url : str, optional
The base URL for the deepset API. Defaults to "https://api.cloud.deepset.ai/api/v1".
http_client : HttpClient, optional
The HTTP client to use for making requests. If not provided, an HttpxClient will be created.
"""
self.api_key = api_key or os.environ.get("DEEPSET_API_KEY")
if not self.api_key:
raise ValueError("API key not provided and DEEPSET_API_KEY environment variable not set")

self.workspace = workspace or os.environ.get("DEEPSET_WORKSPACE")
if not self.workspace:
raise ValueError("Workspace not provided and DEEPSET_WORKSPACE environment variable not set")

self.base_url = base_url
self._http_client = http_client
self._owns_client = http_client is None
self._logger = logging.getLogger(__name__)

async def __aenter__(self) -> Self:
"""Enter the async context manager."""
if self._http_client is None:
httpx_client = httpx.AsyncClient()
self._http_client = HttpxClient(httpx_client)
return self

async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Exit the async context manager."""
if self._owns_client and isinstance(self._http_client, HttpxClient):
await self._http_client._client.aclose()

def _get_headers(self, content_type: str | None = None) -> dict[str, str]:
"""Get headers with authentication."""
headers = {"Authorization": f"Bearer {self.api_key}", "Accept": "application/json,text/plain,*/*"}
if content_type:
headers["Content-Type"] = content_type
return headers

def _build_url(self, endpoint: str) -> str:
"""Build a URL from an endpoint."""
# Ensure endpoint starts with a slash if not empty
if endpoint and not endpoint.startswith("/"):
endpoint = f"/{endpoint}"
return f"{self.base_url}{endpoint}"

async def _process_response(self, response: httpx.Response) -> dict[str, Any]:
"""Process a response from the API."""
if response.status_code >= 400:
error_message = f"API Error: {response.status_code}"
try:
error_details = response.json()
except Exception:
error_details = response.text if response.text else "No error details provided by API"
return {"error": error_message, "details": error_details}

if not response.text or not response.text.strip():
return {"status": "success", "message": "API returned empty response body"}

try:
return response.json()
except Exception:
return {"result": response.text, "warning": "API response was not valid JSON"}

async def request(
self, endpoint: str, method: str = "GET", data: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Make a request to the deepset API.

Parameters
----------
endpoint : str
The endpoint to request, relative to the base URL.
method : str, optional
The HTTP method to use. Defaults to "GET".
data : dict[str, Any], optional
The data to send with the request. Defaults to None.

Returns
-------
dict[str, Any]
The response from the API, processed into a dictionary.
"""
if not self._http_client:
raise RuntimeError("DeepsetClient must be used as an async context manager")

url = self._build_url(endpoint)
self._logger.debug(f"Making {method} request to {url}")

try:
if method == "GET":
response = await self._http_client.get(url, headers=self._get_headers())
elif method == "POST":
response = await self._http_client.post(
url, headers=self._get_headers("application/json"), json=data
)
elif method == "PUT":
response = await self._http_client.put(
url, headers=self._get_headers("application/json"), json=data
)
else:
raise ValueError(f"Unsupported HTTP method: {method}")

return await self._process_response(response)
except httpx.RequestError as e:
self._logger.error(f"Request failed: {str(e)}")
return {"error": f"Request failed: {str(e)}"}
except Exception as e:
self._logger.error(f"Unexpected error during request: {str(e)}")
return {"error": f"Unexpected error during request: {str(e)}"}

async def request_with_custom_headers(
self, endpoint: str, headers: dict[str, str], method: str = "GET", data: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Make a request to the deepset API with custom headers.

This is useful for endpoints that need special headers (like text/plain for logs).

Parameters
----------
endpoint : str
The endpoint to request, relative to the base URL.
headers : dict[str, str]
The headers to use for the request. Authorization will be added if not present.
method : str, optional
The HTTP method to use. Defaults to "GET".
data : dict[str, Any], optional
The data to send with the request. Defaults to None.

Returns
-------
dict[str, Any]
The response from the API, processed into a dictionary.
"""
if not self._http_client:
raise RuntimeError("DeepsetClient must be used as an async context manager")

url = self._build_url(endpoint)
self._logger.debug(f"Making {method} request to {url} with custom headers")

# Add authorization if not present
if "Authorization" not in headers:
headers["Authorization"] = f"Bearer {self.api_key}"

try:
if method == "GET":
response = await self._http_client.get(url, headers=headers)
elif method == "POST":
response = await self._http_client.post(url, headers=headers, json=data)
elif method == "PUT":
response = await self._http_client.put(url, headers=headers, json=data)
else:
raise ValueError(f"Unsupported HTTP method: {method}")

return await self._process_response(response)
except httpx.RequestError as e:
self._logger.error(f"Request failed: {str(e)}")
return {"error": f"Request failed: {str(e)}"}
except Exception as e:
self._logger.error(f"Unexpected error during request: {str(e)}")
return {"error": f"Unexpected error during request: {str(e)}"}

async def request_v2_api(
self, endpoint: str, method: str = "GET", data: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Make a request to the deepset API v2 endpoint.

Parameters
----------
endpoint : str
The endpoint to request, relative to the base URL (without /api/v2).
method : str, optional
The HTTP method to use. Defaults to "GET".
data : dict[str, Any], optional
The data to send with the request. Defaults to None.

Returns
-------
dict[str, Any]
The response from the API, processed into a dictionary.
"""
url = self._build_url(endpoint).replace("/api/v1", "/api/v2")
self._logger.debug(f"Making {method} request to v2 API: {url}")

if not self._http_client:
raise RuntimeError("DeepsetClient must be used as an async context manager")

try:
if method == "GET":
response = await self._http_client.get(url, headers=self._get_headers())
elif method == "POST":
response = await self._http_client.post(
url, headers=self._get_headers("application/json"), json=data
)
elif method == "PUT":
response = await self._http_client.put(
url, headers=self._get_headers("application/json"), json=data
)
else:
raise ValueError(f"Unsupported HTTP method: {method}")

return await self._process_response(response)
except httpx.RequestError as e:
self._logger.error(f"Request failed: {str(e)}")
return {"error": f"Request failed: {str(e)}"}
except Exception as e:
self._logger.error(f"Unexpected error during request: {str(e)}")
return {"error": f"Unexpected error during request: {str(e)}"}

Loading
Loading