-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathresource.py
More file actions
154 lines (117 loc) · 5.6 KB
/
resource.py
File metadata and controls
154 lines (117 loc) · 5.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# SPDX-FileCopyrightText: 2025-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
from urllib.parse import quote
from deepset_mcp.api.exceptions import UnexpectedAPIError
from deepset_mcp.api.indexes.models import Index, IndexList
from deepset_mcp.api.indexes.protocols import IndexResourceProtocol
from deepset_mcp.api.pipeline.models import PipelineValidationResult, ValidationError
from deepset_mcp.api.protocols import AsyncClientProtocol
from deepset_mcp.api.transport import raise_for_status
class IndexResource(IndexResourceProtocol):
"""Resource for interacting with deepset indexes."""
def __init__(self, client: AsyncClientProtocol, workspace: str) -> None:
"""Initialize the index resource.
:param client: The async REST client.
:param workspace: The workspace to use.
"""
self._client = client
self._workspace = workspace
async def list(self, limit: int = 10, page_number: int = 1) -> IndexList:
"""List all indexes.
:param limit: Maximum number of indexes to return.
:param page_number: Page number for pagination.
:returns: List of indexes.
"""
params = {
"limit": limit,
"page_number": page_number,
}
response = await self._client.request(
f"/v1/workspaces/{quote(self._workspace, safe='')}/indexes", params=params
)
raise_for_status(response)
return IndexList.model_validate(response.json)
async def get(self, index_name: str) -> Index:
"""Get a specific index.
:param index_name: Name of the index.
:returns: Index details.
"""
response = await self._client.request(
f"/v1/workspaces/{quote(self._workspace, safe='')}/indexes/{quote(index_name, safe='')}"
)
raise_for_status(response)
return Index.model_validate(response.json)
async def create(self, index_name: str, yaml_config: str, description: str | None = None) -> Index:
"""Create a new index with the given name and configuration.
:param index_name: Name of the index
:param yaml_config: YAML configuration for the index
:param description: Optional description for the index
:returns: Created index details
"""
data = {
"name": index_name,
"config_yaml": yaml_config,
}
if description is not None:
data["description"] = description
response = await self._client.request(
f"v1/workspaces/{quote(self._workspace, safe='')}/indexes", method="POST", data=data
)
raise_for_status(response)
return Index.model_validate(response.json)
async def update(
self, index_name: str, updated_index_name: str | None = None, yaml_config: str | None = None
) -> Index:
"""Update name and/or configuration of an existing index.
:param index_name: Name of the index to update
:param updated_index_name: Optional new name for the index
:param yaml_config: Optional new YAML configuration
:returns: Updated index details
"""
data = {}
if updated_index_name is not None:
data["name"] = updated_index_name
if yaml_config is not None:
data["config_yaml"] = yaml_config
if not data:
raise ValueError("At least one of updated_index_name or yaml_config must be provided")
response = await self._client.request(
f"/v1/workspaces/{quote(self._workspace, safe='')}/indexes/{quote(index_name, safe='')}",
method="PATCH",
data=data,
)
raise_for_status(response)
return Index.model_validate(response.json)
async def delete(self, index_name: str) -> None:
"""Delete an index.
:param index_name: Name of the index to delete.
"""
response = await self._client.request(
f"/v1/workspaces/{quote(self._workspace, safe='')}/indexes/{quote(index_name, safe='')}", method="DELETE"
)
raise_for_status(response)
async def deploy(self, index_name: str) -> PipelineValidationResult:
"""Deploy an index.
:param index_name: Name of the index to deploy.
:returns: PipelineValidationResult containing deployment status and any errors.
:raises UnexpectedAPIError: If the API returns an unexpected status code.
"""
resp = await self._client.request(
endpoint=f"v1/workspaces/{quote(self._workspace, safe='')}/indexes/{quote(index_name, safe='')}/deploy",
method="POST",
)
# If successful (status 200), the deployment was successful
if resp.success:
return PipelineValidationResult(valid=True)
# Handle validation errors (422)
if resp.status_code == 422 and resp.json is not None and isinstance(resp.json, dict) and "details" in resp.json:
errors = [ValidationError(code=error["code"], message=error["message"]) for error in resp.json["details"]]
return PipelineValidationResult(valid=False, errors=errors)
# Handle other 4xx errors (400, 404, 424)
if 400 <= resp.status_code < 500:
# For non-validation errors, create a generic error
error_message = resp.text if resp.text else f"HTTP {resp.status_code} error"
errors = [ValidationError(code="DEPLOYMENT_ERROR", message=error_message)]
return PipelineValidationResult(valid=False, errors=errors)
raise UnexpectedAPIError(status_code=resp.status_code, message=resp.text, detail=resp.json)