1
1
"""Drop tip in place command request, result, and implementation models."""
2
2
from __future__ import annotations
3
3
4
- from typing import TYPE_CHECKING , Optional , Type , Any
4
+ from typing import TYPE_CHECKING , Optional , Type , Any , Union
5
5
6
6
from pydantic import Field , BaseModel
7
7
from pydantic .json_schema import SkipJsonSchema
8
8
from typing_extensions import Literal
9
9
10
+ from opentrons_shared_data .errors .exceptions import (
11
+ PipetteOverpressureError ,
12
+ StallOrCollisionDetectedError ,
13
+ )
14
+
10
15
from .command import (
11
16
AbstractCommandImpl ,
12
17
BaseCommand ,
13
18
BaseCommandCreate ,
14
19
DefinedErrorData ,
15
20
SuccessData ,
16
21
)
17
- from .pipetting_common import PipetteIdMixin , TipPhysicallyAttachedError
22
+ from .movement_common import StallOrCollisionError
23
+ from .pipetting_common import (
24
+ PipetteIdMixin ,
25
+ TipPhysicallyAttachedError ,
26
+ OverpressureError ,
27
+ )
18
28
from ..errors .exceptions import TipAttachedError
19
29
from ..errors .error_occurrence import ErrorOccurrence
20
30
from ..resources .model_utils import ModelUtils
@@ -51,9 +61,12 @@ class DropTipInPlaceResult(BaseModel):
51
61
pass
52
62
53
63
54
- _ExecuteReturn = (
55
- SuccessData [DropTipInPlaceResult ] | DefinedErrorData [TipPhysicallyAttachedError ]
56
- )
64
+ _ExecuteReturn = Union [
65
+ SuccessData [DropTipInPlaceResult ]
66
+ | DefinedErrorData [TipPhysicallyAttachedError ]
67
+ | DefinedErrorData [OverpressureError ]
68
+ | DefinedErrorData [StallOrCollisionError ]
69
+ ]
57
70
58
71
59
72
class DropTipInPlaceImplementation (
@@ -105,6 +118,50 @@ async def execute(self, params: DropTipInPlaceParams) -> _ExecuteReturn:
105
118
state_update = state_update ,
106
119
state_update_if_false_positive = state_update_if_false_positive ,
107
120
)
121
+ except PipetteOverpressureError as exception :
122
+ state_update_if_false_positive = update_types .StateUpdate ()
123
+ state_update_if_false_positive .update_pipette_tip_state (
124
+ pipette_id = params .pipetteId , tip_geometry = None
125
+ )
126
+ state_update .set_fluid_unknown (pipette_id = params .pipetteId )
127
+ return DefinedErrorData (
128
+ public = OverpressureError (
129
+ id = self ._model_utils .generate_id (),
130
+ createdAt = self ._model_utils .get_timestamp (),
131
+ wrappedErrors = [
132
+ ErrorOccurrence .from_failed (
133
+ id = self ._model_utils .generate_id (),
134
+ createdAt = self ._model_utils .get_timestamp (),
135
+ error = exception ,
136
+ )
137
+ ],
138
+ errorInfo = {"retryLocation" : retry_location },
139
+ ),
140
+ state_update = state_update ,
141
+ state_update_if_false_positive = state_update_if_false_positive ,
142
+ )
143
+ except StallOrCollisionDetectedError as exception :
144
+ state_update_if_false_positive = update_types .StateUpdate ()
145
+ state_update_if_false_positive .update_pipette_tip_state (
146
+ pipette_id = params .pipetteId , tip_geometry = None
147
+ )
148
+ state_update .set_fluid_unknown (pipette_id = params .pipetteId )
149
+ return DefinedErrorData (
150
+ public = StallOrCollisionError (
151
+ id = self ._model_utils .generate_id (),
152
+ createdAt = self ._model_utils .get_timestamp (),
153
+ wrappedErrors = [
154
+ ErrorOccurrence .from_failed (
155
+ id = self ._model_utils .generate_id (),
156
+ createdAt = self ._model_utils .get_timestamp (),
157
+ error = exception ,
158
+ )
159
+ ],
160
+ errorInfo = {"retryLocation" : retry_location },
161
+ ),
162
+ state_update = state_update ,
163
+ state_update_if_false_positive = state_update_if_false_positive ,
164
+ )
108
165
else :
109
166
state_update .set_fluid_unknown (pipette_id = params .pipetteId )
110
167
state_update .update_pipette_tip_state (
@@ -114,7 +171,11 @@ async def execute(self, params: DropTipInPlaceParams) -> _ExecuteReturn:
114
171
115
172
116
173
class DropTipInPlace (
117
- BaseCommand [DropTipInPlaceParams , DropTipInPlaceResult , TipPhysicallyAttachedError ]
174
+ BaseCommand [
175
+ DropTipInPlaceParams ,
176
+ DropTipInPlaceResult ,
177
+ TipPhysicallyAttachedError | OverpressureError | StallOrCollisionError ,
178
+ ]
118
179
):
119
180
"""Drop tip in place command model."""
120
181
0 commit comments