Skip to content

Commit 8b49e66

Browse files
authored
feat: add pipeline deployment functionality (#67)
* feat: add deploy method to PipelineResource * feat: add deploy_pipeline tool function * feat: import deploy_pipeline_tool in main.py * feat: add deploy_pipeline MCP tool * test: add unit tests for deploy method in PipelineResource * test: add deploy parameters to FakePipelineResource * test: add deploy method to FakePipelineResource * test: import deploy_pipeline in test file * test: add unit tests for deploy_pipeline tool function * test: add integration tests for deploy method * fix: format and lint and types
1 parent ed6d019 commit 8b49e66

7 files changed

Lines changed: 323 additions & 0 deletions

File tree

src/deepset_mcp/api/pipeline/resource.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,35 @@ async def get_logs(
206206
else:
207207
# Return empty log list if no response
208208
return PipelineLogList(data=[], has_more=False, total=0)
209+
210+
async def deploy(self, pipeline_name: str) -> PipelineValidationResult:
211+
"""Deploy a pipeline to production.
212+
213+
:param pipeline_name: Name of the pipeline to deploy.
214+
215+
:returns: PipelineValidationResult containing deployment status and any errors.
216+
217+
:raises: UnexpectedAPIError: If the API returns an unexpected status code.
218+
"""
219+
resp = await self._client.request(
220+
endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}/deploy",
221+
method="POST",
222+
)
223+
224+
# If successful (status 200), the deployment was successful
225+
if resp.success:
226+
return PipelineValidationResult(valid=True)
227+
228+
# Handle validation errors (422)
229+
if resp.status_code == 422 and resp.json is not None and isinstance(resp.json, dict) and "details" in resp.json:
230+
errors = [ValidationError(code=error["code"], message=error["message"]) for error in resp.json["details"]]
231+
return PipelineValidationResult(valid=False, errors=errors)
232+
233+
# Handle other 4xx errors (400, 404)
234+
if 400 <= resp.status_code < 500:
235+
# For non-validation errors, create a generic error
236+
error_message = resp.text if resp.text else f"HTTP {resp.status_code} error"
237+
errors = [ValidationError(code="DEPLOYMENT_ERROR", message=error_message)]
238+
return PipelineValidationResult(valid=False, errors=errors)
239+
240+
raise UnexpectedAPIError(status_code=resp.status_code, message=resp.text, detail=resp.json)

src/deepset_mcp/api/protocols.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,7 @@ async def get_logs(
187187
) -> PipelineLogList:
188188
"""Fetch logs for a specific pipeline."""
189189
...
190+
191+
async def deploy(self, pipeline_name: str) -> PipelineValidationResult:
192+
"""Deploy a pipeline."""
193+
...

src/deepset_mcp/main.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121
from deepset_mcp.tools.pipeline import (
2222
create_pipeline as create_pipeline_tool,
23+
deploy_pipeline as deploy_pipeline_tool,
2324
get_pipeline as get_pipeline_tool,
2425
get_pipeline_logs as get_pipeline_logs_tool,
2526
list_pipelines as list_pipelines_tool,
@@ -330,6 +331,26 @@ async def get_pipeline_logs(pipeline_name: str, limit: int = 30, level: str | No
330331
return response
331332

332333

334+
@mcp.tool()
335+
async def deploy_pipeline(pipeline_name: str) -> str:
336+
"""Deploys a pipeline to production in the deepset workspace.
337+
338+
Use this to deploy a pipeline that has been created and validated.
339+
The deployment process will validate the pipeline configuration and deploy it if valid.
340+
If deployment fails due to validation errors, you will receive detailed error information.
341+
342+
:param pipeline_name: Name of the pipeline to deploy.
343+
"""
344+
workspace = get_workspace()
345+
async with AsyncDeepsetClient() as client:
346+
response = await deploy_pipeline_tool(
347+
client=client,
348+
workspace=workspace,
349+
pipeline_name=pipeline_name,
350+
)
351+
return response
352+
353+
333354
#
334355
#
335356
# @mcp.tool()

src/deepset_mcp/tools/pipeline.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,31 @@ async def get_pipeline_logs(
146146
from deepset_mcp.tools.formatting_utils import pipeline_logs_to_llm_readable_string
147147

148148
return pipeline_logs_to_llm_readable_string(logs, pipeline_name, level)
149+
150+
151+
async def deploy_pipeline(client: AsyncClientProtocol, workspace: str, pipeline_name: str) -> str:
152+
"""Deploys a pipeline to production.
153+
154+
This function attempts to deploy the specified pipeline in the given workspace.
155+
If the deployment fails due to validation errors, it returns a readable string
156+
describing the validation errors.
157+
158+
:param client: The async client for API communication.
159+
:param workspace: The workspace name.
160+
:param pipeline_name: Name of the pipeline to deploy.
161+
162+
:returns: A string indicating the deployment result.
163+
"""
164+
try:
165+
deployment_result = await client.pipelines(workspace=workspace).deploy(pipeline_name=pipeline_name)
166+
except ResourceNotFoundError:
167+
return f"There is no pipeline named '{pipeline_name}' in workspace '{workspace}'."
168+
except BadRequestError as e:
169+
return f"Failed to deploy pipeline '{pipeline_name}': {e}"
170+
except UnexpectedAPIError as e:
171+
return f"Failed to deploy pipeline '{pipeline_name}': {e}"
172+
173+
if deployment_result.valid:
174+
return f"Pipeline '{pipeline_name}' deployed successfully."
175+
else:
176+
return validation_result_to_llm_readable_string(deployment_result)

test/integration/test_integration_pipeline_resource.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,3 +241,37 @@ async def test_validation_syntax_error(
241241

242242
assert resp.valid is False
243243
assert resp.errors[0].code == "YAML_ERROR"
244+
245+
246+
@pytest.mark.asyncio
247+
async def test_deploy_pipeline_success(
248+
pipeline_resource: PipelineResource,
249+
sample_yaml_config: str,
250+
) -> None:
251+
"""Test successful pipeline deployment."""
252+
pipeline_name = "test-deploy-pipeline"
253+
254+
# Create a pipeline to deploy
255+
await pipeline_resource.create(name=pipeline_name, yaml_config=sample_yaml_config)
256+
257+
# Deploy the pipeline
258+
result = await pipeline_resource.deploy(pipeline_name=pipeline_name)
259+
260+
# Verify deployment was successful
261+
assert result.valid is True
262+
assert len(result.errors) == 0
263+
264+
265+
@pytest.mark.asyncio
266+
async def test_deploy_nonexistent_pipeline(
267+
pipeline_resource: PipelineResource,
268+
) -> None:
269+
"""Test deploying a non-existent pipeline."""
270+
non_existent_name = "non-existent-deploy-pipeline"
271+
272+
# Deploy a non-existent pipeline
273+
result = await pipeline_resource.deploy(pipeline_name=non_existent_name)
274+
275+
# Should return validation errors indicating the pipeline doesn't exist
276+
assert result.valid is False
277+
assert len(result.errors) > 0

test/unit/api/pipeline/test_pipeline_resource.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -890,3 +890,126 @@ async def test_get_logs_preserves_extra_fields(self) -> None:
890890
# Verify extra fields are preserved
891891
assert "custom_field" in result.data[0].extra_fields
892892
assert result.data[0].extra_fields["custom_field"] == "custom_value"
893+
894+
@pytest.mark.asyncio
895+
async def test_deploy_pipeline_success(self) -> None:
896+
"""Test successful pipeline deployment."""
897+
# Create client with successful response
898+
client = DummyClient(responses={"test-workspace/pipelines/test-pipeline/deploy": {"status": "success"}})
899+
900+
# Create resource and call deploy method
901+
resource = PipelineResource(client=client, workspace="test-workspace")
902+
result = await resource.deploy(pipeline_name="test-pipeline")
903+
904+
# Verify results
905+
assert isinstance(result, PipelineValidationResult)
906+
assert result.valid is True
907+
assert len(result.errors) == 0
908+
909+
# Verify request
910+
assert len(client.requests) == 1
911+
assert client.requests[0]["endpoint"] == "v1/workspaces/test-workspace/pipelines/test-pipeline/deploy"
912+
assert client.requests[0]["method"] == "POST"
913+
914+
@pytest.mark.asyncio
915+
async def test_deploy_pipeline_with_validation_errors(self) -> None:
916+
"""Test deployment with validation errors (422)."""
917+
# Create a response with validation errors
918+
validation_errors = {
919+
"details": [
920+
{"code": "invalid_component", "message": "Component 'invalid_reader' is not available"},
921+
{"code": "missing_field", "message": "Required field 'index' is missing"},
922+
]
923+
}
924+
925+
# Create mock client with 422 response containing validation errors
926+
client = DummyClient()
927+
transport_response = TransportResponse(text="", status_code=422, json=validation_errors)
928+
client.responses = {"test-workspace/pipelines/test-pipeline/deploy": transport_response}
929+
930+
# Run the deployment
931+
resource = PipelineResource(client=client, workspace="test-workspace")
932+
result = await resource.deploy(pipeline_name="test-pipeline")
933+
934+
# Check the result
935+
assert isinstance(result, PipelineValidationResult)
936+
assert result.valid is False
937+
assert len(result.errors) == 2
938+
assert result.errors[0].code == "invalid_component"
939+
assert result.errors[0].message == "Component 'invalid_reader' is not available"
940+
assert result.errors[1].code == "missing_field"
941+
assert result.errors[1].message == "Required field 'index' is missing"
942+
943+
@pytest.mark.asyncio
944+
async def test_deploy_pipeline_with_400_error(self) -> None:
945+
"""Test deployment with 400 error."""
946+
# Create response for 400 error
947+
error_response: TransportResponse[None] = TransportResponse(text="Bad request", status_code=400, json=None)
948+
949+
client = DummyClient(responses={"test-workspace/pipelines/test-pipeline/deploy": error_response})
950+
951+
# Run the deployment
952+
resource = PipelineResource(client=client, workspace="test-workspace")
953+
result = await resource.deploy(pipeline_name="test-pipeline")
954+
955+
# Check the result
956+
assert result.valid is False
957+
assert len(result.errors) == 1
958+
assert result.errors[0].code == "DEPLOYMENT_ERROR"
959+
assert result.errors[0].message == "Bad request"
960+
961+
@pytest.mark.asyncio
962+
async def test_deploy_pipeline_with_404_error(self) -> None:
963+
"""Test deployment with 404 error (pipeline not found)."""
964+
# Create response for 404 error
965+
error_response: TransportResponse[None] = TransportResponse(
966+
text="Pipeline not found", status_code=404, json=None
967+
)
968+
969+
client = DummyClient(responses={"test-workspace/pipelines/nonexistent-pipeline/deploy": error_response})
970+
971+
# Run the deployment
972+
resource = PipelineResource(client=client, workspace="test-workspace")
973+
result = await resource.deploy(pipeline_name="nonexistent-pipeline")
974+
975+
# Check the result
976+
assert result.valid is False
977+
assert len(result.errors) == 1
978+
assert result.errors[0].code == "DEPLOYMENT_ERROR"
979+
assert result.errors[0].message == "Pipeline not found"
980+
981+
@pytest.mark.asyncio
982+
async def test_deploy_pipeline_with_500_error(self) -> None:
983+
"""Test deployment with 500 error (unexpected error)."""
984+
# Create response for 500 error
985+
error_response: TransportResponse[None] = TransportResponse(
986+
text="Internal server error", status_code=500, json=None
987+
)
988+
989+
client = DummyClient(responses={"test-workspace/pipelines/test-pipeline/deploy": error_response})
990+
991+
# Run the deployment and expect an exception
992+
resource = PipelineResource(client=client, workspace="test-workspace")
993+
with pytest.raises(UnexpectedAPIError) as exc_info:
994+
await resource.deploy(pipeline_name="test-pipeline")
995+
996+
assert exc_info.value.status_code == 500
997+
assert "Internal server error" in str(exc_info.value)
998+
999+
@pytest.mark.asyncio
1000+
async def test_deploy_pipeline_with_empty_error_text(self) -> None:
1001+
"""Test deployment with error response but empty text."""
1002+
# Create response for 400 error with empty text
1003+
error_response: TransportResponse[None] = TransportResponse(text="", status_code=400, json=None)
1004+
1005+
client = DummyClient(responses={"test-workspace/pipelines/test-pipeline/deploy": error_response})
1006+
1007+
# Run the deployment
1008+
resource = PipelineResource(client=client, workspace="test-workspace")
1009+
result = await resource.deploy(pipeline_name="test-pipeline")
1010+
1011+
# Check the result
1012+
assert result.valid is False
1013+
assert len(result.errors) == 1
1014+
assert result.errors[0].code == "DEPLOYMENT_ERROR"
1015+
assert result.errors[0].message == "HTTP 400 error"

test/unit/tools/test_pipeline.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
# Adjust the import path below to match your project structure
2323
from deepset_mcp.tools.pipeline import (
2424
create_pipeline,
25+
deploy_pipeline,
2526
get_pipeline,
2627
get_pipeline_logs,
2728
list_pipelines,
@@ -40,10 +41,12 @@ def __init__(
4041
create_response: NoContentResponse | None = None,
4142
update_response: NoContentResponse | None = None,
4243
logs_response: PipelineLogList | None = None,
44+
deploy_response: PipelineValidationResult | None = None,
4345
get_exception: Exception | None = None,
4446
update_exception: Exception | None = None,
4547
create_exception: Exception | None = None,
4648
logs_exception: Exception | None = None,
49+
deploy_exception: Exception | None = None,
4750
) -> None:
4851
self._list_response = list_response
4952
self._get_response = get_response
@@ -55,6 +58,8 @@ def __init__(
5558
self._update_exception = update_exception
5659
self._logs_response = logs_response
5760
self._logs_exception = logs_exception
61+
self._deploy_response = deploy_response
62+
self._deploy_exception = deploy_exception
5863

5964
async def list(self, page_number: int = 1, limit: int = 10) -> list[DeepsetPipeline]:
6065
if self._list_response is not None:
@@ -105,6 +110,13 @@ async def get_logs(
105110
return self._logs_response
106111
raise NotImplementedError
107112

113+
async def deploy(self, pipeline_name: str) -> PipelineValidationResult:
114+
if self._deploy_exception:
115+
raise self._deploy_exception
116+
if self._deploy_response is not None:
117+
return self._deploy_response
118+
raise NotImplementedError
119+
108120

109121
class FakeClient(BaseFakeClient):
110122
def __init__(self, resource: FakePipelineResource) -> None:
@@ -505,3 +517,72 @@ async def test_get_pipeline_logs_unexpected_error() -> None:
505517
result = await get_pipeline_logs(client, workspace="ws", pipeline_name="test-pipeline")
506518

507519
assert "Failed to fetch logs for pipeline 'test-pipeline': Internal server error" in result
520+
521+
522+
@pytest.mark.asyncio
523+
async def test_deploy_pipeline_success() -> None:
524+
"""Test successful pipeline deployment."""
525+
success_result = PipelineValidationResult(valid=True, errors=[])
526+
resource = FakePipelineResource(deploy_response=success_result)
527+
client = FakeClient(resource)
528+
529+
result = await deploy_pipeline(client, workspace="ws", pipeline_name="test-pipeline")
530+
531+
assert "Pipeline 'test-pipeline' deployed successfully." == result
532+
533+
534+
@pytest.mark.asyncio
535+
async def test_deploy_pipeline_with_validation_errors() -> None:
536+
"""Test deployment with validation errors."""
537+
error_result = PipelineValidationResult(
538+
valid=False,
539+
errors=[
540+
ValidationError(code="INVALID_COMPONENT", message="Component 'invalid_reader' is not available"),
541+
ValidationError(code="MISSING_FIELD", message="Required field 'index' is missing"),
542+
],
543+
)
544+
resource = FakePipelineResource(deploy_response=error_result)
545+
client = FakeClient(resource)
546+
547+
result = await deploy_pipeline(client, workspace="ws", pipeline_name="test-pipeline")
548+
549+
assert "configuration is invalid" in result
550+
assert "Error 1" in result
551+
assert "Error 2" in result
552+
assert "INVALID_COMPONENT" in result
553+
assert "MISSING_FIELD" in result
554+
555+
556+
@pytest.mark.asyncio
557+
async def test_deploy_pipeline_not_found() -> None:
558+
"""Test deployment of non-existent pipeline."""
559+
resource = FakePipelineResource(deploy_exception=ResourceNotFoundError())
560+
client = FakeClient(resource)
561+
562+
result = await deploy_pipeline(client, workspace="ws", pipeline_name="missing-pipeline")
563+
564+
assert "There is no pipeline named 'missing-pipeline' in workspace 'ws'." == result
565+
566+
567+
@pytest.mark.asyncio
568+
async def test_deploy_pipeline_bad_request() -> None:
569+
"""Test deployment with bad request error."""
570+
resource = FakePipelineResource(deploy_exception=BadRequestError("Pipeline is not ready for deployment"))
571+
client = FakeClient(resource)
572+
573+
result = await deploy_pipeline(client, workspace="ws", pipeline_name="test-pipeline")
574+
575+
assert "Failed to deploy pipeline 'test-pipeline': Pipeline is not ready for deployment" in result
576+
577+
578+
@pytest.mark.asyncio
579+
async def test_deploy_pipeline_unexpected_error() -> None:
580+
"""Test deployment with unexpected API error."""
581+
resource = FakePipelineResource(
582+
deploy_exception=UnexpectedAPIError(status_code=500, message="Internal server error")
583+
)
584+
client = FakeClient(resource)
585+
586+
result = await deploy_pipeline(client, workspace="ws", pipeline_name="test-pipeline")
587+
588+
assert "Failed to deploy pipeline 'test-pipeline': Internal server error" in result

0 commit comments

Comments
 (0)