Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api, shared-data): Move lid command implementation #17259

Merged
merged 14 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions api/src/opentrons/legacy_commands/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,18 @@ def stringify_labware_movement_command(
destination_text = _stringify_labware_movement_location(destination)
gripper_text = " with gripper" if use_gripper else ""
return f"Moving {source_labware_text} to {destination_text}{gripper_text}"


def stringify_lid_movement_command(
source: Union[
DeckLocation, OffDeckType, Labware, ModuleContext, WasteChute, TrashBin
],
destination: Union[
DeckLocation, OffDeckType, Labware, ModuleContext, WasteChute, TrashBin
],
use_gripper: bool,
) -> str:
source_labware_text = _stringify_labware_movement_location(source)
destination_text = _stringify_labware_movement_location(destination)
gripper_text = " with gripper" if use_gripper else ""
return f"Moving lid from {source_labware_text} to {destination_text}{gripper_text}"
7 changes: 7 additions & 0 deletions api/src/opentrons/legacy_commands/protocol_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,10 @@ def move_labware(text: str) -> command_types.MoveLabwareCommand:
"name": command_types.MOVE_LABWARE,
"payload": {"text": text},
}


def move_lid(text: str) -> command_types.MoveLidCommand:
return {
"name": command_types.MOVE_LID,
"payload": {"text": text},
}
12 changes: 12 additions & 0 deletions api/src/opentrons/legacy_commands/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
RESUME: Final = "command.RESUME"
COMMENT: Final = "command.COMMENT"
MOVE_LABWARE: Final = "command.MOVE_LABWARE"
MOVE_LID: Final = "command.MOVE_LID"

# Pipette #

Expand Down Expand Up @@ -540,6 +541,15 @@ class MoveLabwareCommand(TypedDict):
payload: MoveLabwareCommandPayload


class MoveLidCommandPayload(TextOnlyPayload):
pass


class MoveLidCommand(TypedDict):
name: Literal["command.MOVE_LID"]
payload: MoveLidCommandPayload


Command = Union[
DropTipCommand,
DropTipInDisposalLocationCommand,
Expand Down Expand Up @@ -588,6 +598,7 @@ class MoveLabwareCommand(TypedDict):
MoveToCommand,
MoveToDisposalLocationCommand,
MoveLabwareCommand,
MoveLidCommand,
]


Expand Down Expand Up @@ -637,6 +648,7 @@ class MoveLabwareCommand(TypedDict):
MoveToCommandPayload,
MoveToDisposalLocationCommandPayload,
MoveLabwareCommandPayload,
MoveLidCommandPayload,
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
OnLabwareLocation,
AddressableAreaLocation,
OFF_DECK_LOCATION,
INVALIDATED_LOCATION,
)
from opentrons.protocol_engine.errors.exceptions import LabwareNotLoadedOnModuleError
from opentrons.types import DeckSlotName, StagingSlotName, Point
Expand Down Expand Up @@ -245,7 +246,10 @@ def _map_labware(
# TODO(jbl 2023-06-08) check if we need to do any logic here or if this is correct
return None

elif location_from_engine == OFF_DECK_LOCATION:
elif (
location_from_engine == OFF_DECK_LOCATION
or location_from_engine == INVALIDATED_LOCATION
):
# This labware is off-deck. Exclude it from conflict checking.
# todo(mm, 2023-02-23): Move this logic into wrapped_deck_conflict.
return None
Expand Down
141 changes: 141 additions & 0 deletions api/src/opentrons/protocol_api/core/engine/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
)
from .exceptions import InvalidModuleLocationError
from . import load_labware_params, deck_conflict, overlap_versions
from opentrons.protocol_engine.resources import labware_validation

if TYPE_CHECKING:
from ...labware import Labware
Expand Down Expand Up @@ -442,6 +443,146 @@ def move_labware(
existing_module_ids=list(self._module_cores_by_id.keys()),
)

def move_lid( # noqa: C901
self,
source_location: Union[DeckSlotName, StagingSlotName, LabwareCore],
new_location: Union[
DeckSlotName,
StagingSlotName,
LabwareCore,
OffDeckType,
WasteChute,
TrashBin,
],
use_gripper: bool,
pause_for_manual_move: bool,
pick_up_offset: Optional[Tuple[float, float, float]],
drop_offset: Optional[Tuple[float, float, float]],
) -> LabwareCore | None:
"""Move the given lid to a new location."""
if use_gripper:
strategy = LabwareMovementStrategy.USING_GRIPPER
elif pause_for_manual_move:
strategy = LabwareMovementStrategy.MANUAL_MOVE_WITH_PAUSE
else:
strategy = LabwareMovementStrategy.MANUAL_MOVE_WITHOUT_PAUSE

if isinstance(source_location, DeckSlotName) or isinstance(
source_location, StagingSlotName
):
# Find the source labware at the provided deck slot
labware_in_slot = self._engine_client.state.labware.get_by_slot(
source_location
)
if labware_in_slot is None:
raise LabwareNotLoadedOnLabwareError(
"Lid cannot be loaded on non-labware position."
)
else:
labware = LabwareCore(labware_in_slot.id, self._engine_client)
else:
labware = source_location

