-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathprotocols.py
More file actions
133 lines (110 loc) · 4.36 KB
/
protocols.py
File metadata and controls
133 lines (110 loc) · 4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# SPDX-FileCopyrightText: 2025-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
from contextlib import AbstractAsyncContextManager
from types import TracebackType
from typing import TYPE_CHECKING, Any, Literal, Protocol, Self, TypeVar, overload
from deepset_mcp.api.transport import StreamingResponse, TransportResponse
if TYPE_CHECKING:
from deepset_mcp.api.custom_components.protocols import CustomComponentsProtocol
from deepset_mcp.api.haystack_service.protocols import HaystackServiceProtocol
from deepset_mcp.api.indexes.protocols import IndexResourceProtocol
from deepset_mcp.api.integrations.protocols import IntegrationResourceProtocol
from deepset_mcp.api.pipeline.protocols import PipelineResourceProtocol
from deepset_mcp.api.pipeline_template.protocols import PipelineTemplateResourceProtocol
from deepset_mcp.api.search_history.protocols import SearchHistoryResourceProtocol
from deepset_mcp.api.secrets.protocols import SecretResourceProtocol
from deepset_mcp.api.user.protocols import UserResourceProtocol
from deepset_mcp.api.workspace.protocols import WorkspaceResourceProtocol
# Type variable linking the ``response_type`` argument of ``request`` to the
# type parameter of the ``TransportResponse`` it returns.
T = TypeVar("T")
class AsyncClientProtocol(Protocol):
    """Structural interface that an async API client must satisfy.

    The protocol covers three areas: raw HTTP-style calls (``request`` and
    ``stream_request``), lifecycle management (``close`` and the async context
    manager hooks), and accessor methods that return resource-specific
    protocols.
    """

    # ------------------------------------------------------------------
    # Low-level request helpers
    # ------------------------------------------------------------------

    @overload
    async def request(
        self,
        endpoint: str,
        *,
        response_type: type[T],
        method: str = "GET",
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: float | None | Literal["config"] = "config",
        **kwargs: Any,
    ) -> TransportResponse[T]: ...

    @overload
    async def request(
        self,
        endpoint: str,
        *,
        response_type: None = None,
        method: str = "GET",
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: float | None | Literal["config"] = "config",
        **kwargs: Any,
    ) -> TransportResponse[Any]: ...

    async def request(
        self,
        endpoint: str,
        *,
        response_type: type[T] | None = None,
        method: str = "GET",
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        timeout: float | None | Literal["config"] = "config",
        **kwargs: Any,
    ) -> TransportResponse[Any]:
        """Send a request to the API and return the transport response.

        When ``response_type`` is supplied, the overloads type the result as
        ``TransportResponse[T]``; when it is omitted or ``None``, the result is
        typed as ``TransportResponse[Any]``.

        :param endpoint: Endpoint to call.
        :param response_type: Optional type used to parameterize the response.
        :param method: HTTP method name; defaults to ``"GET"``.
        :param data: Optional request payload.
        :param headers: Optional request headers.
        :param timeout: A number, ``None``, or the literal ``"config"`` (the
            default). NOTE(review): ``"config"`` presumably defers to a
            client-level timeout setting -- confirm against the implementation.
        :param kwargs: Additional keyword arguments accepted by implementations.
        :returns: The response wrapped in a ``TransportResponse``.
        """
        ...

    def stream_request(
        self,
        endpoint: str,
        *,
        method: str = "POST",
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        **kwargs: Any,
    ) -> AbstractAsyncContextManager[StreamingResponse]:
        """Open a streaming request to the API.

        This is a regular (non-``async``) method: it returns an async context
        manager that yields a ``StreamingResponse`` when entered.

        :param endpoint: Endpoint to call.
        :param method: HTTP method name; defaults to ``"POST"``.
        :param data: Optional request payload.
        :param headers: Optional request headers.
        :param kwargs: Additional keyword arguments accepted by implementations.
        :returns: Async context manager wrapping the ``StreamingResponse``.
        """
        ...

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def close(self) -> None:
        """Release the resources held by the underlying transport."""
        ...

    async def __aenter__(self) -> Self:
        """Enter the async context manager and return the client itself."""
        ...

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """Exit the async context manager and clean up resources.

        :param exc_type: Type of the exception raised in the ``async with`` body, if any.
        :param exc_val: The exception instance raised in the body, if any.
        :param exc_tb: Traceback associated with the exception, if any.
        :returns: Boolean result of the async context manager exit hook.
        """
        ...

    # ------------------------------------------------------------------
    # Resource accessors
    # ------------------------------------------------------------------

    def pipelines(self, workspace: str) -> "PipelineResourceProtocol":
        """Return the pipeline resource for the given workspace."""
        ...

    def haystack_service(self) -> "HaystackServiceProtocol":
        """Return the Haystack service resource."""
        ...

    def pipeline_templates(self, workspace: str) -> "PipelineTemplateResourceProtocol":
        """Return the pipeline template resource for the given workspace."""
        ...

    def indexes(self, workspace: str) -> "IndexResourceProtocol":
        """Return the index resource for the given workspace."""
        ...

    def custom_components(self, workspace: str) -> "CustomComponentsProtocol":
        """Return the custom components resource for the given workspace."""
        ...

    def users(self) -> "UserResourceProtocol":
        """Return the user resource."""
        ...

    def secrets(self) -> "SecretResourceProtocol":
        """Return the secret resource."""
        ...

    def workspaces(self) -> "WorkspaceResourceProtocol":
        """Return the workspace resource."""
        ...

    def integrations(self) -> "IntegrationResourceProtocol":
        """Return the integration resource."""
        ...

    def search_history(self, workspace: str) -> "SearchHistoryResourceProtocol":
        """Return the search history resource for the given workspace."""
        ...