Skip to content

Commit 10e1dc4

Browse files
authored
feat: add get_component_input_output method to HaystackServiceResource (#29)
* feat: add get_component_input_output method to HaystackServiceResource * feat: add get_component_input_output to FakeHaystackServiceResource * test: add unit tests for get_component_input_output * test: add integration test for get_component_input_output * fix: typing issues; tests
1 parent 0064fa0 commit 10e1dc4

12 files changed

Lines changed: 245 additions & 108 deletions

File tree

src/deepset_mcp/api/client.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
import os
22
from types import TracebackType
3-
from typing import Any, Self
3+
from typing import Any, Self, TypeVar, overload
44

55
from deepset_mcp.api.haystack_service.resource import HaystackServiceResource
66
from deepset_mcp.api.pipeline.resource import PipelineResource
77
from deepset_mcp.api.pipeline_template.resource import PipelineTemplateResource
88
from deepset_mcp.api.protocols import AsyncClientProtocol
99
from deepset_mcp.api.transport import AsyncTransport, TransportProtocol, TransportResponse
1010

11+
T = TypeVar("T")
12+
1113

1214
class AsyncDeepsetClient(AsyncClientProtocol):
1315
"""Async Client for interacting with the deepset API."""
@@ -46,13 +48,37 @@ def __init__(
4648
config=transport_config,
4749
)
4850

51+
@overload
52+
async def request(
53+
self,
54+
endpoint: str,
55+
*,
56+
response_type: type[T],
57+
method: str = "GET",
58+
data: dict[str, Any] | None = None,
59+
headers: dict[str, str] | None = None,
60+
) -> TransportResponse[T]: ...
61+
62+
@overload
63+
async def request(
64+
self,
65+
endpoint: str,
66+
*,
67+
response_type: None = None,
68+
method: str = "GET",
69+
data: dict[str, Any] | None = None,
70+
headers: dict[str, str] | None = None,
71+
) -> TransportResponse[Any]: ...
72+
4973
async def request(
5074
self,
5175
endpoint: str,
76+
*,
5277
method: str = "GET",
5378
data: dict[str, Any] | None = None,
5479
headers: dict[str, str] | None = None,
55-
) -> TransportResponse:
80+
response_type: type[T] | None = None,
81+
) -> TransportResponse[Any]:
5682
"""Make a request to the deepset API."""
5783
if not endpoint.startswith("/"):
5884
endpoint = f"/{endpoint}"
@@ -75,6 +101,7 @@ async def request(
75101
url,
76102
json=data,
77103
headers=request_headers,
104+
response_type=response_type,
78105
)
79106

80107
async def close(self) -> None:

src/deepset_mcp/api/haystack_service/resource.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import Any
22

3+
from deepset_mcp.api.exceptions import ResourceNotFoundError
34
from deepset_mcp.api.protocols import AsyncClientProtocol, HaystackServiceProtocol
45
from deepset_mcp.api.transport import raise_for_status
56

@@ -27,3 +28,26 @@ async def get_component_schemas(self) -> dict[str, Any]:
2728
raise_for_status(resp)
2829

2930
return resp.json if resp.json is not None else {}
31+
32+
async def get_component_input_output(self, component_name: str) -> dict[str, Any]:
33+
"""Fetch the component input and output schema from the API.
34+
35+
Args:
36+
component_name: The name of the component to fetch the input/output schema for
37+
38+
Returns:
39+
The component input/output schema as a dictionary
40+
"""
41+
resp = await self._client.request(
42+
endpoint=f"v1/haystack/components/input-output?domain=deepset-cloud&names={component_name}",
43+
method="GET",
44+
headers={"accept": "application/json"},
45+
response_type=list[dict[str, Any]],
46+
)
47+
48+
raise_for_status(resp)
49+
50+
if resp.json is None or len(resp.json) == 0:
51+
raise ResourceNotFoundError(f"Component '{component_name}' not found.")
52+
53+
return resp.json[0] if resp.json is not None else {}

src/deepset_mcp/api/pipeline/resource.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ async def validate(self, yaml_config: str) -> PipelineValidationResult:
4848
if resp.success:
4949
return PipelineValidationResult(valid=True)
5050

51-
# If 400 error, we have validation errors to process
52-
if resp.status_code == 400 and resp.json is not None and "details" in resp.json:
51+
if resp.status_code == 400 and resp.json is not None and isinstance(resp.json, dict) and "details" in resp.json:
5352
errors = [ValidationError(code=error["code"], message=error["message"]) for error in resp.json["details"]]
5453

5554
return PipelineValidationResult(valid=False, errors=errors)

src/deepset_mcp/api/protocols.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from types import TracebackType
2-
from typing import Any, Protocol, Self
2+
from typing import Any, Protocol, Self, TypeVar, overload
33

44
from deepset_mcp.api.pipeline.models import DeepsetPipeline, NoContentResponse, PipelineValidationResult
55
from deepset_mcp.api.pipeline_template.models import PipelineTemplate
@@ -13,17 +13,48 @@ async def get_component_schemas(self) -> dict[str, Any]:
1313
"""Fetch the component schema from the API."""
1414
...
1515

