-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathresource.py
More file actions
127 lines (102 loc) · 4.29 KB
/
resource.py
File metadata and controls
127 lines (102 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# SPDX-FileCopyrightText: 2025-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
"""Resource implementation for search history API."""
from typing import TYPE_CHECKING
from urllib.parse import quote
from deepset_mcp.api.search_history.models import SearchHistoryEntry
from deepset_mcp.api.search_history.protocols import SearchHistoryResourceProtocol
from deepset_mcp.api.shared_models import PaginatedResponse
from deepset_mcp.api.transport import raise_for_status
if TYPE_CHECKING:
from deepset_mcp.api.protocols import AsyncClientProtocol
class SearchHistoryResource(SearchHistoryResourceProtocol):
    """Manages interactions with the deepset search history API."""

    # Search-history queries can be slow server-side; one generous timeout
    # shared by every request instead of a magic number repeated per call.
    _REQUEST_TIMEOUT: float = 70.0

    def __init__(self, client: "AsyncClientProtocol", workspace: str) -> None:
        """Initialize the search history resource.

        :param client: The async REST client.
        :param workspace: The workspace to use.
        """
        self._client = client
        self._workspace = workspace

    def _base_path(self) -> str:
        """Return the workspace-level search history endpoint path."""
        return f"v1/workspaces/{quote(self._workspace, safe='')}/search_history"

    def _pipeline_path(self, pipeline_name: str) -> str:
        """Return the pipeline-scoped search history endpoint path."""
        return (
            f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/{quote(pipeline_name, safe='')}/search_history"
        )

    @staticmethod
    def _build_params(limit: int, after: str | None) -> dict[str, str | int]:
        """Build the query parameters shared by both listing endpoints.

        :param limit: Maximum number of entries per page.
        :param after: Optional pagination cursor; omitted from the query when ``None``.
        :returns: Query parameter mapping for the GET request.
        """
        params: dict[str, str | int] = {"limit": limit}
        if after is not None:
            params["after"] = after
        return params

    @staticmethod
    def _parse_paginated(raw: object) -> PaginatedResponse[SearchHistoryEntry]:
        """Normalize a raw JSON response body into a ``PaginatedResponse``.

        Accepts the three shapes the API may return:

        * ``None`` (empty body) — yields an empty page;
        * a bare list of entries — wrapped as ``{"data": [...]}``;
        * the canonical paginated dict ``{"data": [...], "has_more": bool, "total": int}``.

        :param raw: Decoded JSON body of the response, or ``None``.
        :returns: Paginated response of search history entries.
        """
        if raw is None:
            return PaginatedResponse(
                data=[],
                has_more=False,
                total=0,
                next_cursor=None,
            )
        data = raw if isinstance(raw, dict) else {"data": raw}
        items = data.get("data", [])
        if not isinstance(items, list):
            # Defensive: a malformed payload degrades to an empty page rather than raising.
            items = []
        return PaginatedResponse[SearchHistoryEntry].create_with_cursor_field(
            {
                "data": items,
                "has_more": data.get("has_more", False),
                "total": data.get("total"),
            },
            "created_at",
        )

    async def list(self, limit: int = 10, after: str | None = None) -> PaginatedResponse[SearchHistoryEntry]:
        """List search history entries in the workspace.

        :param limit: Maximum number of entries to return per page.
        :param after: Cursor to fetch the next page of results.
        :returns: Paginated response of search history entries.
        """
        resp = await self._client.request(
            endpoint=self._base_path(),
            method="GET",
            params=self._build_params(limit, after),
            timeout=self._REQUEST_TIMEOUT,
        )
        raise_for_status(resp)
        return self._parse_paginated(resp.json)

    async def list_pipeline(
        self, pipeline_name: str, limit: int = 10, after: str | None = None
    ) -> PaginatedResponse[SearchHistoryEntry]:
        """List search history entries for a specific pipeline with pagination.

        Uses the pipeline search history archive endpoint (full history, most recent first).

        :param pipeline_name: Name of the pipeline.
        :param limit: Maximum number of entries to return per page.
        :param after: Cursor to fetch the next page of results.
        :returns: Paginated response of search history entries.
        """
        resp = await self._client.request(
            endpoint=f"{self._pipeline_path(pipeline_name)}_archive",
            method="GET",
            params=self._build_params(limit, after),
            timeout=self._REQUEST_TIMEOUT,
        )
        raise_for_status(resp)
        return self._parse_paginated(resp.json)