1
1
"""Dispense command request, result, and implementation models."""
2
2
from __future__ import annotations
3
- from typing import TYPE_CHECKING , Optional , Type
3
+ from typing import TYPE_CHECKING , Optional , Type , Union
4
4
from typing_extensions import Literal
5
5
6
+ from opentrons_shared_data .errors .exceptions import PipetteOverpressureError
7
+
6
8
from pydantic import Field
7
9
8
10
from ..types import DeckPoint
13
15
WellLocationMixin ,
14
16
BaseLiquidHandlingResult ,
15
17
DestinationPositionResult ,
18
+ OverpressureError ,
19
+ OverpressureErrorInternalData ,
20
+ )
21
+ from .command import (
22
+ AbstractCommandImpl ,
23
+ BaseCommand ,
24
+ BaseCommandCreate ,
25
+ DefinedErrorData ,
26
+ SuccessData ,
16
27
)
17
- from .command import AbstractCommandImpl , BaseCommand , BaseCommandCreate , SuccessData
18
28
from ..errors .error_occurrence import ErrorOccurrence
19
29
20
30
if TYPE_CHECKING :
21
31
from ..execution import MovementHandler , PipettingHandler
32
+ from ..resources import ModelUtils
22
33
23
34
24
35
DispenseCommandType = Literal ["dispense" ]
@@ -41,41 +52,68 @@ class DispenseResult(BaseLiquidHandlingResult, DestinationPositionResult):
41
52
pass
42
53
43
54
44
- class DispenseImplementation (
45
- AbstractCommandImpl [DispenseParams , SuccessData [DispenseResult , None ]]
46
- ):
55
+ _ExecuteReturn = Union [
56
+ SuccessData [DispenseResult , None ],
57
+ DefinedErrorData [OverpressureError , OverpressureErrorInternalData ],
58
+ ]
59
+
60
+
61
+ class DispenseImplementation (AbstractCommandImpl [DispenseParams , _ExecuteReturn ]):
47
62
"""Dispense command implementation."""
48
63
49
64
def __init__ (
50
- self , movement : MovementHandler , pipetting : PipettingHandler , ** kwargs : object
65
+ self ,
66
+ movement : MovementHandler ,
67
+ pipetting : PipettingHandler ,
68
+ model_utils : ModelUtils ,
69
+ ** kwargs : object ,
51
70
) -> None :
52
71
self ._movement = movement
53
72
self ._pipetting = pipetting
73
+ self ._model_utils = model_utils
54
74
55
- async def execute (
56
- self , params : DispenseParams
57
- ) -> SuccessData [DispenseResult , None ]:
75
+ async def execute (self , params : DispenseParams ) -> _ExecuteReturn :
58
76
"""Move to and dispense to the requested well."""
59
77
position = await self ._movement .move_to_well (
60
78
pipette_id = params .pipetteId ,
61
79
labware_id = params .labwareId ,
62
80
well_name = params .wellName ,
63
81
well_location = params .wellLocation ,
64
82
)
65
- volume = await self ._pipetting .dispense_in_place (
66
- pipette_id = params .pipetteId ,
67
- volume = params .volume ,
68
- flow_rate = params .flowRate ,
69
- push_out = params .pushOut ,
70
- )
71
-
72
- return SuccessData (
73
- public = DispenseResult (
74
- volume = volume ,
75
- position = DeckPoint (x = position .x , y = position .y , z = position .z ),
76
- ),
77
- private = None ,
78
- )
83
+ try :
84
+ volume = await self ._pipetting .dispense_in_place (
85
+ pipette_id = params .pipetteId ,
86
+ volume = params .volume ,
87
+ flow_rate = params .flowRate ,
88
+ push_out = params .pushOut ,
89
+ )
90
+ except PipetteOverpressureError as e :
91
+ return DefinedErrorData (
92
+ public = OverpressureError (
93
+ id = self ._model_utils .generate_id (),
94
+ createdAt = self ._model_utils .get_timestamp (),
95
+ wrappedErrors = [
96
+ ErrorOccurrence .from_failed (
97
+ id = self ._model_utils .generate_id (),
98
+ createdAt = self ._model_utils .get_timestamp (),
99
+ error = e ,
100
+ )
101
+ ],
102
+ ),
103
+ private = OverpressureErrorInternalData (
104
+ position = DeckPoint .construct (
105
+ x = position .x , y = position .y , z = position .z
106
+ )
107
+ ),
108
+ )
109
+ else :
110
+ return SuccessData (
111
+ public = DispenseResult (
112
+ volume = volume ,
113
+ position = DeckPoint (x = position .x , y = position .y , z = position .z ),
114
+ ),
115
+ private = None ,
116
+ )
79
117
80
118
81
119
class Dispense (BaseCommand [DispenseParams , DispenseResult , ErrorOccurrence ]):
0 commit comments