16+
async def get_component_input_output(self, component_name: str) -> dict[str, Any]:
17+
"""Fetch input and output schema for a component from the API."""
18+
...
19+
20+
21+
T = TypeVar("T")
22+
1623

1724
class AsyncClientProtocol(Protocol):
1825
"""Protocol defining the implementation for AsyncClient."""
1926

27+
@overload
28+
async def request(
29+
self,
30+
endpoint: str,
31+
*,
32+
response_type: type[T],
33+
method: str = "GET",
34+
data: dict[str, Any] | None = None,
35+
headers: dict[str, str] | None = None,
36+
) -> TransportResponse[T]: ...
37+
38+
@overload
39+
async def request(
40+
self,
41+
endpoint: str,
42+
*,
43+
response_type: None = None,
44+
method: str = "GET",
45+
data: dict[str, Any] | None = None,
46+
headers: dict[str, str] | None = None,
47+
) -> TransportResponse[Any]: ...
48+
2049
async def request(
2150
self,
2251
endpoint: str,
52+
*,
53+
response_type: type[T] | None = None,
2354
method: str = "GET",
2455
data: dict[str, Any] | None = None,
2556
headers: dict[str, str] | None = None,
26-
) -> TransportResponse:
57+
) -> TransportResponse[Any]:
2758
"""Make a request to the API."""
2859
...
2960

src/deepset_mcp/api/transport.py

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,29 @@
11
import json
22
from dataclasses import dataclass
3-
from typing import Any, Protocol
3+
from typing import Any, Generic, Protocol, TypeVar, cast, overload
44

55
import httpx
66

77
from deepset_mcp.api.exceptions import BadRequestError, ResourceNotFoundError, UnexpectedAPIError
88

9+
T = TypeVar("T")
10+
911

1012
@dataclass
11-
class TransportResponse:
12-
"""Response envelope for HTTP transport."""
13+
class TransportResponse(Generic[T]):
14+
"""Reponse envelope for HTTP transport."""
1315

1416
text: str
1517
status_code: int
16-
json: dict[str, Any] | None = None
18+
json: T | None = None
1719

1820
@property
1921
def success(self) -> bool:
20-
"""Returns True if the response status code indicates success (< 400)."""
22+
"""Check if the response was successful (status code < 400)."""
2123
return self.status_code < 400
2224

2325

24-
def raise_for_status(response: TransportResponse) -> None:
26+
def raise_for_status(response: TransportResponse[Any]) -> None:
2527
"""Raises the appropriate exception based on the response status code."""
2628
if response.success:
2729
return
@@ -32,9 +34,12 @@ def raise_for_status(response: TransportResponse) -> None:
3234
404: ResourceNotFoundError,
3335
}
3436

35-
# Extract error details from response if available
36-
detail = response.json.get("details") if response.json else None
37-
message = response.json.get("message") if response.json else response.text
37+
if isinstance(response.json, dict):
38+
detail = response.json.get("details") if response.json else None
39+
message = response.json.get("message") if response.json else response.text
40+
else:
41+
detail = json.dumps(response.json) if response.json else None
42+
message = response.text
3843

3944
# Get exception class
4045
exception_class = exception_map.get(response.status_code)
@@ -52,8 +57,20 @@ def raise_for_status(response: TransportResponse) -> None:
5257
class TransportProtocol(Protocol):
5358
"""Protocol for HTTP transport."""
5459

55-
async def request(self, method: str, url: str, **kwargs: Any) -> TransportResponse:
56-
"""Send an HTTP request and return the parsed JSON response."""
60+
@overload
61+
async def request(
62+
self, method: str, url: str, *, response_type: type[T], **kwargs: Any
63+
) -> TransportResponse[T]: ...
64+
65+
@overload
66+
async def request(
67+
self, method: str, url: str, *, response_type: None = None, **kwargs: Any
68+
) -> TransportResponse[Any]: ...
69+
70+
async def request(
71+
self, method: str, url: str, *, response_type: type[T] | None = None, **kwargs: Any
72+
) -> TransportResponse[Any]:
73+
"""Send an HTTP request and return the response."""
5774
...
5875

5976
async def close(self) -> None:
@@ -94,18 +111,34 @@ def __init__(
94111
}
95112
self._client = httpx.AsyncClient(**client_kwargs)
96113

97-
async def request(self, method: str, url: str, **kwargs: Any) -> TransportResponse:
114+
@overload
115+
async def request(
116+
self, method: str, url: str, *, response_type: type[T], **kwargs: Any
117+
) -> TransportResponse[T]: ...
118+
119+
@overload
120+
async def request(
121+
self, method: str, url: str, *, response_type: None = None, **kwargs: Any
122+
) -> TransportResponse[Any]: ...
123+
124+
async def request(
125+
self, method: str, url: str, *, response_type: type[T] | None = None, **kwargs: Any
126+
) -> TransportResponse[Any]:
98127
"""Send an HTTP request and return the response."""
99128
response = await self._client.request(method, url, **kwargs)
100129

