Skip to content

Commit 4a78974

Browse files
authored
feat: add get_logs method to PipelineHandle and PipelineResource (#56)
* feat: add log models for pipeline logs
* feat: add PipelineLogList import to handle
* feat: add get_logs method to PipelineHandle
* feat: add PipelineLogList import to resource
* feat: add get_logs method to PipelineResource
* feat: add PipelineLogList import to protocols
* feat: add get_logs method to PipelineResourceProtocol
* feat: add comprehensive unit tests for pipeline logs functionality
* feat: add default implementation for pipelines method in BaseFakeClient
* feat: add integration tests for pipeline logs functionality
* feat: export log models in pipeline package init
* refactor: remove PipelineHandle class in favor of simpler approach
* refactor: remove PipelineHandle import after deleting handle
* refactor: change list method to return DeepsetPipeline instead of handles
* refactor: change get method to return DeepsetPipeline instead of handle
* refactor: remove PipelineHandle import and add DeepsetPipeline import
* refactor: update protocol signatures to return DeepsetPipeline instead of handles
* refactor: remove handle tests since we're removing the handle class
* refactor: remove PipelineHandle import from logs tests
* feat: create clean pipeline logs tests without handle references
* refactor: remove old logs test file with handle references
* feat: recreate pipeline logs tests without handle references
* refactor: remove duplicate test file
* refactor: update integration tests to use resource directly instead of handle
* refactor: update integration test to use resource directly for error logs
* refactor: remove PipelineHandle import from resource tests
* refactor: update list test to expect DeepsetPipeline instead of handle
* refactor: update get test to expect DeepsetPipeline instead of handle
* refactor: update get without yaml test to expect DeepsetPipeline instead of handle
* refactor: update special characters test to expect DeepsetPipeline instead of handle
* refactor: remove PipelineHandle from pipeline package __init__.py
* fix: tests
1 parent 0f7711a commit 4a78974

10 files changed

Lines changed: 440 additions & 219 deletions

File tree

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
1-
from .handle import PipelineHandle
2-
from .models import DeepsetPipeline, NoContentResponse, PipelineValidationResult, ValidationError
1+
from .models import (
2+
DeepsetPipeline,
3+
NoContentResponse,
4+
PipelineLog,
5+
PipelineLogList,
6+
PipelineValidationResult,
7+
ValidationError,
8+
)
39
from .resource import PipelineResource
410

511
__all__ = [
6-
"PipelineHandle",
712
"DeepsetPipeline",
813
"NoContentResponse",
914
"PipelineValidationResult",
1015
"ValidationError",
1116
"PipelineResource",
17+
"PipelineLog",
18+
"PipelineLogList",
1219
]

src/deepset_mcp/api/pipeline/handle.py

Lines changed: 0 additions & 49 deletions
This file was deleted.

src/deepset_mcp/api/pipeline/models.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from datetime import datetime
22
from enum import StrEnum
3+
from typing import Any
34

45
from pydantic import BaseModel, Field
56

@@ -59,3 +60,23 @@ class NoContentResponse(BaseModel):
5960

6061
success: bool = True
6162
message: str = "No content"
63+
64+
65+
class PipelineLog(BaseModel):
    """Model representing a single log entry from a pipeline."""

    # Unique identifier of this log entry.
    log_id: str
    # Human-readable log message text.
    message: str
    # Timestamp at which the entry was logged.
    logged_at: datetime
    # Log severity, e.g. "info", "warning", or "error" (see the level filter in get_logs).
    level: str
    # Component that emitted the log, e.g. 'querypipeline'.
    origin: str
    # Formatted exception information accompanying the entry, if any.
    exceptions: str | None = None
    # Additional structured fields returned by the API; defaults to an empty dict.
    extra_fields: dict[str, Any] = Field(default_factory=dict)
75+
76+
77+
class PipelineLogList(BaseModel):
    """Model representing a paginated list of pipeline logs."""

    # Log entries contained in this page of results.
    data: list[PipelineLog]
    # True when further entries exist beyond this page.
    has_more: bool
    # Total number of log entries reported by the API.
    total: int

src/deepset_mcp/api/pipeline/resource.py

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
from deepset_mcp.api.pipeline.models import (
55
DeepsetPipeline,
66
NoContentResponse,
7+
PipelineLogList,
78
PipelineValidationResult,
89
ValidationError,
910
)
1011
from deepset_mcp.api.transport import raise_for_status
1112

1213
if TYPE_CHECKING:
13-
from deepset_mcp.api.pipeline.handle import PipelineHandle
1414
from deepset_mcp.api.protocols import AsyncClientProtocol
1515

1616

@@ -67,16 +67,14 @@ async def list(
6767
self,
6868
page_number: int = 1,
6969
limit: int = 10,
70-
) -> list["PipelineHandle"]:
70+
) -> list[DeepsetPipeline]:
7171
"""
7272
Retrieve pipeline in the configured workspace with optional pagination.
7373
7474
:param page_number: Page number for paging.
7575
:param limit: Max number of items to return.
76-
:return: List of PipelineHandle instances.
76+
:return: List of DeepsetPipeline instances.
7777
"""
78-
from deepset_mcp.api.pipeline.handle import PipelineHandle
79-
8078
params: dict[str, Any] = {
8179
"page_number": page_number,
8280
"limit": limit,
@@ -97,12 +95,10 @@ async def list(
9795
else:
9896
pipelines = []
9997

100-
return [PipelineHandle(pipeline=pipeline, resource=self) for pipeline in pipelines]
98+
return pipelines
10199

102-
async def get(self, pipeline_name: str, include_yaml: bool = True) -> "PipelineHandle":
100+
async def get(self, pipeline_name: str, include_yaml: bool = True) -> DeepsetPipeline:
103101
"""Fetch a single pipeline by its name."""
104-
from deepset_mcp.api.pipeline.handle import PipelineHandle
105-
106102
resp = await self._client.request(endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}")
107103
raise_for_status(resp)
108104

@@ -118,7 +114,7 @@ async def get(self, pipeline_name: str, include_yaml: bool = True) -> "PipelineH
118114
if yaml_response.json is not None:
119115
pipeline.yaml_config = yaml_response.json["query_yaml"]
120116

121-
return PipelineHandle(pipeline=pipeline, resource=self)
117+
return pipeline
122118

123119
async def create(self, name: str, yaml_config: str) -> NoContentResponse:
124120
"""Create a new pipeline with a name and YAML config."""
@@ -172,3 +168,40 @@ async def update(
172168
return response
173169

174170
raise ValueError("Either `updated_pipeline_name` or `yaml_config` must be provided.")
171+
172+
async def get_logs(
    self,
    pipeline_name: str,
    limit: int = 30,
    level: str | None = None,
) -> PipelineLogList:
    """Fetch logs for a specific pipeline.

    :param pipeline_name: Name of the pipeline to fetch logs for.
    :param limit: Maximum number of log entries to return.
    :param level: Filter logs by level (info, warning, error). If None, returns all levels.

    :returns: A PipelineLogList containing the log entries.
    """
    # Results are always restricted to query-pipeline logs; narrow further by level when requested.
    if level is None:
        filter_expression = "origin eq 'querypipeline'"
    else:
        filter_expression = f"level eq '{level}' and origin eq 'querypipeline'"

    params: dict[str, Any] = {"limit": limit, "filter": filter_expression}

    resp = await self._client.request(
        endpoint=f"v1/workspaces/{self._workspace}/pipelines/{pipeline_name}/logs",
        method="GET",
        params=params,
    )
    raise_for_status(resp)

    # A body-less success response is treated as "no logs" rather than an error.
    if resp.json is None:
        return PipelineLogList(data=[], has_more=False, total=0)

    return PipelineLogList.model_validate(resp.json)

src/deepset_mcp/api/protocols.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
from types import TracebackType
2-
from typing import TYPE_CHECKING, Any, Protocol, Self, TypeVar, overload
2+
from typing import Any, Protocol, Self, TypeVar, overload
33

44
from deepset_mcp.api.indexes.models import Index, IndexList
5-
from deepset_mcp.api.pipeline.models import NoContentResponse, PipelineValidationResult
5+
from deepset_mcp.api.pipeline.models import (
6+
DeepsetPipeline,
7+
NoContentResponse,
8+
PipelineLogList,
9+
PipelineValidationResult,
10+
)
611
from deepset_mcp.api.pipeline_template.models import PipelineTemplate
712
from deepset_mcp.api.transport import TransportResponse
813

9-
if TYPE_CHECKING:
10-
from deepset_mcp.api.pipeline.handle import PipelineHandle
11-
1214

1315
class HaystackServiceProtocol(Protocol):
1416
"""Protocol defining the implementation for HaystackService."""
@@ -151,15 +153,15 @@ async def validate(self, yaml_config: str) -> PipelineValidationResult:
151153
"""Validate a pipeline's YAML configuration against the API."""
152154
...
153155

154-
async def get(self, pipeline_name: str, include_yaml: bool = True) -> "PipelineHandle":
156+
async def get(self, pipeline_name: str, include_yaml: bool = True) -> DeepsetPipeline:
155157
"""Fetch a single pipeline by its name."""
156158
...
157159

158160
async def list(
159161
self,
160162
page_number: int = 1,
161163
limit: int = 10,
162-
) -> list["PipelineHandle"]:
164+
) -> list[DeepsetPipeline]:
163165
"""List pipelines in the configured workspace with optional pagination."""
164166
...
165167

@@ -175,3 +177,12 @@ async def update(
175177
) -> NoContentResponse:
176178
"""Update name and/or YAML config of an existing pipeline."""
177179
...
180+
181+
async def get_logs(
    self,
    pipeline_name: str,
    limit: int = 30,
    level: str | None = None,
) -> PipelineLogList:
    """Fetch logs for a specific pipeline.

    :param pipeline_name: Name of the pipeline to fetch logs for.
    :param limit: Maximum number of log entries to return.
    :param level: Filter logs by level (info, warning, error). If None, returns all levels.

    :returns: A PipelineLogList containing the log entries.
    """
    ...

src/deepset_mcp/tools/pipeline.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
async def list_pipelines(client: AsyncClientProtocol, workspace: str) -> str:
    """Retrieves a list of all pipeline available within the currently configured deepset workspace."""
    pipelines = await client.pipelines(workspace=workspace).list()
    # Render each pipeline as an LLM-readable section, separated by blank lines.
    return "\n\n".join(pipeline_to_llm_readable_string(pipeline) for pipeline in pipelines)
1414

1515

1616
async def get_pipeline(client: AsyncClientProtocol, workspace: str, pipeline_name: str) -> str:
    """Fetches detailed configuration information for a specific pipeline, identified by its unique `pipeline_name`."""
    pipeline = await client.pipelines(workspace=workspace).get(pipeline_name)
    return pipeline_to_llm_readable_string(pipeline)
2020

2121

2222
async def validate_pipeline(client: AsyncClientProtocol, workspace: str, yaml_configuration: str) -> str:
@@ -70,14 +70,14 @@ async def update_pipeline(
7070
replacement snippet is used to update the pipeline's configuration in the target workspace.
7171
"""
7272
try:
73-
original_handle = await client.pipelines(workspace=workspace).get(pipeline_name=pipeline_name)
73+
original_pipeline = await client.pipelines(workspace=workspace).get(pipeline_name=pipeline_name)
7474
except ResourceNotFoundError:
7575
return f"There is no pipeline named '{pipeline_name}'. Did you mean to create it?"
7676

77-
if original_handle.yaml_config is None:
77+
if original_pipeline.yaml_config is None:
7878
raise ValueError("The pipeline does not have a YAML configuration.")
7979

80-
occurrences = original_handle.yaml_config.count(original_config_snippet)
80+
occurrences = original_pipeline.yaml_config.count(original_config_snippet)
8181

8282
if occurrences == 0:
8383
return f"No occurrences of the provided configuration snippet were found in the pipeline '{pipeline_name}'."
@@ -88,7 +88,7 @@ async def update_pipeline(
8888
f"'{pipeline_name}'. Specify a more precise snippet to proceed with the update."
8989
)
9090

91-
updated_yaml_configuration = original_handle.yaml_config.replace(
91+
updated_yaml_configuration = original_pipeline.yaml_config.replace(
9292
original_config_snippet, replacement_config_snippet, 1
9393
)
9494

0 commit comments

Comments
 (0)