# if this is a labware stack, we need to find the labware at the top of the stack
if labware_validation.is_lid_stack(labware.load_name):
lid_id = self._engine_client.state.labware.get_highest_child_labware(
labware.labware_id
)
# if this is a labware with a lid, we just need to find its lid_id
else:
lid = self._engine_client.state.labware.get_lid_by_labware_id(
labware.labware_id
)
if lid is not None:
lid_id = lid.id
else:
raise ValueError("Cannot move a lid off of a labware with no lid.")

_pick_up_offset = (
LabwareOffsetVector(
x=pick_up_offset[0], y=pick_up_offset[1], z=pick_up_offset[2]
)
if pick_up_offset
else None
)
_drop_offset = (
LabwareOffsetVector(x=drop_offset[0], y=drop_offset[1], z=drop_offset[2])
if drop_offset
else None
)

if isinstance(new_location, DeckSlotName) or isinstance(
new_location, StagingSlotName
):
# Find the destination labware at the provided deck slot
destination_labware_in_slot = self._engine_client.state.labware.get_by_slot(
new_location
)
if destination_labware_in_slot is None:
to_location = self._convert_labware_location(location=new_location)
else:
highest_child_location = (
self._engine_client.state.labware.get_highest_child_labware(
destination_labware_in_slot.id
)
)
to_location = self._convert_labware_location(
location=LabwareCore(highest_child_location, self._engine_client)
)
elif isinstance(new_location, LabwareCore):
highest_child_location = (
self._engine_client.state.labware.get_highest_child_labware(
new_location.labware_id
)
)
to_location = self._convert_labware_location(
location=LabwareCore(highest_child_location, self._engine_client)
)
else:
to_location = self._convert_labware_location(location=new_location)

self._engine_client.execute_command(
cmd.MoveLidParams(
labwareId=lid_id,
newLocation=to_location,
strategy=strategy,
pickUpOffset=_pick_up_offset,
dropOffset=_drop_offset,
)
)

if strategy == LabwareMovementStrategy.USING_GRIPPER:
# Clear out last location since it is not relevant to pipetting
# and we only use last location for in-place pipetting commands
self.set_last_location(location=None, mount=Mount.EXTENSION)

# FIXME(jbl, 2024-01-04) deck conflict after execution logic issue, read notes in load_labware for more info:
deck_conflict.check(
engine_state=self._engine_client.state,
new_labware_id=lid_id,
existing_disposal_locations=self._disposal_locations,
# TODO: We can now fetch these IDs from engine too.
# See comment in self.load_labware().
existing_labware_ids=[
labware_id
for labware_id in self._labware_cores_by_id
if labware_id != labware_id
],
existing_module_ids=list(self._module_cores_by_id.keys()),
)

# If we end up create a new lid stack, return the lid stack
parent_location = self._engine_client.state.labware.get_location(lid_id)
if isinstance(
parent_location, OnLabwareLocation
) and labware_validation.is_lid_stack(
self._engine_client.state.labware.get_load_name(parent_location.labwareId)
):
return LabwareCore(
labware_id=parent_location.labwareId, engine_client=self._engine_client
)
return None

def _resolve_module_hardware(
self, serial_number: str, model: ModuleModel
) -> AbstractModule:
Expand Down
3 changes: 3 additions & 0 deletions api/src/opentrons/protocol_api/core/engine/stringify.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ def _labware_location_string(
elif location == "offDeck":
return "[off-deck]"

elif location == "invalidated":
return "[invalidated]"


def _labware_name(engine_client: SyncClient, labware_id: str) -> str:
"""Return the user-specified labware label, or fall back to the display name from the def."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,25 @@ def move_labware(
"""Move labware to new location."""
raise APIVersionError(api_element="Labware movement")

def move_lid(
self,
source_location: Union[DeckSlotName, StagingSlotName, LegacyLabwareCore],
new_location: Union[
DeckSlotName,
StagingSlotName,
LegacyLabwareCore,
OffDeckType,
WasteChute,
TrashBin,
],
use_gripper: bool,
pause_for_manual_move: bool,
pick_up_offset: Optional[Tuple[float, float, float]],
drop_offset: Optional[Tuple[float, float, float]],
) -> LegacyLabwareCore | None:
"""Move lid to new location."""
raise APIVersionError(api_element="Lid movement")

def load_module(
self,
model: ModuleModel,
Expand Down
19 changes: 19 additions & 0 deletions api/src/opentrons/protocol_api/core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ def move_labware(
) -> None:
...

@abstractmethod
def move_lid(
self,
source_location: Union[DeckSlotName, StagingSlotName, LabwareCoreType],
new_location: Union[
DeckSlotName,
StagingSlotName,
LabwareCoreType,
OffDeckType,
WasteChute,
TrashBin,
],
use_gripper: bool,
pause_for_manual_move: bool,
pick_up_offset: Optional[Tuple[float, float, float]],
drop_offset: Optional[Tuple[float, float, float]],
) -> LabwareCoreType | None:
...

@abstractmethod
def load_module(
self,
Expand Down
Loading
Loading