101-
transport_response = TransportResponse(text=response.text, status_code=response.status_code)
130+
if response_type is not None:
131+
raw = response.json()
132+
payload: T = cast(T, raw)
133+
return TransportResponse(text=response.text, status_code=response.status_code, json=payload)
102134

103135
try:
104-
transport_response.json = response.json()
136+
untyped_response = response.json()
105137
except json.decoder.JSONDecodeError:
138+
untyped_response = None
106139
pass
107140

108-
return transport_response
141+
return TransportResponse(text=response.text, status_code=response.status_code, json=untyped_response)
109142

110143
async def close(self) -> None:
111144
"""Clean up any resources (e.g., close connections)."""

src/deepset_mcp/main.py

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -134,84 +134,6 @@ async def validate_pipeline(yaml_configuration: str) -> str:
134134
return response
135135

136136

137-
# @mcp.tool()
138-
# async def list_pipeline_templates() -> str:
139-
# """Retrieves a list of pipeline templates to build AI applications like RAG or Agents."""
140-
# workspace = get_workspace()
141-
# async with AsyncDeepsetClient() as client:
142-
# resp = await client.request(
143-
# endpoint=f"v1/workspaces/{workspace}/pipeline_templates?limit=100&page_number=1&field=created_at&order=DESC"
144-
# )
145-
#
146-
# response = cast(dict[str, Any], resp.json)
147-
#
148-
# if isinstance(response, dict) and "error" in response:
149-
# return f"Error retrieving pipeline templates: {response['error']}"
150-
#
151-
# # Extract the template data
152-
# templates = response.get("data", [])
153-
#
154-
# if not templates:
155-
# return "No pipeline templates found."
156-
#
157-
# # Format the response as requested
158-
# formatted_output = []
159-
# for template in templates:
160-
# name = template.get("pipeline_name", "Unnamed Template")
161-
# description = template.get("description", "No description available")
162-
#
163-
# formatted_output.append(f"<template>\n{name}\n\n{description}\n</template>\n")
164-
#
165-
# # Join all template entries and return
166-
# return "\n".join(formatted_output)
167-
#
168-
#
169-
# @mcp.tool()
170-
# async def get_pipeline_template(template_name: str) -> str:
171-
# """Retrieves a specific pipeline template by name and returns its YAML configurations.
172-
#
173-
# Parameters
174-
# ----------
175-
# template_name : str
176-
# The name of the pipeline template to retrieve
177-
#
178-
# Returns
179-
# -------
180-
# str
181-
# A formatted string containing the template's query and indexing YAML (if available)
182-
# """
183-
# workspace = get_workspace()
184-
#
185-
# async with AsyncDeepsetClient() as client:
186-
# resp = await client.request(endpoint=f"v1/workspaces/{workspace}/pipeline_templates/{template_name}")
187-
#
188-
# response = cast(dict[str, Any], resp.json)
189-
#
190-
# if isinstance(response, dict) and "error" in response:
191-
# return f"Error retrieving pipeline template: {response['error']}"
192-
#
193-
# # Extract the YAML data
194-
# query_yaml = response.get("query_yaml")
195-
# indexing_yaml = response.get("indexing_yaml")
196-
# template_name = response.get("name", "Unnamed Template")
197-
#
198-
# # Format the response
199-
# formatted_output = [f"# Pipeline Template: {template_name}\n"]
200-
#
201-
# # Add query YAML if available
202-
# if query_yaml:
203-
# formatted_output.append("## QUERY PIPELINE YAML\n```yaml\n" + query_yaml + "\n```\n")
204-
# else:
205-
# formatted_output.append("## QUERY PIPELINE YAML\nNo query pipeline YAML available for this template.\n")
206-
#
207-
# # Add indexing YAML if available
208-
# if indexing_yaml:
209-
# formatted_output.append("## INDEXING PIPELINE YAML\n```yaml\n" + indexing_yaml + "\n```\n")
210-
# else:
211-
# formatted_output.append("## INDEXING PIPELINE YAML\nNo indexing pipeline YAML available for this template.\n")
212-
#
213-
# # Join all sections and return
214-
# return "\n".join(formatted_output)
215137
#
216138
#
217139
# @mcp.tool()

test/integration/test_integration_haystack_service_resource.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,17 @@ async def test_get_component_schemas(
2323

2424
assert isinstance(response, dict)
2525
assert "component_schema" in response
26+
27+
28+
@pytest.mark.asyncio
29+
async def test_get_component_input_output(
30+
haystack_service_resource: HaystackServiceResource,
31+
) -> None:
32+
"""Test for getting component input/output schema."""
33+
response = await haystack_service_resource.get_component_input_output("Agent")
34+
35+
assert isinstance(response, dict)
36+
assert "name" in response
37+
assert response["name"] == "Agent"
38+
assert "input" in response
39+
assert "output" in response

0 commit comments

Comments
 (0)