Skip to content

Commit 92078b3

Browse files
authored
feat: add pipeline template tools (#35)
* feat: add pipeline template tools * feat: import PipelineTemplate model * feat: add pipeline template formatting utility * feat: import pipeline template tools * feat: add pipeline template MCP tools * test: add unit tests for pipeline template tools * fix: linting and format
1 parent e7c24f0 commit 92078b3

4 files changed

Lines changed: 234 additions & 0 deletions

File tree

src/deepset_mcp/main.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
update_pipeline as update_pipeline_tool,
1717
validate_pipeline as validate_pipeline_tool,
1818
)
19+
from deepset_mcp.tools.pipeline_template import (
20+
get_pipeline_template as get_pipeline_template_tool,
21+
list_pipeline_templates as list_pipeline_templates_tool,
22+
)
1923

2024
INITIALIZED_MODEL = StaticModel.from_pretrained("minishlab/potion-base-2M")
2125

@@ -45,6 +49,35 @@ async def list_pipelines() -> str:
4549
return response
4650

4751

52+
@mcp.tool()
53+
async def list_pipeline_templates() -> str:
54+
"""Retrieves a list of all pipeline templates available within the currently configured deepset workspace.
55+
56+
Use this when you need to know the available pipeline templates and their capabilities.
57+
"""
58+
workspace = get_workspace()
59+
async with AsyncDeepsetClient() as client:
60+
response = await list_pipeline_templates_tool(client, workspace)
61+
62+
return response
63+
64+
65+
@mcp.tool()
66+
async def get_pipeline_template(template_name: str) -> str:
67+
"""Fetches detailed configuration information for a specific pipeline template.
68+
69+
This includes its YAML configuration, metadata, and recommended use cases.
70+
Use this when you need to inspect a specific template's structure or settings.
71+
72+
:param template_name: Name of the pipeline template to retrieve.
73+
"""
74+
workspace = get_workspace()
75+
async with AsyncDeepsetClient() as client:
76+
response = await get_pipeline_template_tool(client, workspace, template_name)
77+
78+
return response
79+
80+
4881
@mcp.tool()
4982
async def get_pipeline(pipeline_name: str) -> str:
5083
"""Fetches detailed configuration information for a specific pipeline, identified by its unique `pipeline_name`.

src/deepset_mcp/tools/formatting_utils.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,40 @@
11
from deepset_mcp.api.pipeline.models import DeepsetPipeline, PipelineValidationResult
2+
from deepset_mcp.api.pipeline_template.models import PipelineTemplate
3+
4+
5+
def pipeline_template_to_llm_readable_string(template: PipelineTemplate) -> str:
6+
"""Creates a string representation of a pipeline template that is readable by LLMs."""
7+
template_parts = [
8+
f'''<pipeline_template name="{template.template_name}" id="{template.pipeline_template_id}">
9+
10+
### Basic Information
11+
12+
**Name:** {template.template_name}
13+
**Author:** {template.author}
14+
**Description:** {template.description}
15+
'''
16+
]
17+
18+
if template.best_for:
19+
template_parts.append("\n### Best For\n" + "\n".join(f"- {use}" for use in template.best_for))
20+
21+
if template.potential_applications:
22+
template_parts.append(
23+
"\n### Potential Applications\n" + "\n".join(f"- {app}" for app in template.potential_applications)
24+
)
25+
26+
if template.tags:
27+
template_parts.append("\n### Tags\n" + "\n".join(f"- {tag.name}" for tag in template.tags))
28+
29+
if template.yaml_config is not None:
30+
template_parts.append("\n### Template Configuration")
31+
template_parts.append(f"\n```yaml\n{template.yaml_config}\n```")
32+
33+
template_parts.append(
34+
f'\n</pipeline_template name="{template.template_name}" id="{template.pipeline_template_id}">'
35+
)
36+
37+
return "\n".join(template_parts)
238

339

440
def pipeline_to_llm_readable_string(pipeline: DeepsetPipeline) -> str:
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from deepset_mcp.api.exceptions import ResourceNotFoundError, UnexpectedAPIError
2+
from deepset_mcp.api.protocols import AsyncClientProtocol
3+
from deepset_mcp.tools.formatting_utils import pipeline_template_to_llm_readable_string
4+
5+
6+
async def list_pipeline_templates(client: AsyncClientProtocol, workspace: str) -> str:
7+
"""Retrieves a list of all available pipeline templates."""
8+
try:
9+
response = await client.pipeline_templates(workspace=workspace).list_templates()
10+
formatted_templates = [pipeline_template_to_llm_readable_string(t) for t in response]
11+
return "\n\n".join(formatted_templates)
12+
except ResourceNotFoundError:
13+
return f"There is no workspace named '{workspace}'. Did you mean to configure it?"
14+
except UnexpectedAPIError as e:
15+
return f"Failed to list pipeline templates: {e}"
16+
17+
18+
async def get_pipeline_template(client: AsyncClientProtocol, workspace: str, template_name: str) -> str:
19+
"""Fetches detailed information for a specific pipeline template, identified by its `template_name`."""
20+
try:
21+
response = await client.pipeline_templates(workspace=workspace).get_template(template_name)
22+
return pipeline_template_to_llm_readable_string(response)
23+
except ResourceNotFoundError:
24+
return f"There is no pipeline template named '{template_name}' in workspace '{workspace}'."
25+
except UnexpectedAPIError as e:
26+
return f"Failed to fetch pipeline template '{template_name}': {e}"
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
from uuid import UUID
2+
3+
import pytest
4+
5+
from deepset_mcp.api.exceptions import ResourceNotFoundError, UnexpectedAPIError
6+
from deepset_mcp.api.pipeline_template.models import PipelineTemplate, PipelineTemplateTag
7+
from deepset_mcp.tools.pipeline_template import get_pipeline_template, list_pipeline_templates
8+
from test.unit.conftest import BaseFakeClient
9+
10+
11+
class FakePipelineTemplateResource:
12+
def __init__(
13+
self,
14+
list_response: list[PipelineTemplate] | None = None,
15+
get_response: PipelineTemplate | None = None,
16+
list_exception: Exception | None = None,
17+
get_exception: Exception | None = None,
18+
) -> None:
19+
self._list_response = list_response
20+
self._get_response = get_response
21+
self._list_exception = list_exception
22+
self._get_exception = get_exception
23+
24+
async def list_templates(self, limit: int = 100) -> list[PipelineTemplate]:
25+
if self._list_exception:
26+
raise self._list_exception
27+
if self._list_response is not None:
28+
return self._list_response
29+
raise NotImplementedError
30+
31+
async def get_template(self, template_name: str) -> PipelineTemplate:
32+
if self._get_exception:
33+
raise self._get_exception
34+
if self._get_response is not None:
35+
return self._get_response
36+
raise NotImplementedError
37+
38+
39+
class FakeClient(BaseFakeClient):
40+
def __init__(self, resource: FakePipelineTemplateResource) -> None:
41+
self._resource = resource
42+
super().__init__()
43+
44+
def pipeline_templates(self, workspace: str) -> FakePipelineTemplateResource:
45+
return self._resource
46+
47+
48+
@pytest.mark.asyncio
49+
async def test_list_pipeline_templates_returns_formatted_string() -> None:
50+
template1 = PipelineTemplate(
51+
pipeline_name="template1",
52+
pipeline_template_id=UUID("00000000-0000-0000-0000-000000000001"),
53+
author="Alice Smith",
54+
description="First template",
55+
best_for=["use case 1", "use case 2"],
56+
potential_applications=["app 1", "app 2"],
57+
query_yaml="config1: value1",
58+
tags=[PipelineTemplateTag(name="tag1", tag_id=UUID("10000000-0000-0000-0000-000000000001"))],
59+
)
60+
template2 = PipelineTemplate(
61+
pipeline_name="template2",
62+
pipeline_template_id=UUID("00000000-0000-0000-0000-000000000002"),
63+
author="Bob Jones",
64+
description="Second template",
65+
best_for=["use case 3"],
66+
potential_applications=["app 3"],
67+
query_yaml="config2: value2",
68+
tags=[PipelineTemplateTag(name="tag2", tag_id=UUID("20000000-0000-0000-0000-000000000002"))],
69+
)
70+
resource = FakePipelineTemplateResource(list_response=[template1, template2])
71+
client = FakeClient(resource)
72+
result = await list_pipeline_templates(client, workspace="ws1")
73+
74+
assert result.count("<pipeline_template name=") == 2
75+
assert "template1" in result
76+
assert "template2" in result
77+
assert "Alice Smith" in result
78+
assert "Bob Jones" in result
79+
80+
81+
@pytest.mark.asyncio
82+
async def test_list_pipeline_templates_handles_resource_not_found() -> None:
83+
resource = FakePipelineTemplateResource(list_exception=ResourceNotFoundError())
84+
client = FakeClient(resource)
85+
result = await list_pipeline_templates(client, workspace="invalid_ws")
86+
87+
assert "no workspace named 'invalid_ws'" in result.lower()
88+
89+
90+
@pytest.mark.asyncio
91+
async def test_list_pipeline_templates_handles_unexpected_error() -> None:
92+
resource = FakePipelineTemplateResource(list_exception=UnexpectedAPIError(status_code=500, message="Server error"))
93+
client = FakeClient(resource)
94+
result = await list_pipeline_templates(client, workspace="ws1")
95+
96+
assert "Failed to list pipeline templates" in result
97+
assert "Server error" in result
98+
99+
100+
@pytest.mark.asyncio
101+
async def test_get_pipeline_template_returns_formatted_string() -> None:
102+
template = PipelineTemplate(
103+
pipeline_name="test_template",
104+
pipeline_template_id=UUID("00000000-0000-0000-0000-000000000001"),
105+
author="Eve Brown",
106+
description="Test template",
107+
best_for=["use case 1"],
108+
potential_applications=["app 1"],
109+
query_yaml="config: value",
110+
tags=[PipelineTemplateTag(name="tag1", tag_id=UUID("10000000-0000-0000-0000-000000000001"))],
111+
)
112+
resource = FakePipelineTemplateResource(get_response=template)
113+
client = FakeClient(resource)
114+
result = await get_pipeline_template(client, workspace="ws1", template_name="test_template")
115+
116+
assert "test_template" in result
117+
assert "Eve Brown" in result
118+
assert "Test template" in result
119+
assert "config: value" in result
120+
assert "tag1" in result
121+
122+
123+
@pytest.mark.asyncio
124+
async def test_get_pipeline_template_handles_resource_not_found() -> None:
125+
resource = FakePipelineTemplateResource(get_exception=ResourceNotFoundError())
126+
client = FakeClient(resource)
127+
result = await get_pipeline_template(client, workspace="ws1", template_name="invalid_template")
128+
129+
assert "no pipeline template named 'invalid_template'" in result.lower()
130+
131+
132+
@pytest.mark.asyncio
133+
async def test_get_pipeline_template_handles_unexpected_error() -> None:
134+
resource = FakePipelineTemplateResource(get_exception=UnexpectedAPIError(status_code=500, message="Server error"))
135+
client = FakeClient(resource)
136+
result = await get_pipeline_template(client, workspace="ws1", template_name="test_template")
137+
138+
assert "Failed to fetch pipeline template 'test_template'" in result
139+
assert "Server error" in result

0 commit comments

Comments
 (0)