-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy path_exceptions.py
More file actions
151 lines (117 loc) · 4.42 KB
/
_exceptions.py
File metadata and controls
151 lines (117 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from grpc import StatusCode
from grpc.aio import AioRpcError as BaseAioRpcError
from grpc.aio import Metadata
from ._auth import BaseAuth
if TYPE_CHECKING:
# pylint: disable=cyclic-import
from ._client import StubType
from ._datasets.validation import DatasetValidationResult
from ._types.operation import OperationErrorInfo
class YCloudMLError(Exception):
pass
class UnknownEndpointError(YCloudMLError):
pass
class RunError(YCloudMLError):
def __init__(self, code: int, message: str, details: list[Any] | None, operation_id: str):
self.code = code
self.message = message
self.details = details or []
self.operation_id = operation_id
def __str__(self):
message = self.message or "<Empty message>"
message = f'Operation {self.operation_id} failed with message: {message} (code {self.code})'
message += '\n' + '\n'.join(repr(d) for d in self.details)
return message
@classmethod
def from_proro_status(cls, status: OperationErrorInfo, operation_id: str):
return cls(
code=status.code,
message=status.message,
details=list(status.details) if status.details else None,
operation_id=operation_id,
)
class AsyncOperationError(YCloudMLError):
pass
class WrongAsyncOperationStatusError(AsyncOperationError):
pass
class DatasetValidationError(AsyncOperationError):
def __init__(self, validation_result: DatasetValidationResult):
self._result = validation_result
errors_str = '\n'.join(str(error) for error in self.errors)
message = f"Dataset validation for dataset_id={self.dataset_id} failed with next errors:\n{errors_str}"
super().__init__(message)
@property
def errors(self):
return self._result.errors
@property
def dataset_id(self):
return self._result.dataset_id
class AioRpcError(BaseAioRpcError):
_initial_metadata: Metadata | None
_trailing_metadata: Metadata | None
def __init__(
self,
*args,
endpoint: str,
auth: BaseAuth | None,
stub_class: type[StubType],
**kwargs,
):
super().__init__(*args, **kwargs)
self._endpoint = endpoint
self._auth = auth
self._stub_class = stub_class
self._client_request_id: str
initial = self._initial_metadata
trailing = self._trailing_metadata
if (
initial is not None and not isinstance(initial, Metadata) or
trailing is not None and not isinstance(trailing, Metadata)
):
self._client_request_id = "grpc metadata was replaced with non-Metadata object"
else:
self._client_request_id = (
initial and initial.get('x-client-request-id') or
trailing and trailing.get('x-client-request-id') or
""
)
@classmethod
def from_base_rpc_error(
cls,
original: BaseAioRpcError,
endpoint: str,
auth: BaseAuth | None,
stub_class: type[StubType],
) -> AioRpcError:
return cls(
code=original.code(),
initial_metadata=original.initial_metadata(),
trailing_metadata=original.trailing_metadata(),
details=original.details(),
debug_error_string=original.debug_error_string(),
endpoint=endpoint,
auth=auth,
stub_class=stub_class,
).with_traceback(original.__traceback__)
def __str__(self):
parts = [
f"code = {self._code}",
f'details = "{self._details}"',
f'debug_error_string = "{self._debug_error_string}"',
f'endpoint = "{self._endpoint}"',
f'stub_class = {self._stub_class.__name__}'
]
if self._client_request_id:
parts.append(f'x-client-request-id = "{self._client_request_id}"')
if self._code == StatusCode.UNAUTHENTICATED:
auth = self._auth.__class__.__name__ if self._auth else None
parts.append(
f"auth_provider = {auth}"
)
body = '\n'.join(f'\t{part}' for part in parts)
return f"<{self.__class__.__name__} of RPC that terminated with:\n{body}\n>"
class TuningError(RunError):
def __str__(self):
return f'Tuning task {self.operation_id} failed'