Skip to content

Commit 52e6f62

Browse files
committed
feat: add pipeline resource
1 parent c7a1232 commit 52e6f62

14 files changed

Lines changed: 1053 additions & 34 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
run: uv sync --locked --all-extras --dev
2626

2727
- name: Run tests
28-
run: uv run pytest
28+
run: uv run --dev pytest -m "not integration"
2929

3030
lint:
3131
name: Run Linting

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,9 @@ wheels/
88

99
# Virtual environments
1010
.venv
11+
12+
# env files
13+
.env
14+
.env.prod
15+
.env.dev
16+
.env.test

pyproject.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ dependencies = [
1010
"requests>=2.32.3",
1111
"uvicorn>=0.34.2",
1212
"httpx",
13+
"pydantic>=2.0.0",
1314
]
1415

1516
[project.scripts]
@@ -26,6 +27,7 @@ packages = ["src/deepset_mcp"]
2627
dev = [
2728
"pytest",
2829
"pytest-asyncio",
30+
"python-dotenv"
2931
]
3032
lint = [
3133
"ruff",
@@ -38,6 +40,12 @@ types = [
3840

3941
[tool.pytest.ini_options]
4042
testpaths = ["test"]
43+
asyncio_mode = "auto"
44+
asyncio_default_fixture_loop_scope = "function"
45+
46+
markers = [
47+
"integration: marks tests that interact with external resources (e.g. deepset API)."
48+
]
4149

4250
[tool.ruff]
4351
line-length = 120

src/deepset_mcp/api/__init__.py

Whitespace-only changes.
Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,36 @@
11
import os
22
from types import TracebackType
3-
from typing import Any
3+
from typing import Any, Protocol, Self
44

5-
from deepset_mcp.transport import AsyncTransport, TransportProtocol
5+
from deepset_mcp.api.transport import AsyncTransport, TransportProtocol, TransportResponse
6+
7+
8+
class AsyncClientProtocol(Protocol):
9+
"""Protocol defining the implementation for AsyncClient."""
10+
11+
async def request(
12+
self,
13+
endpoint: str,
14+
method: str = "GET",
15+
data: dict[str, Any] | None = None,
16+
headers: dict[str, str] | None = None,
17+
) -> TransportResponse:
18+
"""Make a request to the API."""
19+
...
20+
21+
async def close(self) -> None:
22+
"""Close underlying transport resources."""
23+
...
24+
25+
async def __aenter__(self) -> Self:
26+
"""Enter the AsyncContextManager."""
27+
...
28+
29+
async def __aexit__(
30+
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
31+
) -> bool:
32+
"""Exit the AsyncContextmanager and clean up resources."""
33+
...
634

735

836
class AsyncDeepsetClient:
@@ -48,7 +76,7 @@ async def request(
4876
method: str = "GET",
4977
data: dict[str, Any] | None = None,
5078
headers: dict[str, str] | None = None,
51-
) -> Any:
79+
) -> TransportResponse:
5280
"""Make a request to the deepset API."""
5381
if not endpoint.startswith("/"):
5482
endpoint = f"/{endpoint}"
@@ -77,7 +105,7 @@ async def close(self) -> None:
77105
"""Close underlying transport resources."""
78106
await self._transport.close()
79107

80-
async def __aenter__(self) -> "AsyncDeepsetClient":
108+
async def __aenter__(self) -> Self:
81109
"""Enter the AsyncContextManager."""
82110
return self
83111

src/deepset_mcp/api/pipeline/__init__.py

Whitespace-only changes.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from datetime import datetime
2+
from enum import StrEnum
3+
4+
from pydantic import BaseModel, Field
5+
6+
7+
class PipelineServiceLevel(StrEnum):
8+
"""Describes the service level of a pipeline."""
9+
10+
PRODUCTION = "PRODUCTION"
11+
DEVELOPMENT = "DEVELOPMENT"
12+
DRAFT = "DRAFT"
13+
14+
15+
class DeepsetUser(BaseModel):
16+
"""Model representing a user on the deepset platform."""
17+
18+
id: str = Field(alias="user_id")
19+
given_name: str | None = None
20+
family_name: str | None = None
21+
email: str | None = None
22+
23+
24+
class DeepsetPipeline(BaseModel):
25+
"""Model representing a pipeline on the deepset platform."""
26+
27+
id: str = Field(alias="pipeline_id")
28+
name: str
29+
status: str
30+
service_level: PipelineServiceLevel
31+
32+
created_at: datetime
33+
last_updated_at: datetime | None = Field(None, alias="last_edited_at") # Map API's last_edited_at
34+
35+
created_by: DeepsetUser
36+
last_updated_by: DeepsetUser | None = Field(None, alias="last_edited_by") # Map API's last_edited_by
37+
38+
yaml_config: str | None = None
39+
40+
class Config:
41+
"""Configuration for serialization and deserialization."""
42+
43+
populate_by_name = True # Allow both alias and model field names
44+
json_encoders = {
45+
# When serializing back to JSON, convert datetimes to ISO format
46+
datetime: lambda dt: dt.isoformat()
47+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
from typing import Any
2+
3+
from deepset_mcp.api.client import AsyncClientProtocol
4+
from deepset_mcp.api.pipeline.models import DeepsetPipeline
5+
6+
7+
class PipelineResource:
8+
"""Manages interactions with the deepset pipeline API."""
9+
10+
def __init__(
11+
self,
12+
client: AsyncClientProtocol,
13+
workspace: str,
14+
) -> None:
15+
"""Initializes a PipelineResource instance."""
16+
self._client = client
17+
self._workspace = workspace
18+
19+
async def list(
20+
self,
21+
page_number: int = 1,
22+
limit: int = 10,
23+
) -> list[DeepsetPipeline]:
24+
"""
25+
Retrieve pipeline in the configured workspace with optional pagination.
26+
27+
:param page_number: Page number for paging.
28+
:param limit: Max number of items to return.
29+
:return: PipelineListResponse containing `data`, `has_more`, and `total`.
30+
"""
31+
params: dict[str, Any] = {
32+
"page_number": page_number,
33+
"limit": limit,
34+
}
35+
36+
query = "?" + "&".join(f"{k}={v}" for k, v in params.items()) if params else ""
37+
resp = await self._client.request(
38+
endpoint=f"v1/workspaces/{self._workspace}/pipelines{query}",
39+
method="GET",
40+
)
41+
42+
response = resp.json
43+
44+
if response is not None:
45+
pipelines = [DeepsetPipeline.model_validate(item) for item in response.get("data", [])]
46+
else:
47+
pipelines = []
48+
49+
return pipelines
50+
51+
async def get(self, pipeline_name: str, include_yaml: bool = True) -> DeepsetPipeline:
52+
"""Fetch a single pipeline by its name."""
53+
resp = await self._client.request(endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}")
54+
pipeline = DeepsetPipeline.model_validate(resp.json)
55+
56+
if include_yaml:
57+
yaml_response = await self._client.request(
58+
endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}/yaml"
59+
)
60+
61+
if yaml_response.json is not None:
62+
pipeline.yaml_config = yaml_response.json["query_yaml"]
63+
64+
return pipeline
65+
66+
async def create(self, name: str, yaml_config: str) -> None:
67+
"""Create a new pipeline with a name and YAML config."""
68+
data = {"name": name, "query_yaml": yaml_config}
69+
await self._client.request(
70+
endpoint=f"v1/workspaces/{self._workspace}/pipelines",
71+
method="POST",
72+
data=data,
73+
)
74+
75+
async def update(
76+
self,
77+
pipeline_name: str,
78+
updated_pipeline_name: str | None = None,
79+
yaml_config: str | None = None,
80+
) -> None:
81+
"""Update name and/or YAML config of an existing pipeline."""
82+
# Handle name update first if any
83+
if updated_pipeline_name is not None:
84+
await self._client.request(
85+
endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}",
86+
method="PATCH",
87+
data={"name": updated_pipeline_name},
88+
)
89+
90+
pipeline_name = updated_pipeline_name
91+
92+
if yaml_config is not None:
93+
await self._client.request(
94+
endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}/yaml",
95+
method="PUT",
96+
data={"query_yaml": yaml_config},
97+
)
98+
99+
100+
# async with AsyncDeepsetClient() as client:
101+
# await client.pipelines("default").list()
102+
# await client.pipelines("default").get("hello")
103+
# await client.pipelines("default").update(yaml_config="blabla")
Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,23 @@
1+
import json
2+
from dataclasses import dataclass
13
from typing import Any, Protocol
24

