Skip to content

Commit 0f7711a

Browse files
authored
feat: introduce PipelineHandle for future pipeline operations (#54)
* feat: add PipelineHandle class for future pipeline operations * fix: remove existing __init__.py to replace with proper imports * feat: add PipelineHandle to package exports * feat: add PipelineHandle import for protocols * feat: update PipelineResourceProtocol to return PipelineHandle * feat: import PipelineHandle in resource module * feat: update list method to return PipelineHandle instances * feat: update get method to return PipelineHandle instance * fix: avoid circular import by using TYPE_CHECKING for PipelineHandle import * fix: move PipelineHandle import inside list method to avoid circular import * fix: move PipelineHandle import inside get method to avoid circular import * feat: add PipelineHandle import to test module * fix: update test to check for PipelineHandle instead of DeepsetPipeline * fix: update get test to check for PipelineHandle instead of DeepsetPipeline * fix: update get without yaml test to check for PipelineHandle * fix: update special characters test to check for PipelineHandle * feat: add PipelineHandle import to integration tests * fix: update integration test to expect PipelineHandle for create test * fix: update integration test to expect PipelineHandle for list test * fix: update integration test pagination to expect PipelineHandle * fix: update integration get test to expect PipelineHandle * fix: update integration update test to expect PipelineHandle * fix: update list_pipelines tool to work with PipelineHandle * fix: update get_pipeline tool to work with PipelineHandle * fix: update update_pipeline tool to work with PipelineHandle * feat: add PipelineHandle import to tools tests * fix: update FakePipelineResource to return PipelineHandle instances * fix: no changes needed for get_pipeline test - it should work with new handle * feat: add comprehensive tests for PipelineHandle class * fix: tests
1 parent 2d892f9 commit 0f7711a

9 files changed

Lines changed: 234 additions & 45 deletions

File tree

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from .handle import PipelineHandle
2+
from .models import DeepsetPipeline, NoContentResponse, PipelineValidationResult, ValidationError
3+
from .resource import PipelineResource
4+
5+
__all__ = [
6+
"PipelineHandle",
7+
"DeepsetPipeline",
8+
"NoContentResponse",
9+
"PipelineValidationResult",
10+
"ValidationError",
11+
"PipelineResource",
12+
]
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from typing import TYPE_CHECKING
2+
3+
from deepset_mcp.api.pipeline.models import DeepsetPipeline
4+
5+
if TYPE_CHECKING:
6+
from deepset_mcp.api.protocols import PipelineResourceProtocol
7+
8+
9+
class PipelineHandle:
10+
"""Handle for performing operations on a specific pipeline.
11+
12+
This class provides a convenient interface for interacting with a pipeline,
13+
combining the pipeline data model with resource operations. It allows both
14+
direct access to pipeline attributes and future operations like getting logs
15+
or deployment management.
16+
"""
17+
18+
def __init__(self, pipeline: DeepsetPipeline, resource: "PipelineResourceProtocol") -> None:
19+
"""Initialize a PipelineHandle.
20+
21+
:param pipeline: The pipeline data model containing all pipeline information.
22+
:param resource: The resource interface for performing operations on this pipeline.
23+
"""
24+
self._pipeline = pipeline
25+
self._resource = resource
26+
27+
def __getattr__(self, name: str) -> object:
28+
"""Proxy attribute access to the underlying pipeline."""
29+
return getattr(self._pipeline, name)
30+
31+
if TYPE_CHECKING:
32+
# Help type checkers understand available attributes from the underlying pipeline
33+
id: str
34+
name: str
35+
status: str
36+
service_level: str
37+
created_at: object
38+
last_updated_at: object | None
39+
created_by: object
40+
last_updated_by: object | None
41+
yaml_config: str | None
42+
43+
@property
44+
def pipeline(self) -> DeepsetPipeline:
45+
"""Access the full pipeline data model.
46+
47+
:returns: The underlying DeepsetPipeline instance with all pipeline data.
48+
"""
49+
return self._pipeline

src/deepset_mcp/api/pipeline/resource.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any
1+
from typing import TYPE_CHECKING, Any
22

33
from deepset_mcp.api.exceptions import UnexpectedAPIError
44
from deepset_mcp.api.pipeline.models import (
@@ -7,16 +7,19 @@
77
PipelineValidationResult,
88
ValidationError,
99
)
10-
from deepset_mcp.api.protocols import AsyncClientProtocol, PipelineResourceProtocol
1110
from deepset_mcp.api.transport import raise_for_status
1211

12+
if TYPE_CHECKING:
13+
from deepset_mcp.api.pipeline.handle import PipelineHandle
14+
from deepset_mcp.api.protocols import AsyncClientProtocol
1315

14-
class PipelineResource(PipelineResourceProtocol):
16+
17+
class PipelineResource:
1518
"""Manages interactions with the deepset pipeline API."""
1619

1720
def __init__(
1821
self,
19-
client: AsyncClientProtocol,
22+
client: "AsyncClientProtocol",
2023
workspace: str,
2124
) -> None:
2225
"""Initializes a PipelineResource instance."""
@@ -64,14 +67,16 @@ async def list(
6467
self,
6568
page_number: int = 1,
6669
limit: int = 10,
67-
) -> list[DeepsetPipeline]:
70+
) -> list["PipelineHandle"]:
6871
"""
6972
Retrieve pipeline in the configured workspace with optional pagination.
7073
7174
:param page_number: Page number for paging.
7275
:param limit: Max number of items to return.
73-
:return: PipelineListResponse containing `data`, `has_more`, and `total`.
76+
:return: List of PipelineHandle instances.
7477
"""
78+
from deepset_mcp.api.pipeline.handle import PipelineHandle
79+
7580
params: dict[str, Any] = {
7681
"page_number": page_number,
7782
"limit": limit,
@@ -92,10 +97,12 @@ async def list(
9297
else:
9398
pipelines = []
9499

95-
return pipelines
100+
return [PipelineHandle(pipeline=pipeline, resource=self) for pipeline in pipelines]
96101

97-
async def get(self, pipeline_name: str, include_yaml: bool = True) -> DeepsetPipeline:
102+
async def get(self, pipeline_name: str, include_yaml: bool = True) -> "PipelineHandle":
98103
"""Fetch a single pipeline by its name."""
104+
from deepset_mcp.api.pipeline.handle import PipelineHandle
105+
99106
resp = await self._client.request(endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}")
100107
raise_for_status(resp)
101108

@@ -111,7 +118,7 @@ async def get(self, pipeline_name: str, include_yaml: bool = True) -> DeepsetPip
111118
if yaml_response.json is not None:
112119
pipeline.yaml_config = yaml_response.json["query_yaml"]
113120

114-
return pipeline
121+
return PipelineHandle(pipeline=pipeline, resource=self)
115122

116123
async def create(self, name: str, yaml_config: str) -> NoContentResponse:
117124
"""Create a new pipeline with a name and YAML config."""

src/deepset_mcp/api/protocols.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
from types import TracebackType
2-
from typing import Any, Protocol, Self, TypeVar, overload
2+
from typing import TYPE_CHECKING, Any, Protocol, Self, TypeVar, overload
33

44
from deepset_mcp.api.indexes.models import Index, IndexList
5-
from deepset_mcp.api.pipeline.models import DeepsetPipeline, NoContentResponse, PipelineValidationResult
5+
from deepset_mcp.api.pipeline.models import NoContentResponse, PipelineValidationResult
66
from deepset_mcp.api.pipeline_template.models import PipelineTemplate
77
from deepset_mcp.api.transport import TransportResponse
88

9+
if TYPE_CHECKING:
10+
from deepset_mcp.api.pipeline.handle import PipelineHandle
11+
912

1013
class HaystackServiceProtocol(Protocol):
1114
"""Protocol defining the implementation for HaystackService."""
@@ -148,15 +151,15 @@ async def validate(self, yaml_config: str) -> PipelineValidationResult:
148151
"""Validate a pipeline's YAML configuration against the API."""
149152
...
150153

151-
async def get(self, pipeline_name: str) -> DeepsetPipeline:
154+
async def get(self, pipeline_name: str, include_yaml: bool = True) -> "PipelineHandle":
152155
"""Fetch a single pipeline by its name."""
153156
...
154157

155158
async def list(
156159
self,
157160
page_number: int = 1,
158161
limit: int = 10,
159-
) -> list[DeepsetPipeline]:
162+
) -> list["PipelineHandle"]:
160163
"""List pipelines in the configured workspace with optional pagination."""
161164
...
162165

src/deepset_mcp/tools/pipeline.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
async def list_pipelines(client: AsyncClientProtocol, workspace: str) -> str:
99
"""Retrieves a list of all pipeline available within the currently configured deepset workspace."""
1010
response = await client.pipelines(workspace=workspace).list()
11-
formatted_pipelines = [pipeline_to_llm_readable_string(p) for p in response]
11+
formatted_pipelines = [pipeline_to_llm_readable_string(h.pipeline) for h in response]
1212

1313
return "\n\n".join(formatted_pipelines)
1414

1515

1616
async def get_pipeline(client: AsyncClientProtocol, workspace: str, pipeline_name: str) -> str:
1717
"""Fetches detailed configuration information for a specific pipeline, identified by its unique `pipeline_name`."""
1818
response = await client.pipelines(workspace=workspace).get(pipeline_name)
19-
return pipeline_to_llm_readable_string(response)
19+
return pipeline_to_llm_readable_string(response.pipeline)
2020

2121

2222
async def validate_pipeline(client: AsyncClientProtocol, workspace: str, yaml_configuration: str) -> str:
@@ -70,14 +70,14 @@ async def update_pipeline(
7070
replacement snippet is used to update the pipeline's configuration in the target workspace.
7171
"""
7272
try:
73-
original_pipeline = await client.pipelines(workspace=workspace).get(pipeline_name=pipeline_name)
73+
original_handle = await client.pipelines(workspace=workspace).get(pipeline_name=pipeline_name)
7474
except ResourceNotFoundError:
7575
return f"There is no pipeline named '{pipeline_name}'. Did you mean to create it?"
7676

77-
if original_pipeline.yaml_config is None:
77+
if original_handle.yaml_config is None:
7878
raise ValueError("The pipeline does not have a YAML configuration.")
7979

80-
occurrences = original_pipeline.yaml_config.count(original_config_snippet)
80+
occurrences = original_handle.yaml_config.count(original_config_snippet)
8181

8282
if occurrences == 0:
8383
return f"No occurrences of the provided configuration snippet were found in the pipeline '{pipeline_name}'."
@@ -88,7 +88,7 @@ async def update_pipeline(
8888
f"'{pipeline_name}'. Specify a more precise snippet to proceed with the update."
8989
)
9090

91-
updated_yaml_configuration = original_pipeline.yaml_config.replace(
91+
updated_yaml_configuration = original_handle.yaml_config.replace(
9292
original_config_snippet, replacement_config_snippet, 1
9393
)
9494

test/integration/test_integration_pipeline_resource.py

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from deepset_mcp.api.client import AsyncDeepsetClient
44
from deepset_mcp.api.exceptions import ResourceNotFoundError
5+
from deepset_mcp.api.pipeline.handle import PipelineHandle
56
from deepset_mcp.api.pipeline.models import DeepsetPipeline
67
from deepset_mcp.api.pipeline.resource import PipelineResource
78

@@ -71,10 +72,11 @@ async def test_create_pipeline(
7172
await pipeline_resource.create(name=pipeline_name, yaml_config=sample_yaml_config)
7273

7374
# Verify the pipeline was created by retrieving it
74-
pipeline: DeepsetPipeline = await pipeline_resource.get(pipeline_name=pipeline_name)
75+
handle: PipelineHandle = await pipeline_resource.get(pipeline_name=pipeline_name)
7576

76-
assert pipeline.name == pipeline_name
77-
assert pipeline.yaml_config == sample_yaml_config
77+
assert handle.name == pipeline_name
78+
assert handle.yaml_config == sample_yaml_config
79+
assert isinstance(handle.pipeline, DeepsetPipeline)
7880

7981

8082
@pytest.mark.asyncio
@@ -91,23 +93,30 @@ async def test_list_pipelines(
9193
await pipeline_resource.create(name=pipeline_name, yaml_config=sample_yaml_config)
9294

9395
# Test listing without pagination
94-
pipelines = await pipeline_resource.list(limit=10)
95-
assert len(pipelines) == 3
96+
handles = await pipeline_resource.list(limit=10)
97+
assert len(handles) == 3
9698

9799
# Verify our created pipelines are in the list
98-
retrieved_names = [p.name for p in pipelines]
100+
retrieved_names = [h.name for h in handles]
99101
for name in pipeline_names:
100102
assert name in retrieved_names
101103

104+
# Verify all are PipelineHandle instances
105+
for handle in handles:
106+
assert isinstance(handle, PipelineHandle)
107+
assert isinstance(handle.pipeline, DeepsetPipeline)
108+
102109
# Test pagination
103-
if len(pipelines) > 1:
110+
if len(handles) > 1:
104111
# Get the first page with 1 item
105112
first_page = await pipeline_resource.list(limit=1)
106113
assert len(first_page) == 1
114+
assert isinstance(first_page[0], PipelineHandle)
107115

108116
# Get the second page
109117
second_page = await pipeline_resource.list(page_number=2, limit=1)
110118
assert len(second_page) == 1
119+
assert isinstance(second_page[0], PipelineHandle)
111120

112121
# Verify they're different pipelines
113122
assert first_page[0].id != second_page[0].id
@@ -125,16 +134,16 @@ async def test_get_pipeline(
125134
await pipeline_resource.create(name=pipeline_name, yaml_config=sample_yaml_config)
126135

127136
# Test getting with YAML config
128-
pipeline_with_yaml: DeepsetPipeline = await pipeline_resource.get(pipeline_name=pipeline_name, include_yaml=True)
129-
assert pipeline_with_yaml.name == pipeline_name
130-
assert pipeline_with_yaml.yaml_config == sample_yaml_config
137+
handle_with_yaml: PipelineHandle = await pipeline_resource.get(pipeline_name=pipeline_name, include_yaml=True)
138+
assert handle_with_yaml.name == pipeline_name
139+
assert handle_with_yaml.yaml_config == sample_yaml_config
140+
assert isinstance(handle_with_yaml.pipeline, DeepsetPipeline)
131141

132142
# Test getting without YAML config
133-
pipeline_without_yaml: DeepsetPipeline = await pipeline_resource.get(
134-
pipeline_name=pipeline_name, include_yaml=False
135-
)
136-
assert pipeline_without_yaml.name == pipeline_name
137-
assert pipeline_without_yaml.yaml_config is None
143+
handle_without_yaml: PipelineHandle = await pipeline_resource.get(pipeline_name=pipeline_name, include_yaml=False)
144+
assert handle_without_yaml.name == pipeline_name
145+
assert handle_without_yaml.yaml_config is None
146+
assert isinstance(handle_without_yaml.pipeline, DeepsetPipeline)
138147

139148

140149
@pytest.mark.asyncio
@@ -156,8 +165,9 @@ async def test_update_pipeline(
156165
)
157166

158167
# Verify the name was updated
159-
updated_pipeline: DeepsetPipeline = await pipeline_resource.get(pipeline_name=updated_name)
160-
assert updated_pipeline.name == updated_name
168+
updated_handle: PipelineHandle = await pipeline_resource.get(pipeline_name=updated_name)
169+
assert updated_handle.name == updated_name
170+
assert isinstance(updated_handle.pipeline, DeepsetPipeline)
161171

162172
# Update the pipeline config
163173
modified_yaml = sample_yaml_config.replace("temperature: 0.1", "temperature: 0.2")
@@ -167,8 +177,9 @@ async def test_update_pipeline(
167177
)
168178

169179
# Verify the config was updated
170-
updated_pipeline = await pipeline_resource.get(pipeline_name=updated_name)
171-
assert updated_pipeline.yaml_config == modified_yaml
180+
updated_handle = await pipeline_resource.get(pipeline_name=updated_name)
181+
assert updated_handle.yaml_config == modified_yaml
182+
assert isinstance(updated_handle.pipeline, DeepsetPipeline)
172183

173184

174185
@pytest.mark.asyncio

0 commit comments

Comments
 (0)