Skip to content

Commit 4a2ace6

Browse files
authored
Merge pull request #16 from deepset-ai/fix-issue-15
feat: add pipeline config validation
2 parents 45d2058 + 56ba896 commit 4a2ace6

10 files changed

Lines changed: 410 additions & 46 deletions

File tree

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ packages = ["src/deepset_mcp"]
2727
dev = [
2828
"pytest",
2929
"pytest-asyncio",
30-
"python-dotenv"
30+
"python-dotenv",
31+
"pyyaml",
3132
]
3233
lint = [
3334
"ruff",

src/deepset_mcp/api/client.py

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,11 @@
11
import os
22
from types import TracebackType
3-
from typing import Any, Protocol, Self
3+
from typing import Any, Self
44

5+
from deepset_mcp.api.pipeline.resource import PipelineResource
56
from deepset_mcp.api.transport import AsyncTransport, TransportProtocol, TransportResponse
67

78

8-
class AsyncClientProtocol(Protocol):
9-
"""Protocol defining the implementation for AsyncClient."""
10-
11-
async def request(
12-
self,
13-
endpoint: str,
14-
method: str = "GET",
15-
data: dict[str, Any] | None = None,
16-
headers: dict[str, str] | None = None,
17-
) -> TransportResponse:
18-
"""Make a request to the API."""
19-
...
20-
21-
async def close(self) -> None:
22-
"""Close underlying transport resources."""
23-
...
24-
25-
async def __aenter__(self) -> Self:
26-
"""Enter the AsyncContextManager."""
27-
...
28-
29-
async def __aexit__(
30-
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
31-
) -> bool:
32-
"""Exit the AsyncContextmanager and clean up resources."""
33-
...
34-
35-
369
class AsyncDeepsetClient:
3710
"""Async Client for interacting with the deepset API."""
3811

@@ -115,3 +88,7 @@ async def __aexit__(
11588
"""Exit the AsyncContextmanager and clean up resources."""
11689
await self.close()
11790
return False
91+
92+
def pipelines(self, workspace: str) -> PipelineResource:
93+
"""Resource to interact with pipelines in the specified workspace."""
94+
return PipelineResource(client=self, workspace=workspace)

src/deepset_mcp/api/exceptions.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from typing import Any
2+
3+
4+
class DeepsetAPIError(Exception):
5+
"""Base exception for all deepset API errors."""
6+
7+
def __init__(self, status_code: int | None = None, message: Any | None = None, detail: Any | None = None) -> None:
8+
"""Initialize the exception."""
9+
self.status_code = status_code
10+
self.message = message
11+
self.detail = detail
12+
super().__init__(self.message)
13+
14+
def __str__(self) -> str:
15+
"""Return a string representation of the exception."""
16+
return f"{self.message} (Status Code: {self.status_code})"
17+
18+
19+
class ResourceNotFoundError(DeepsetAPIError):
20+
"""Exception raised when a resource is not found (HTTP 404)."""
21+
22+
def __init__(self, message: Any = "Resource not found", detail: Any | None = None) -> None:
23+
"""Initialize the exception."""
24+
super().__init__(status_code=404, message=message, detail=detail)
25+
26+
27+
class BadRequestError(DeepsetAPIError):
28+
"""Exception raised for invalid requests (HTTP 400)."""
29+
30+
def __init__(self, message: Any = "Bad request", detail: Any | None = None) -> None:
31+
"""Initialize the exception."""
32+
super().__init__(status_code=400, message=message, detail=detail)
33+
34+
35+
class UnexpectedAPIError(DeepsetAPIError):
36+
"""Catch-all exception for unexpected API errors."""
37+
38+
def __init__(
39+
self, status_code: int | None = None, message: Any = "Unexpected API error", detail: Any | None = None
40+
):
41+
"""Initialize the exception."""
42+
super().__init__(status_code=status_code, message=message, detail=detail)

src/deepset_mcp/api/pipeline/models.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,17 @@ class Config:
4545
# When serializing back to JSON, convert datetimes to ISO format
4646
datetime: lambda dt: dt.isoformat()
4747
}
48+
49+
50+
class ValidationError(BaseModel):
51+
"""Model representing a validation error from the pipeline validation API."""
52+
53+
code: str
54+
message: str
55+
56+
57+
class PipelineValidationResult(BaseModel):
58+
"""Result of validating a pipeline configuration."""
59+
60+
valid: bool
61+
errors: list[ValidationError] = []

src/deepset_mcp/api/pipeline/resource.py

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from typing import Any
22

3-
from deepset_mcp.api.client import AsyncClientProtocol
4-
from deepset_mcp.api.pipeline.models import DeepsetPipeline
3+
from deepset_mcp.api.exceptions import UnexpectedAPIError
4+
from deepset_mcp.api.pipeline.models import DeepsetPipeline, PipelineValidationResult, ValidationError
5+
from deepset_mcp.api.protocols import AsyncClientProtocol
6+
from deepset_mcp.api.transport import raise_for_status
57

68

79
class PipelineResource:
@@ -16,6 +18,44 @@ def __init__(
1618
self._client = client
1719
self._workspace = workspace
1820

21+
async def validate(self, yaml_config: str) -> PipelineValidationResult:
22+
"""
23+
Validate a pipeline's YAML configuration against the API.
24+
25+
Args:
26+
yaml_config: The YAML configuration string to validate
27+
28+
Returns:
29+
PipelineValidationResult containing validation status and any errors
30+
31+
Raises:
32+
ValueError: If the YAML is not valid (422 error) or contains syntax errors
33+
"""
34+
data = {"query_yaml": yaml_config}
35+
36+
resp = await self._client.request(
37+
endpoint=f"v1/workspaces/{self._workspace}/pipeline_validations",
38+
method="POST",
39+
data=data,
40+
)
41+
42+
# If successful (status 200), the YAML is valid
43+
if resp.success:
44+
return PipelineValidationResult(valid=True)
45+
46+
# If 400 error, we have validation errors to process
47+
if resp.status_code == 400 and resp.json is not None and "details" in resp.json:
48+
errors = [ValidationError(code=error["code"], message=error["message"]) for error in resp.json["details"]]
49+
50+
return PipelineValidationResult(valid=False, errors=errors)
51+
52+
if resp.status_code == 422:
53+
errors = [ValidationError(code="YAML_ERROR", message="Syntax error in YAML")]
54+
55+
return PipelineValidationResult(valid=False, errors=errors)
56+
57+
raise UnexpectedAPIError(status_code=resp.status_code, message=resp.text, detail=resp.json)
58+
1959
async def list(
2060
self,
2161
page_number: int = 1,
@@ -39,6 +79,8 @@ async def list(
3979
method="GET",
4080
)
4181

82+
raise_for_status(resp)
83+
4284
response = resp.json
4385

4486
if response is not None:
@@ -51,13 +93,17 @@ async def list(
5193
async def get(self, pipeline_name: str, include_yaml: bool = True) -> DeepsetPipeline:
5294
"""Fetch a single pipeline by its name."""
5395
resp = await self._client.request(endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}")
96+
raise_for_status(resp)
97+
5498
pipeline = DeepsetPipeline.model_validate(resp.json)
5599

56100
if include_yaml:
57101
yaml_response = await self._client.request(
58102
endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}/yaml"
59103
)
60104

105+
raise_for_status(yaml_response)
106+
61107
if yaml_response.json is not None:
62108
pipeline.yaml_config = yaml_response.json["query_yaml"]
63109

@@ -66,12 +112,14 @@ async def get(self, pipeline_name: str, include_yaml: bool = True) -> DeepsetPip
66112
async def create(self, name: str, yaml_config: str) -> None:
67113
"""Create a new pipeline with a name and YAML config."""
68114
data = {"name": name, "query_yaml": yaml_config}
69-
await self._client.request(
115+
resp = await self._client.request(
70116
endpoint=f"v1/workspaces/{self._workspace}/pipelines",
71117
method="POST",
72118
data=data,
73119
)
74120

121+
raise_for_status(resp)
122+
75123
async def update(
76124
self,
77125
pipeline_name: str,
@@ -81,23 +129,21 @@ async def update(
81129
"""Update name and/or YAML config of an existing pipeline."""
82130
# Handle name update first if any
83131
if updated_pipeline_name is not None:
84-
await self._client.request(
132+
name_resp = await self._client.request(
85133
endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}",
86134
method="PATCH",
87135
data={"name": updated_pipeline_name},
88136
)
89137

138+
raise_for_status(name_resp)
139+
90140
pipeline_name = updated_pipeline_name
91141

92142
if yaml_config is not None:
93-
await self._client.request(
143+
yaml_resp = await self._client.request(
94144
endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}/yaml",
95145
method="PUT",
96146
data={"query_yaml": yaml_config},
97147
)
98148

99-
100-
# async with AsyncDeepsetClient() as client:
101-
# await client.pipelines("default").list()
102-
# await client.pipelines("default").get("hello")
103-
# await client.pipelines("default").update(yaml_config="blabla")
149+
raise_for_status(yaml_resp)

src/deepset_mcp/api/protocols.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from types import TracebackType
2+
from typing import Any, Protocol, Self
3+
4+
from deepset_mcp.api.transport import TransportResponse
5+
6+
7+
class AsyncClientProtocol(Protocol):
8+
"""Protocol defining the implementation for AsyncClient."""
9+
10+
async def request(
11+
self,
12+
endpoint: str,
13+
method: str = "GET",
14+
data: dict[str, Any] | None = None,
15+
headers: dict[str, str] | None = None,
16+
) -> TransportResponse:
17+
"""Make a request to the API."""
18+
...
19+
20+
async def close(self) -> None:
21+
"""Close underlying transport resources."""
22+
...
23+
24+
async def __aenter__(self) -> Self:
25+
"""Enter the AsyncContextManager."""
26+
...
27+
28+
async def __aexit__(
29+
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
30+
) -> bool:
31+
"""Exit the AsyncContextmanager and clean up resources."""
32+
...

src/deepset_mcp/api/transport.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import httpx
66

7+
from deepset_mcp.api.exceptions import BadRequestError, ResourceNotFoundError, UnexpectedAPIError
8+
79

810
@dataclass
911
class TransportResponse:
@@ -13,6 +15,39 @@ class TransportResponse:
1315
status_code: int
1416
json: dict[str, Any] | None = None
1517

18+
@property
19+
def success(self) -> bool:
20+
"""Returns True if the response status code indicates success (< 400)."""
21+
return self.status_code < 400
22+
23+
24+
def raise_for_status(response: TransportResponse) -> None:
25+
"""Raises the appropriate exception based on the response status code."""
26+
if response.success:
27+
return
28+
29+
# Map status codes to exception classes
30+
exception_map = {
31+
400: BadRequestError,
32+
404: ResourceNotFoundError,
33+
}
34+
35+
# Extract error details from response if available
36+
detail = response.json.get("details") if response.json else None
37+
message = response.json.get("message") if response.json else response.text
38+
39+
# Get exception class
40+
exception_class = exception_map.get(response.status_code)
41+
42+
if exception_class:
43+
# For specific exceptions (BadRequestError, ResourceNotFoundError)
44+
raise exception_class(message=message, detail=detail)
45+
else:
46+
# For the catch-all case, include the status code
47+
raise UnexpectedAPIError(
48+
status_code=response.status_code, message=message or "Unexpected API error", detail=detail
49+
)
50+
1651

1752
class TransportProtocol(Protocol):
1853
"""Protocol for HTTP transport."""
@@ -63,8 +98,6 @@ async def request(self, method: str, url: str, **kwargs: Any) -> TransportRespon
6398
"""Send an HTTP request and return the response."""
6499
response = await self._client.request(method, url, **kwargs)
65100

66-
response.raise_for_status()
67-
68101
transport_response = TransportResponse(text=response.text, status_code=response.status_code)
69102

70103
try:

0 commit comments

Comments
 (0)