35
import httpx
46

57

8+
@dataclass
9+
class TransportResponse:
10+
"""Response envelope for HTTP transport."""
11+
12+
text: str
13+
status_code: int
14+
json: dict[str, Any] | None = None
15+
16+
617
class TransportProtocol(Protocol):
718
"""Protocol for HTTP transport."""
819

9-
async def request(self, method: str, url: str, **kwargs: Any) -> Any:
20+
async def request(self, method: str, url: str, **kwargs: Any) -> TransportResponse:
1021
"""Send an HTTP request and return the parsed JSON response."""
1122
...
1223

@@ -15,7 +26,7 @@ async def close(self) -> None:
1526
...
1627

1728

18-
class AsyncTransport(TransportProtocol):
29+
class AsyncTransport:
1930
"""Asynchronous HTTP transport using httpx.AsyncClient."""
2031

2132
def __init__(
@@ -48,12 +59,20 @@ def __init__(
4859
}
4960
self._client = httpx.AsyncClient(**client_kwargs)
5061

51-
async def request(self, method: str, url: str, **kwargs: Any) -> Any:
52-
"""Send an HTTP request and return the parsed JSON response."""
53-
resp = await self._client.request(method, url, **kwargs)
54-
resp.raise_for_status()
62+
async def request(self, method: str, url: str, **kwargs: Any) -> TransportResponse:
63+
"""Send an HTTP request and return the response."""
64+
response = await self._client.request(method, url, **kwargs)
65+
66+
response.raise_for_status()
67+
68+
transport_response = TransportResponse(text=response.text, status_code=response.status_code)
69+
70+
try:
71+
transport_response.json = response.json()
72+
except json.decoder.JSONDecodeError:
73+
pass
5574

56-
return resp.json()
75+
return transport_response
5776

5877
async def close(self) -> None:
5978
"""Clean up any resources (e.g., close connections)."""

0 commit comments

Comments
 (0)