Skip to content

Commit

Permalink
fix(engine): deck configuration pipette movement related errors (#14067)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbleon95 authored Nov 30, 2023
1 parent 9dd2d1f commit 9d4cf06
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from typing import TYPE_CHECKING, Optional, Type
from typing_extensions import Literal

from ..errors import LocationNotAccessibleByPipetteError
from ..types import DeckPoint, AddressableOffsetVector
from ..resources import fixture_validation
from .pipetting_common import (
PipetteIdMixin,
MovementMixin,
Expand Down Expand Up @@ -71,6 +73,11 @@ async def execute(
self, params: MoveToAddressableAreaParams
) -> MoveToAddressableAreaResult:
"""Move the requested pipette to the requested addressable area."""
if fixture_validation.is_staging_slot(params.addressableAreaName):
raise LocationNotAccessibleByPipetteError(
f"Cannot move pipette to staging slot {params.addressableAreaName}"
)

x, y, z = await self._movement.move_to_addressable_area(
pipette_id=params.pipetteId,
addressable_area_name=params.addressableAreaName,
Expand Down
4 changes: 4 additions & 0 deletions api/src/opentrons/protocol_engine/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
LabwareMovementNotAllowedError,
LabwareIsNotAllowedInLocationError,
LocationIsOccupiedError,
LocationNotAccessibleByPipetteError,
LocationIsStagingSlotError,
InvalidAxisForRobotType,
NotSupportedOnRobotType,
)
Expand Down Expand Up @@ -126,6 +128,8 @@
"LabwareMovementNotAllowedError",
"LabwareIsNotAllowedInLocationError",
"LocationIsOccupiedError",
"LocationNotAccessibleByPipetteError",
"LocationIsStagingSlotError",
"InvalidAxisForRobotType",
"NotSupportedOnRobotType",
# error occurrence models
Expand Down
26 changes: 26 additions & 0 deletions api/src/opentrons/protocol_engine/errors/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,32 @@ def __init__(
super().__init__(ErrorCodes.GENERAL_ERROR, message, details, wrapping)


class LocationNotAccessibleByPipetteError(ProtocolEngineError):
"""Raised when attempting to move pipette to an inaccessible location."""

def __init__(
self,
message: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
wrapping: Optional[Sequence[EnumeratedError]] = None,
) -> None:
"""Build a LocationNotAccessibleByPipetteError."""
super().__init__(ErrorCodes.GENERAL_ERROR, message, details, wrapping)


class LocationIsStagingSlotError(ProtocolEngineError):
"""Raised when referencing a labware on a staging slot when trying to get standard deck slot."""

def __init__(
self,
message: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
wrapping: Optional[Sequence[EnumeratedError]] = None,
) -> None:
"""Build a LocationIsStagingSlotError."""
super().__init__(ErrorCodes.GENERAL_ERROR, message, details, wrapping)


class FirmwareUpdateRequired(ProtocolEngineError):
"""Raised when the firmware needs to be updated."""

Expand Down
4 changes: 4 additions & 0 deletions api/src/opentrons/protocol_engine/execution/movement.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ async def move_to_well(
speed: Optional[float] = None,
) -> Point:
"""Move to a specific well."""
self._state_store.labware.raise_if_labware_inaccessible_by_pipette(
labware_id=labware_id
)

self._state_store.labware.raise_if_labware_has_labware_on_top(
labware_id=labware_id
)
Expand Down
25 changes: 22 additions & 3 deletions api/src/opentrons/protocol_engine/state/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,11 @@ def get_min_travel_z(

def get_labware_parent_nominal_position(self, labware_id: str) -> Point:
"""Get the position of the labware's uncalibrated parent slot (deck, module, or another labware)."""
slot_name = self.get_ancestor_slot_name(labware_id)
slot_pos = self._addressable_areas.get_addressable_area_position(slot_name.id)
try:
slot_name = self.get_ancestor_slot_name(labware_id).id
except errors.LocationIsStagingSlotError:
slot_name = self._get_staging_slot_name(labware_id)
slot_pos = self._addressable_areas.get_addressable_area_position(slot_name)
labware_data = self._labware.get(labware_id)
offset = self._get_labware_position_offset(labware_id, labware_data.location)

Expand Down Expand Up @@ -459,6 +462,22 @@ def get_checked_tip_drop_location(
),
)

# TODO(jbl 11-30-2023) fold this function into get_ancestor_slot_name see RSS-411
def _get_staging_slot_name(self, labware_id: str) -> str:
"""Get the staging slot name that the labware is on."""
labware_location = self._labware.get(labware_id).location
if isinstance(labware_location, OnLabwareLocation):
below_labware_id = labware_location.labwareId
return self._get_staging_slot_name(below_labware_id)
elif isinstance(
labware_location, AddressableAreaLocation
) and fixture_validation.is_staging_slot(labware_location.addressableAreaName):
return labware_location.addressableAreaName
else:
raise ValueError(
"Cannot get staging slot name for labware not on staging slot."
)

def get_ancestor_slot_name(self, labware_id: str) -> DeckSlotName:
"""Get the slot name of the labware or the module that the labware is on."""
labware = self._labware.get(labware_id)
Expand All @@ -477,7 +496,7 @@ def get_ancestor_slot_name(self, labware_id: str) -> DeckSlotName:
# TODO we might want to eventually return some sort of staging slot name when we're ready to work through
# the linting nightmare it will create
if fixture_validation.is_staging_slot(area_name):
raise ValueError(
raise errors.LocationIsStagingSlotError(
"Cannot get ancestor slot name for labware on staging slot."
)
slot_name = DeckSlotName.from_primitive(area_name)
Expand Down
21 changes: 20 additions & 1 deletion api/src/opentrons/protocol_engine/state/labware.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,26 @@ def get_fixed_trash_id(self) -> Optional[str]:

def is_fixed_trash(self, labware_id: str) -> bool:
"""Check if labware is fixed trash."""
return self.get_fixed_trash_id() == labware_id
return self.get_has_quirk(labware_id, "fixedTrash")

def raise_if_labware_inaccessible_by_pipette(self, labware_id: str) -> None:
"""Raise an error if the specified location cannot be reached via a pipette."""
labware = self.get(labware_id)
labware_location = labware.location
if isinstance(labware_location, OnLabwareLocation):
return self.raise_if_labware_inaccessible_by_pipette(
labware_location.labwareId
)
elif isinstance(labware_location, AddressableAreaLocation):
if fixture_validation.is_staging_slot(labware_location.addressableAreaName):
raise errors.LocationNotAccessibleByPipetteError(
f"Cannot move pipette to {labware.loadName},"
f" labware is on staging slot {labware_location.addressableAreaName}"
)
elif labware_location == OFF_DECK_LOCATION:
raise errors.LocationNotAccessibleByPipetteError(
f"Cannot move pipette to {labware.loadName}, labware is off-deck."
)

def raise_if_labware_in_location(
self,
Expand Down
62 changes: 62 additions & 0 deletions api/tests/opentrons/protocol_engine/state/test_geometry_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
DeckSlotLocation,
ModuleLocation,
OnLabwareLocation,
AddressableAreaLocation,
ModuleOffsetVector,
ModuleOffsetData,
LoadedLabware,
Expand Down Expand Up @@ -529,6 +530,67 @@ def test_get_all_labware_highest_z(
assert all_z == max(plate_z, reservoir_z)


def test_get_all_labware_highest_z_with_staging_area(
decoy: Decoy,
well_plate_def: LabwareDefinition,
falcon_tuberack_def: LabwareDefinition,
labware_view: LabwareView,
module_view: ModuleView,
addressable_area_view: AddressableAreaView,
subject: GeometryView,
) -> None:
"""It should get the highest Z amongst all labware including staging area."""
plate = LoadedLabware(
id="plate-id",
loadName="plate-load-name",
definitionUri="plate-definition-uri",
location=DeckSlotLocation(slotName=DeckSlotName.SLOT_3),
offsetId="plate-offset-id",
)
staging_lw = LoadedLabware(
id="staging-id",
loadName="staging-load-name",
definitionUri="staging-definition-uri",
location=AddressableAreaLocation(addressableAreaName="D4"),
offsetId="plate-offset-id",
)

plate_offset = LabwareOffsetVector(x=1, y=-2, z=3)
staging_lw_offset = LabwareOffsetVector(x=1, y=-2, z=3)

decoy.when(module_view.get_all()).then_return([])
decoy.when(addressable_area_view.get_all()).then_return([])

decoy.when(labware_view.get_all()).then_return([plate, staging_lw])
decoy.when(labware_view.get("plate-id")).then_return(plate)
decoy.when(labware_view.get("staging-id")).then_return(staging_lw)

decoy.when(labware_view.get_definition("plate-id")).then_return(well_plate_def)
decoy.when(labware_view.get_definition("staging-id")).then_return(
falcon_tuberack_def # Something tall.
)

decoy.when(labware_view.get_labware_offset_vector("plate-id")).then_return(
plate_offset
)
decoy.when(labware_view.get_labware_offset_vector("staging-id")).then_return(
staging_lw_offset
)

decoy.when(
addressable_area_view.get_addressable_area_position(DeckSlotName.SLOT_3.id)
).then_return(Point(1, 2, 3))
decoy.when(addressable_area_view.get_addressable_area_position("D4")).then_return(
Point(4, 5, 6)
)

staging_z = subject.get_labware_highest_z("staging-id")
all_z = subject.get_all_labware_highest_z()

# Should exclude the off-deck plate.
assert all_z == staging_z


def test_get_all_labware_highest_z_with_modules(
decoy: Decoy,
labware_view: LabwareView,
Expand Down
62 changes: 62 additions & 0 deletions api/tests/opentrons/protocol_engine/state/test_labware_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ModuleLocation,
OnLabwareLocation,
LabwareLocation,
AddressableAreaLocation,
OFF_DECK_LOCATION,
OverlapOffset,
LabwareMovementOffsetData,
Expand Down Expand Up @@ -1196,6 +1197,67 @@ def test_get_all_labware_definition_empty() -> None:
assert result == []


def test_raise_if_labware_inaccessible_by_pipette_staging_area() -> None:
"""It should raise if the labware is on a staging slot."""
subject = get_labware_view(
labware_by_id={
"labware-id": LoadedLabware(
id="labware-id",
loadName="test",
definitionUri="def-uri",
location=AddressableAreaLocation(addressableAreaName="B4"),
)
},
)

with pytest.raises(
errors.LocationNotAccessibleByPipetteError, match="on staging slot"
):
subject.raise_if_labware_inaccessible_by_pipette("labware-id")


def test_raise_if_labware_inaccessible_by_pipette_off_deck() -> None:
"""It should raise if the labware is off-deck."""
subject = get_labware_view(
labware_by_id={
"labware-id": LoadedLabware(
id="labware-id",
loadName="test",
definitionUri="def-uri",
location=OFF_DECK_LOCATION,
)
},
)

with pytest.raises(errors.LocationNotAccessibleByPipetteError, match="off-deck"):
subject.raise_if_labware_inaccessible_by_pipette("labware-id")


def test_raise_if_labware_inaccessible_by_pipette_stacked_labware_on_staging_area() -> None:
"""It should raise if the labware is stacked on a staging slot."""
subject = get_labware_view(
labware_by_id={
"labware-id": LoadedLabware(
id="labware-id",
loadName="test",
definitionUri="def-uri",
location=OnLabwareLocation(labwareId="lower-labware-id"),
),
"lower-labware-id": LoadedLabware(
id="lower-labware-id",
loadName="test",
definitionUri="def-uri",
location=AddressableAreaLocation(addressableAreaName="B4"),
),
},
)

with pytest.raises(
errors.LocationNotAccessibleByPipetteError, match="on staging slot"
):
subject.raise_if_labware_inaccessible_by_pipette("labware-id")


def test_raise_if_labware_cannot_be_stacked_is_adapter() -> None:
"""It should raise if the labware trying to be stacked is an adapter."""
subject = get_labware_view()
Expand Down

0 comments on commit 9d4cf06

Please sign in to comment.