Commit db59946
feat: add pagination for pipeline logs (#180)
1 parent: 787f037

8 files changed

Lines changed: 265 additions & 37 deletions
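
At a glance: `get_logs` now returns a `PaginatedResponse[PipelineLog]` with an optional `after` cursor instead of a fixed `PipelineLogList`. A minimal usage sketch, assuming an authenticated `AsyncDeepsetClient` (client construction/auth details and the workspace and pipeline names below are placeholders, not part of this commit):

import asyncio

from deepset_mcp.api.client import AsyncDeepsetClient


async def main() -> None:
    client = AsyncDeepsetClient()  # assumes credentials are configured out of band
    page = await client.pipelines(workspace="my-workspace").get_logs(
        pipeline_name="my-pipeline", limit=30
    )
    # Iterating the response fetches follow-up pages via the new 'after' cursor
    async for log in page:
        print(log.message)


asyncio.run(main())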


src/deepset_mcp/api/pipeline/protocols.py

Lines changed: 3 additions & 2 deletions
@@ -10,7 +10,7 @@
     DeepsetSearchResponse,
     DeepsetStreamEvent,
     LogLevel,
-    PipelineLogList,
+    PipelineLog,
     PipelineValidationResult,
 )
 from deepset_mcp.api.shared_models import NoContentResponse, PaginatedResponse
@@ -49,7 +49,8 @@ async def get_logs(
         pipeline_name: str,
         limit: int = 30,
         level: LogLevel | None = None,
-    ) -> PipelineLogList:
+        after: str | None = None,
+    ) -> PaginatedResponse[PipelineLog]:
         """Fetch logs for a specific pipeline."""
         ...

src/deepset_mcp/api/pipeline/resource.py

Lines changed: 37 additions & 11 deletions
@@ -14,7 +14,7 @@
     DeepsetSearchResponse,
     DeepsetStreamEvent,
     LogLevel,
-    PipelineLogList,
+    PipelineLog,
     PipelineValidationResult,
     ValidationError,
 )
@@ -215,36 +215,62 @@ async def get_logs(
         pipeline_name: str,
         limit: int = 30,
         level: LogLevel | None = None,
-    ) -> PipelineLogList:
-        """Fetch logs for a specific pipeline.
+        after: str | None = None,
+    ) -> PaginatedResponse[PipelineLog]:
+        """Fetch logs for a specific pipeline and return the first page of results.
+
+        The returned object can be iterated over to fetch subsequent pages.
 
         :param pipeline_name: Name of the pipeline to fetch logs for.
-        :param limit: Maximum number of log entries to return.
+        :param limit: Maximum number of log entries to return per page.
         :param level: Filter logs by level. If None, returns all levels.
-        :returns: A PipelineLogList containing the log entries.
+        :param after: The cursor to fetch the next page of results.
+        :returns: A `PaginatedResponse` object containing the first page of logs.
         """
-        params: dict[str, Any] = {
+        # 1. Prepare arguments for the initial API call
+        request_params = {
             "limit": limit,
             "filter": "origin eq 'querypipeline'",
         }
 
         # Add level filter if specified
         if level is not None:
-            params["filter"] = f"level eq '{level}' and origin eq 'querypipeline'"
+            request_params["filter"] = f"level eq '{level}' and origin eq 'querypipeline'"
+
+        # Add cursor if provided
+        if after is not None:
+            request_params["after"] = after
+
+        # Remove None values
+        request_params = {k: v for k, v in request_params.items() if v is not None}
+
+        # 2. Make the first API call using a private, stateless method
+        page = await self._get_logs_api_call(pipeline_name, **request_params)
+
+        # 3. Inject the logic needed for subsequent fetches into the response object
+        page._inject_paginator(
+            fetch_func=lambda **kwargs: self._get_logs_api_call(pipeline_name, **kwargs),
+            # Base args for the *next* fetch don't include initial cursors
+            base_args={"limit": limit, "filter": request_params["filter"]},
+            cursor_param="after",  # Logs use 'after' cursor, not 'before' like pipelines
+        )
+        return page
 
+    async def _get_logs_api_call(self, pipeline_name: str, **kwargs: Any) -> PaginatedResponse[PipelineLog]:
+        """A private, stateless method that performs the raw API call for logs."""
         resp = await self._client.request(
             endpoint=f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/{quote(pipeline_name, safe='')}/logs",
             method="GET",
-            params=params,
+            params=kwargs,
         )
 
         raise_for_status(resp)
 
         if resp.json is not None:
-            return PipelineLogList.model_validate(resp.json)
+            return PaginatedResponse[PipelineLog].create_with_cursor_field(resp.json, "logged_at")
         else:
-            # Return empty log list if no response
-            return PipelineLogList(data=[], has_more=False, total=0)
+            # Return empty paginated response if no JSON data
+            return PaginatedResponse[PipelineLog](data=[], has_more=False, total=0)
 
     async def deploy(self, pipeline_name: str) -> PipelineValidationResult:
         """Deploy a pipeline to production.

src/deepset_mcp/api/shared_models.py

Lines changed: 8 additions & 3 deletions
@@ -43,6 +43,7 @@ class PaginatedResponse(BaseModel, Generic[T]):
     # --- Internal Paginator State (Defaults to None) ---
     _fetch_func: Callable[..., Coroutine[Any, Any, "PaginatedResponse[T]"]] | None = PrivateAttr(default=None)
     _base_args: dict[str, Any] | None = PrivateAttr(default=None)
+    _cursor_param: str = PrivateAttr(default="before")
 
     @model_validator(mode="before")
     @classmethod
@@ -72,14 +73,18 @@ def create_with_cursor_field(cls, data: dict[str, Any], cursor_field: str) -> "P
         return cls.model_validate(data_copy)
 
     def _inject_paginator(
-        self, fetch_func: Callable[..., Awaitable["PaginatedResponse[T]"]], base_args: dict[str, Any]
+        self,
+        fetch_func: Callable[..., Awaitable["PaginatedResponse[T]"]],
+        base_args: dict[str, Any],
+        cursor_param: str = "before",
     ) -> None:
         """Injects the necessary components to make this object iterable."""
         # Convert Awaitable to Coroutine for typing compatibility
         if callable(fetch_func):
             # This is a runtime check - mypy doesn't understand the callable compatibility
             self._fetch_func = fetch_func  # type: ignore
         self._base_args = {k: v for k, v in base_args.items() if v is not None}
+        self._cursor_param = cursor_param
 
     async def _get_next_page(self) -> "PaginatedResponse[T] | None":
         """Fetches the next page of results using the stored fetch function."""
@@ -97,10 +102,10 @@ async def _get_next_page(self) -> "PaginatedResponse[T] | None":
         # TODO: while 'before' signals pipelines younger than the current cursor.
         # TODO: This is applied irrespective of any sort (e.g. name) that would conflict with this approach.
         # TODO: Change this to 'after' once the behaviour is fixed on the deepset API
-        args["before"] = self.next_cursor
+        args[self._cursor_param] = self.next_cursor
 
         next_page = await self._fetch_func(**args)
-        next_page._inject_paginator(self._fetch_func, self._base_args)
+        next_page._inject_paginator(self._fetch_func, self._base_args, self._cursor_param)
         return next_page
 
     async def items(self) -> AsyncIterator[T]:
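
The `cursor_param` threading above is the crux of the change: each page stores a fetch function plus base arguments and re-injects them into the page it fetches. A stripped-down, self-contained sketch of the pattern (the `Page` class below is hypothetical, not the library's actual model):

from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Any


class Page:
    """Minimal stand-in for PaginatedResponse: one page of results plus the
    machinery needed to fetch the next page on demand."""

    def __init__(self, data: list[Any], next_cursor: str | None) -> None:
        self.data = data
        self.next_cursor = next_cursor
        self._fetch_func: Callable[..., Awaitable["Page"]] | None = None
        self._base_args: dict[str, Any] = {}
        self._cursor_param = "before"

    def _inject_paginator(
        self,
        fetch_func: Callable[..., Awaitable["Page"]],
        base_args: dict[str, Any],
        cursor_param: str = "before",
    ) -> None:
        # Stored so the iteration logic can re-issue the same request
        # with only the cursor argument changed.
        self._fetch_func = fetch_func
        self._base_args = base_args
        self._cursor_param = cursor_param

    async def items(self) -> AsyncIterator[Any]:
        page = self
        while page is not None:
            for item in page.data:
                yield item
            if page._fetch_func is None or page.next_cursor is None:
                return
            # The configurable cursor_param is what lets logs use 'after'
            # while pipeline listings keep using 'before'.
            args = {**page._base_args, page._cursor_param: page.next_cursor}
            next_page = await page._fetch_func(**args)
            next_page._inject_paginator(page._fetch_func, page._base_args, page._cursor_param)
            page = next_page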

src/deepset_mcp/tools/pipeline.py

Lines changed: 11 additions & 4 deletions
@@ -11,7 +11,7 @@
     DeepsetPipeline,
     DeepsetSearchResponse,
     LogLevel,
-    PipelineLogList,
+    PipelineLog,
     PipelineOperationWithErrors,
     PipelineValidationResult,
     PipelineValidationResultWithYaml,
@@ -211,8 +211,14 @@ async def update_pipeline(
 
 
 async def get_pipeline_logs(
-    *, client: AsyncClientProtocol, workspace: str, pipeline_name: str, limit: int = 30, level: LogLevel | None = None
-) -> PipelineLogList | str:
+    *,
+    client: AsyncClientProtocol,
+    workspace: str,
+    pipeline_name: str,
+    limit: int = 30,
+    level: LogLevel | None = None,
+    after: str | None = None,
+) -> PaginatedResponse[PipelineLog] | str:
     """Fetches logs for a specific pipeline.
 
     Retrieves log entries for the specified pipeline, with optional filtering by log level.
@@ -223,12 +229,13 @@ async def get_pipeline_logs(
     :param pipeline_name: Name of the pipeline to fetch logs for.
     :param limit: Maximum number of log entries to return (default: 30).
     :param level: Filter logs by level. If None, returns all levels.
+    :param after: The cursor to fetch the next page of results.
 
     :returns: Pipeline logs or error message.
     """
     try:
         return await client.pipelines(workspace=workspace).get_logs(
-            pipeline_name=pipeline_name, limit=limit, level=level
+            pipeline_name=pipeline_name, limit=limit, level=level, after=after
        )
    except ResourceNotFoundError:
        return f"There is no pipeline named '{pipeline_name}' in workspace '{workspace}'."

test/integration/test_integration_pipeline_logs.py

Lines changed: 74 additions & 3 deletions
@@ -7,8 +7,9 @@
 import pytest
 
 from deepset_mcp.api.client import AsyncDeepsetClient
-from deepset_mcp.api.pipeline.models import DeepsetPipeline, PipelineLogList
+from deepset_mcp.api.pipeline.models import DeepsetPipeline
 from deepset_mcp.api.pipeline.resource import PipelineResource
+from deepset_mcp.api.shared_models import PaginatedResponse
 
 pytestmark = pytest.mark.integration
 
@@ -145,7 +146,7 @@ async def test_get_logs_for_deployed_pipeline(
     logs = await pipeline_resource.get_logs(pipeline_name=pipeline_name)
 
     # Verify the response structure
-    assert isinstance(logs, PipelineLogList)
+    assert isinstance(logs, PaginatedResponse)
     assert isinstance(logs.data, list)
     assert isinstance(logs.has_more, bool)
     assert isinstance(logs.total, int)
@@ -178,7 +179,7 @@ async def test_get_logs_for_non_deployed_pipeline(
     logs = await pipeline_resource.get_logs(pipeline_name=pipeline_name)
 
     # Should return a valid response structure even if empty
-    assert isinstance(logs, PipelineLogList)
+    assert isinstance(logs, PaginatedResponse)
     assert isinstance(logs.data, list)
     assert isinstance(logs.has_more, bool)
     assert isinstance(logs.total, int)
@@ -209,3 +210,73 @@ async def test_deployment_timeout_handling(
         timeout_seconds=1,  # Very short timeout
         poll_interval=1,
     )
+
+
+@pytest.mark.extra_slow
+@pytest.mark.asyncio
+async def test_get_logs_pagination(
+    pipeline_resource: PipelineResource,
+    simple_yaml_config: str,
+) -> None:
+    """
+    Test pagination functionality for pipeline logs.
+
+    This test:
+    1. Creates and deploys a pipeline
+    2. Waits for deployment and potentially some logs
+    3. Tests pagination by requesting logs with a small limit
+    4. Verifies cursor-based pagination works correctly
+    """
+    pipeline_name = "test-logs-pagination-pipeline"
+
+    # Step 1: Create and deploy a pipeline
+    await pipeline_resource.create(pipeline_name=pipeline_name, yaml_config=simple_yaml_config)
+    deploy_result = await pipeline_resource.deploy(pipeline_name=pipeline_name)
+    assert deploy_result.valid is True, f"Pipeline deployment failed: {deploy_result.errors}"
+
+    # Step 2: Wait for the pipeline to be deployed
+    deployed_pipeline = await wait_for_pipeline_deployment(
+        pipeline_resource=pipeline_resource,
+        pipeline_name=pipeline_name,
+        timeout_seconds=300,  # 5 minutes timeout
+        poll_interval=15,  # Check every 15 seconds
+    )
+
+    assert deployed_pipeline.status == "DEPLOYED"
+
+    # Step 3: Get first page of logs with a small limit to test pagination
+    first_page = await pipeline_resource.get_logs(pipeline_name=pipeline_name, limit=5)
+
+    # Verify the response structure
+    assert isinstance(first_page, PaginatedResponse)
+    assert isinstance(first_page.data, list)
+    assert isinstance(first_page.has_more, bool)
+    assert isinstance(first_page.total, int | type(None))
+
+    # Step 4: If there are more logs available, test cursor-based pagination
+    if first_page.has_more and first_page.next_cursor:
+        second_page = await pipeline_resource.get_logs(
+            pipeline_name=pipeline_name, limit=5, after=first_page.next_cursor
+        )
+
+        # Verify second page structure
+        assert isinstance(second_page, PaginatedResponse)
+        assert isinstance(second_page.data, list)
+
+        # Ensure we got different logs (no duplicates between pages)
+        first_page_log_ids = {log.log_id for log in first_page.data}
+        second_page_log_ids = {log.log_id for log in second_page.data}
+
+        # There should be no overlap between pages
+        assert first_page_log_ids.isdisjoint(second_page_log_ids), "Found duplicate logs across pages"
+
+    # Step 5: Test async iteration over all logs
+    all_logs_via_iteration = []
+    async for log in first_page:
+        all_logs_via_iteration.append(log)
+        # Limit to avoid infinite loops in case of issues
+        if len(all_logs_via_iteration) > 100:
+            break
+
+    # Should have at least the logs from the first page
+    assert len(all_logs_via_iteration) >= len(first_page.data)

test/unit/api/pipeline/test_pipeline_resource.py

Lines changed: 80 additions & 2 deletions
@@ -12,7 +12,6 @@
     DeepsetPipeline,
     LogLevel,
     PipelineLog,
-    PipelineLogList,
     PipelineServiceLevel,
     PipelineValidationResult,
 )
@@ -795,7 +794,7 @@ async def test_get_logs_default_params(self) -> None:
         result = await resource.get_logs(pipeline_name="test-pipeline")
 
         # Verify results
-        assert isinstance(result, PipelineLogList)
+        assert isinstance(result, PaginatedResponse)
         assert len(result.data) == 2
         assert isinstance(result.data[0], PipelineLog)
         assert result.data[0].log_id == "log1"
@@ -1008,6 +1007,85 @@ async def test_get_logs_preserves_extra_fields(self) -> None:
         assert "custom_field" in result.data[0].extra_fields
         assert result.data[0].extra_fields["custom_field"] == "custom_value"
 
+    @pytest.mark.asyncio
+    async def test_get_logs_with_pagination(self) -> None:
+        """Test getting logs with pagination parameters."""
+        # Create sample logs
+        sample_logs = [
+            create_sample_log(log_id="log1", message="First log entry"),
+            create_sample_log(log_id="log2", message="Second log entry"),
+        ]
+
+        # Create client with predefined response
+        client = DummyClient(
+            responses={
+                "test-workspace/pipelines/test-pipeline/logs": {
+                    "data": sample_logs,
+                    "has_more": True,
+                    "total": 10,
+                }
+            }
+        )
+
+        # Create resource and call get_logs method with pagination
+        resource = PipelineResource(client=client, workspace="test-workspace")
+        result = await resource.get_logs(pipeline_name="test-pipeline", limit=5, after="some_cursor")
+
+        # Verify results
+        assert isinstance(result, PaginatedResponse)
+        assert len(result.data) == 2
+        assert result.data[0].log_id == "log1"
+        assert result.data[1].log_id == "log2"
+        assert result.has_more is True
+        assert result.total == 10
+
+        # Verify request
+        assert client.requests[0]["endpoint"] == "v1/workspaces/test-workspace/pipelines/test-pipeline/logs"
+        # Logs should use the 'after' parameter (not 'before' like pipelines)
+        assert client.requests[0]["params"] == {
+            "limit": 5,
+            "filter": "origin eq 'querypipeline'",
+            "after": "some_cursor",
+        }
+
+    @pytest.mark.asyncio
+    async def test_get_logs_pagination_with_level_filter(self) -> None:
+        """Test getting logs with both pagination and a level filter."""
+        # Create sample error logs
+        sample_logs = [
+            create_sample_log(log_id="error1", message="First error", level="error"),
+            create_sample_log(log_id="error2", message="Second error", level="error"),
+        ]
+
+        # Create client with predefined response
+        client = DummyClient(
+            responses={
+                "test-workspace/pipelines/test-pipeline/logs": {
+                    "data": sample_logs,
+                    "has_more": False,
+                    "total": 2,
+                }
+            }
+        )
+
+        # Create resource and call get_logs method with level filter and pagination
+        resource = PipelineResource(client=client, workspace="test-workspace")
+        result = await resource.get_logs(
+            pipeline_name="test-pipeline", limit=10, level=LogLevel.ERROR, after="some_cursor"
+        )
+
+        # Verify results
+        assert len(result.data) == 2
+        assert all(log.level == "error" for log in result.data)
+
+        # Verify request with both level filter and cursor
+        expected_params = {
+            "limit": 10,
+            "filter": "level eq 'error' and origin eq 'querypipeline'",
+            "after": "some_cursor",
+        }
+        assert client.requests[0]["params"] == expected_params
+
     @pytest.mark.asyncio
     async def test_deploy_pipeline_success(self) -> None:
         """Test successful pipeline deployment."""
