Skip to content

Commit 56ba896

Browse files
committed
feat: add pipeline validation
1 parent b463651 commit 56ba896

12 files changed

Lines changed: 372 additions & 309 deletions

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ packages = ["src/deepset_mcp"]
2727
dev = [
2828
"pytest",
2929
"pytest-asyncio",
30-
"python-dotenv"
30+
"python-dotenv",
31+
"pyyaml",
3132
]
3233
lint = [
3334
"ruff",

src/deepset_mcp/api/client.py

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,11 @@
11
import os
22
from types import TracebackType
3-
from typing import Any, Protocol, Self
3+
from typing import Any, Self
44

5+
from deepset_mcp.api.pipeline.resource import PipelineResource
56
from deepset_mcp.api.transport import AsyncTransport, TransportProtocol, TransportResponse
67

78

8-
class AsyncClientProtocol(Protocol):
9-
"""Protocol defining the implementation for AsyncClient."""
10-
11-
async def request(
12-
self,
13-
endpoint: str,
14-
method: str = "GET",
15-
data: dict[str, Any] | None = None,
16-
headers: dict[str, str] | None = None,
17-
) -> TransportResponse:
18-
"""Make a request to the API."""
19-
...
20-
21-
async def close(self) -> None:
22-
"""Close underlying transport resources."""
23-
...
24-
25-
async def __aenter__(self) -> Self:
26-
"""Enter the AsyncContextManager."""
27-
...
28-
29-
async def __aexit__(
30-
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
31-
) -> bool:
32-
"""Exit the AsyncContextmanager and clean up resources."""
33-
...
34-
35-
369
class AsyncDeepsetClient:
3710
"""Async Client for interacting with the deepset API."""
3811

@@ -115,3 +88,7 @@ async def __aexit__(
11588
"""Exit the AsyncContextmanager and clean up resources."""
11689
await self.close()
11790
return False
91+
92+
def pipelines(self, workspace: str) -> PipelineResource:
93+
"""Resource to interact with pipelines in the specified workspace."""
94+
return PipelineResource(client=self, workspace=workspace)

src/deepset_mcp/api/exceptions.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from typing import Any
2+
3+
4+
class DeepsetAPIError(Exception):
5+
"""Base exception for all deepset API errors."""
6+
7+
def __init__(self, status_code: int | None = None, message: Any | None = None, detail: Any | None = None) -> None:
8+
"""Initialize the exception."""
9+
self.status_code = status_code
10+
self.message = message
11+
self.detail = detail
12+
super().__init__(self.message)
13+
14+
def __str__(self) -> str:
15+
"""Return a string representation of the exception."""
16+
return f"{self.message} (Status Code: {self.status_code})"
17+
18+
19+
class ResourceNotFoundError(DeepsetAPIError):
20+
"""Exception raised when a resource is not found (HTTP 404)."""
21+
22+
def __init__(self, message: Any = "Resource not found", detail: Any | None = None) -> None:
23+
"""Initialize the exception."""
24+
super().__init__(status_code=404, message=message, detail=detail)
25+
26+
27+
class BadRequestError(DeepsetAPIError):
28+
"""Exception raised for invalid requests (HTTP 400)."""
29+
30+
def __init__(self, message: Any = "Bad request", detail: Any | None = None) -> None:
31+
"""Initialize the exception."""
32+
super().__init__(status_code=400, message=message, detail=detail)
33+
34+
35+
class UnexpectedAPIError(DeepsetAPIError):
36+
"""Catch-all exception for unexpected API errors."""
37+
38+
def __init__(
39+
self, status_code: int | None = None, message: Any = "Unexpected API error", detail: Any | None = None
40+
):
41+
"""Initialize the exception."""
42+
super().__init__(status_code=status_code, message=message, detail=detail)

src/deepset_mcp/api/pipeline/models.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from datetime import datetime
22
from enum import StrEnum
3-
from typing import List
43

54
from pydantic import BaseModel, Field
65

@@ -59,4 +58,4 @@ class PipelineValidationResult(BaseModel):
5958
"""Result of validating a pipeline configuration."""
6059

6160
valid: bool
62-
errors: List[ValidationError] = []
61+
errors: list[ValidationError] = []
Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from typing import Any
22

3-
from deepset_mcp.api.client import AsyncClientProtocol
3+
from deepset_mcp.api.exceptions import UnexpectedAPIError
44
from deepset_mcp.api.pipeline.models import DeepsetPipeline, PipelineValidationResult, ValidationError
5+
from deepset_mcp.api.protocols import AsyncClientProtocol
6+
from deepset_mcp.api.transport import raise_for_status
57

68

79
class PipelineResource:
@@ -15,48 +17,44 @@ def __init__(
1517
"""Initializes a PipelineResource instance."""
1618
self._client = client
1719
self._workspace = workspace
18-
20+
1921
async def validate(self, yaml_config: str) -> PipelineValidationResult:
2022
"""
2123
Validate a pipeline's YAML configuration against the API.
22-
24+
2325
Args:
2426
yaml_config: The YAML configuration string to validate
25-
27+
2628
Returns:
2729
PipelineValidationResult containing validation status and any errors
28-
30+
2931
Raises:
3032
ValueError: If the YAML is not valid (422 error) or contains syntax errors
3133
"""
32-
data = {
33-
"deepset_cloud_version": "v2",
34-
"query_yaml": yaml_config
35-
}
36-
34+
data = {"query_yaml": yaml_config}
35+
3736
resp = await self._client.request(
3837
endpoint=f"v1/workspaces/{self._workspace}/pipeline_validations",
3938
method="POST",
4039
data=data,
4140
)
42-
41+
4342
# If successful (status 200), the YAML is valid
4443
if resp.success:
4544
return PipelineValidationResult(valid=True)
46-
45+
4746
# If 400 error, we have validation errors to process
48-
if resp.status_code == 400 and resp.json is not None and "errors" in resp.json:
49-
errors = [ValidationError(code=error["code"], message=error["message"])
50-
for error in resp.json["errors"]]
47+
if resp.status_code == 400 and resp.json is not None and "details" in resp.json:
48+
errors = [ValidationError(code=error["code"], message=error["message"]) for error in resp.json["details"]]
49+
50+
return PipelineValidationResult(valid=False, errors=errors)
51+
52+
if resp.status_code == 422:
53+
errors = [ValidationError(code="YAML_ERROR", message="Syntax error in YAML")]
54+
5155
return PipelineValidationResult(valid=False, errors=errors)
52-
53-
# For other errors, return as an exception
54-
# (like 422 for invalid YAML syntax)
55-
if not resp.success:
56-
if resp.json and "detail" in resp.json:
57-
raise ValueError(f"Pipeline validation failed: {resp.json['detail']}")
58-
else:
59-
raise ValueError(f"Pipeline validation failed with status {resp.status_code}")
56+
57+
raise UnexpectedAPIError(status_code=resp.status_code, message=resp.text, detail=resp.json)
6058

6159
async def list(
6260
self,
@@ -81,6 +79,8 @@ async def list(
8179
method="GET",
8280
)
8381

82+
raise_for_status(resp)
83+
8484
response = resp.json
8585

8686
if response is not None:
@@ -93,13 +93,17 @@ async def list(
9393
async def get(self, pipeline_name: str, include_yaml: bool = True) -> DeepsetPipeline:
9494
"""Fetch a single pipeline by its name."""
9595
resp = await self._client.request(endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}")
96+
raise_for_status(resp)
97+
9698
pipeline = DeepsetPipeline.model_validate(resp.json)
9799

98100
if include_yaml:
99101
yaml_response = await self._client.request(
100102
endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}/yaml"
101103
)
102104

105+
raise_for_status(yaml_response)
106+
103107
if yaml_response.json is not None:
104108
pipeline.yaml_config = yaml_response.json["query_yaml"]
105109

@@ -108,12 +112,14 @@ async def get(self, pipeline_name: str, include_yaml: bool = True) -> DeepsetPip
108112
async def create(self, name: str, yaml_config: str) -> None:
109113
"""Create a new pipeline with a name and YAML config."""
110114
data = {"name": name, "query_yaml": yaml_config}
111-
await self._client.request(
115+
resp = await self._client.request(
112116
endpoint=f"v1/workspaces/{self._workspace}/pipelines",
113117
method="POST",
114118
data=data,
115119
)
116120

121+
raise_for_status(resp)
122+
117123
async def update(
118124
self,
119125
pipeline_name: str,
@@ -123,23 +129,21 @@ async def update(
123129
"""Update name and/or YAML config of an existing pipeline."""
124130
# Handle name update first if any
125131
if updated_pipeline_name is not None:
126-
await self._client.request(
132+
name_resp = await self._client.request(
127133
endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}",
128134
method="PATCH",
129135
data={"name": updated_pipeline_name},
130136
)
131137

138+
raise_for_status(name_resp)
139+
132140
pipeline_name = updated_pipeline_name
133141

134142
if yaml_config is not None:
135-
await self._client.request(
143+
yaml_resp = await self._client.request(
136144
endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}/yaml",
137145
method="PUT",
138146
data={"query_yaml": yaml_config},
139147
)
140148

141-
142-
# async with AsyncDeepsetClient() as client:
143-
# await client.pipelines("default").list()
144-
# await client.pipelines("default").get("hello")
145-
# await client.pipelines("default").update(yaml_config="blabla")
149+
raise_for_status(yaml_resp)

src/deepset_mcp/api/protocols.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from types import TracebackType
2+
from typing import Any, Protocol, Self
3+
4+
from deepset_mcp.api.transport import TransportResponse
5+
6+
7+
class AsyncClientProtocol(Protocol):
8+
"""Protocol defining the implementation for AsyncClient."""
9+
10+
async def request(
11+
self,
12+
endpoint: str,
13+
method: str = "GET",
14+
data: dict[str, Any] | None = None,
15+
headers: dict[str, str] | None = None,
16+
) -> TransportResponse:
17+
"""Make a request to the API."""
18+
...
19+
20+
async def close(self) -> None:
21+
"""Close underlying transport resources."""
22+
...
23+
24+
async def __aenter__(self) -> Self:
25+
"""Enter the AsyncContextManager."""
26+
...
27+
28+
async def __aexit__(
29+
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
30+
) -> bool:
31+
"""Exit the AsyncContextmanager and clean up resources."""
32+
...

src/deepset_mcp/api/transport.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import httpx
66

7+
from deepset_mcp.api.exceptions import BadRequestError, ResourceNotFoundError, UnexpectedAPIError
8+
79

810
@dataclass
911
class TransportResponse:
@@ -12,13 +14,41 @@ class TransportResponse:
1214
text: str
1315
status_code: int
1416
json: dict[str, Any] | None = None
15-
17+
1618
@property
1719
def success(self) -> bool:
1820
"""Returns True if the response status code indicates success (< 400)."""
1921
return self.status_code < 400
2022

2123

24+
def raise_for_status(response: TransportResponse) -> None:
25+
"""Raises the appropriate exception based on the response status code."""
26+
if response.success:
27+
return
28+
29+
# Map status codes to exception classes
30+
exception_map = {
31+
400: BadRequestError,
32+
404: ResourceNotFoundError,
33+
}
34+
35+
# Extract error details from response if available
36+
detail = response.json.get("details") if response.json else None
37+
message = response.json.get("message") if response.json else response.text
38+
39+
# Get exception class
40+
exception_class = exception_map.get(response.status_code)
41+
42+
if exception_class:
43+
# For specific exceptions (BadRequestError, ResourceNotFoundError)
44+
raise exception_class(message=message, detail=detail)
45+
else:
46+
# For the catch-all case, include the status code
47+
raise UnexpectedAPIError(
48+
status_code=response.status_code, message=message or "Unexpected API error", detail=detail
49+
)
50+
51+
2252
class TransportProtocol(Protocol):
2353
"""Protocol for HTTP transport."""
2454

0 commit comments

Comments
 (0)