Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(api): use labware pool in stacker engine #17574

Merged
merged 11 commits into from
Feb 26, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
from .exceptions import InvalidMagnetEngageHeightError
from . import load_labware_params


# Valid wavelength range for absorbance reader
ABS_WAVELENGTH_MIN = 350
ABS_WAVELENGTH_MAX = 1000
Expand Down
4 changes: 4 additions & 0 deletions api/src/opentrons/protocol_api/module_contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,11 @@ def serial_number(self) -> str:
def retrieve(self) -> Labware:
"""Release and return a labware at the bottom of the labware stack."""
self._core.retrieve()

labware_core = self._protocol_core.get_labware_on_module(self._core)
if labware_core is not None and labware_core.is_adapter():
labware_core = self._protocol_core.get_labware_on_labware(labware_core)

# the core retrieve command should have already raised the error
# if labware_core is None, this is just to satisfy the type checker
assert labware_core is not None, "Retrieve failed to return labware"
Expand Down
289 changes: 247 additions & 42 deletions api/src/opentrons/protocol_engine/commands/flex_stacker/retrieve.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Command models to retrieve a labware from a Flex Stacker."""

from __future__ import annotations
from typing import Optional, Literal, TYPE_CHECKING
from typing import Optional, Literal, TYPE_CHECKING, Any, Dict
from typing_extensions import Type

from pydantic import BaseModel, Field
Expand All @@ -11,41 +11,100 @@
ErrorOccurrence,
CannotPerformModuleAction,
LocationIsOccupiedError,
FlexStackerLabwarePoolNotYetDefinedError,
)
from ...state import update_types
from ...types import (
ModuleLocation,
OnLabwareLocation,
LabwareLocationSequence,
LabwareLocation,
)
from opentrons_shared_data.labware.labware_definition import LabwareDefinition
from pydantic.json_schema import SkipJsonSchema
from opentrons.calibration_storage.helpers import uri_from_details

if TYPE_CHECKING:
from opentrons.protocol_engine.state.state import StateView
from opentrons.protocol_engine.state.module_substates import FlexStackerSubState
from opentrons.protocol_engine.execution import EquipmentHandler

RetrieveCommandType = Literal["flexStacker/retrieve"]


def _remove_default(s: dict[str, Any]) -> None:
s.pop("default", None)


class RetrieveParams(BaseModel):
"""Input parameters for a labware retrieval command."""

moduleId: str = Field(
...,
description="Unique ID of the Flex Stacker.",
)
labwareId: str | SkipJsonSchema[None] = Field(
None,
description="An optional ID to assign to this labware. If None, an ID "
"will be generated.",
json_schema_extra=_remove_default,
)
displayName: str | SkipJsonSchema[None] = Field(
None,
description="An optional user-specified display name "
"or label for this labware.",
json_schema_extra=_remove_default,
)
adapterId: str | SkipJsonSchema[None] = Field(
None,
description="An optional ID to assign to an adapter. If None, an ID "
"will be generated.",
json_schema_extra=_remove_default,
)
lidId: str | SkipJsonSchema[None] = Field(
None,
description="An optional ID to assign to a lid. If None, an ID "
"will be generated.",
json_schema_extra=_remove_default,
)


class RetrieveResult(BaseModel):
"""Result data from a labware retrieval command."""

labware_id: str = Field(
labwareId: str = Field(
...,
description="The labware ID of the retrieved labware.",
description="The labware ID of the primary retrieved labware.",
)
adapterId: str | None = Field(
None,
description="The optional Adapter Labware ID of the adapter under a primary labware.",
)
lidId: str | None = Field(
None,
description="The optional Lid Labware ID of the lid on a primary labware.",
)
originLocationSequence: LabwareLocationSequence | None = Field(
None, description="The origin location of the labware."
primaryLocationSequence: LabwareLocationSequence = Field(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this location refer to? The deck location of the stacker where a labware started? Or is it an on-deck location or either one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a sequence of the location of the primary labware all the way down to the deck, so it will usually be [OnModuleLocation(moduleId=stackerid), OnCutoutFixtureLocation(...)] or, if there's an adapter, [OnLabwareLocation(labwareId=adapter-id), OnModuleLocation(moduleId=stacker-id), OnCutoutFixtureLocation(...)]. It's the position after the labware is retrieved, e.g. on the stacker shuttle.

..., description="The origin location of the primary labware."
)
eventualDestinationLocationSequence: LabwareLocationSequence | None = Field(
None, description="The eventual destination of the labware."
lidLocationSequence: LabwareLocationSequence | None = Field(
None,
description="The origin location of the adapter labware under a primary labware.",
)
adapterLocationSequence: LabwareLocationSequence | None = Field(
None, description="The origin location of the lid labware on a primary labware."
)
primaryLabwareURI: str = Field(
...,
description="The labware definition URI of the primary labware.",
)
adapterLabwareURI: str | None = Field(
None,
description="The labware definition URI of the adapter labware.",
)
lidLabwareURI: str | None = Field(
None,
description="The labware definition URI of the lid labware.",
)


Expand All @@ -61,6 +120,155 @@ def __init__(
self._state_view = state_view
self._equipment = equipment

async def _load_labware_from_pool(
self, params: RetrieveParams, stacker_state: FlexStackerSubState
) -> tuple[RetrieveResult, update_types.StateUpdate]:
state_update = update_types.StateUpdate()

# If there is an adapter load it
adapter_lw = None
lid_lw = None
definitions_by_id: Dict[str, LabwareDefinition] = {}
offset_ids_by_id: Dict[str, str | None] = {}
display_names_by_id: Dict[str, str | None] = {}
new_locations_by_id: Dict[str, LabwareLocation] = {}
if stacker_state.pool_adapter_definition is not None:
adapter_lw = await self._equipment.load_labware_from_definition(
definition=stacker_state.pool_adapter_definition,
location=ModuleLocation(moduleId=params.moduleId),
labware_id=params.adapterId,
)
definitions_by_id[adapter_lw.labware_id] = adapter_lw.definition
offset_ids_by_id[adapter_lw.labware_id] = adapter_lw.offsetId
display_names_by_id[
adapter_lw.labware_id
] = adapter_lw.definition.metadata.displayName
new_locations_by_id[adapter_lw.labware_id] = ModuleLocation(
moduleId=params.moduleId
)
# Always load the primary labware
if stacker_state.pool_primary_definition is None:
raise CannotPerformModuleAction(
f"Flex Stacker {params.moduleId} has no labware to retrieve"
)
loaded_labware = await self._equipment.load_labware_from_definition(
definition=stacker_state.pool_primary_definition,
location=(
ModuleLocation(moduleId=params.moduleId)
if adapter_lw is None
else OnLabwareLocation(labwareId=adapter_lw.labware_id)
),
labware_id=params.labwareId,
)
definitions_by_id[loaded_labware.labware_id] = loaded_labware.definition
offset_ids_by_id[loaded_labware.labware_id] = loaded_labware.offsetId
display_names_by_id[
loaded_labware.labware_id
] = loaded_labware.definition.metadata.displayName
new_locations_by_id[loaded_labware.labware_id] = (
ModuleLocation(moduleId=params.moduleId)
if adapter_lw is None
else OnLabwareLocation(labwareId=adapter_lw.labware_id)
)
# If there is a lid load it
if stacker_state.pool_lid_definition is not None:
lid_lw = await self._equipment.load_labware_from_definition(
definition=stacker_state.pool_lid_definition,
location=OnLabwareLocation(labwareId=loaded_labware.labware_id),
labware_id=params.lidId,
)
definitions_by_id[lid_lw.labware_id] = lid_lw.definition
offset_ids_by_id[lid_lw.labware_id] = lid_lw.offsetId
display_names_by_id[
lid_lw.labware_id
] = lid_lw.definition.metadata.displayName
new_locations_by_id[lid_lw.labware_id] = OnLabwareLocation(
labwareId=loaded_labware.labware_id
)

# Get the labware dimensions for the labware being retrieved,
# which is the first one in the hopper labware id list
primary_location_sequence = (
self._state_view.geometry.get_predicted_location_sequence(
new_locations_by_id[loaded_labware.labware_id]
)
)
adapter_location_sequence = (
self._state_view.geometry.get_predicted_location_sequence(
new_locations_by_id[adapter_lw.labware_id]
)
if adapter_lw is not None
else None
)
lid_location_sequence = (
self._state_view.geometry.get_predicted_location_sequence(
new_locations_by_id[lid_lw.labware_id]
)
if lid_lw is not None
else None
)

# Get the Labware URIs where relevant
primary_uri = str(
uri_from_details(
namespace=loaded_labware.definition.namespace,
load_name=loaded_labware.definition.parameters.loadName,
version=loaded_labware.definition.version,
)
)
adapter_uri = (
str(
uri_from_details(
namespace=adapter_lw.definition.namespace,
load_name=adapter_lw.definition.parameters.loadName,
version=adapter_lw.definition.version,
)
)
if adapter_lw is not None
else None
)
lid_uri = (
str(
uri_from_details(
namespace=lid_lw.definition.namespace,
load_name=lid_lw.definition.parameters.loadName,
version=lid_lw.definition.version,
)
)
if lid_lw is not None
else None
)
state_update.set_batch_loaded_labware(
definitions_by_id=definitions_by_id,
display_names_by_id=display_names_by_id,
offset_ids_by_id=offset_ids_by_id,
new_locations_by_id=new_locations_by_id,
)
state_update.update_flex_stacker_labware_pool_count(
module_id=params.moduleId, count=stacker_state.pool_count - 1
)

if lid_lw is not None:
state_update.set_lids(
parent_labware_ids=[loaded_labware.labware_id],
lid_ids=[lid_lw.labware_id],
)

return (
RetrieveResult(
labwareId=loaded_labware.labware_id,
adapterId=adapter_lw.labware_id if adapter_lw is not None else None,
lidId=lid_lw.labware_id if lid_lw is not None else None,
primaryLocationSequence=primary_location_sequence,
adapterLocationSequence=adapter_location_sequence,
lidLocationSequence=lid_location_sequence,
primaryLabwareURI=primary_uri,
adapterLabwareURI=adapter_uri,
lidLabwareURI=lid_uri,
),
state_update,
)

async def execute(self, params: RetrieveParams) -> SuccessData[RetrieveResult]:
"""Execute the labware retrieval command."""
stacker_state = self._state_view.modules.get_flex_stacker_substate(
Expand All @@ -72,62 +280,59 @@ async def execute(self, params: RetrieveParams) -> SuccessData[RetrieveResult]:
"Cannot retrieve labware from Flex Stacker while in static mode"
)

stacker_loc = ModuleLocation(moduleId=params.moduleId)
# Allow propagation of ModuleNotAttachedError.
stacker_hw = self._equipment.get_module_hardware_api(stacker_state.module_id)
pool_definitions = stacker_state.get_pool_definition_ordered_list()
if pool_definitions is None:
location = self._state_view.modules.get_location(params.moduleId)
raise FlexStackerLabwarePoolNotYetDefinedError(
message=f"The Flex Stacker in {location} has not been configured yet and cannot be filled."
)

if not stacker_state.hopper_labware_ids:
if stacker_state.pool_count == 0:
raise CannotPerformModuleAction(
f"Flex Stacker {params.moduleId} has no labware to retrieve"
message="Cannot retrieve labware from Flex Stacker because it contains no labware"
)

stacker_loc = ModuleLocation(moduleId=params.moduleId)
# Allow propagation of ModuleNotAttachedError.
stacker_hw = self._equipment.get_module_hardware_api(stacker_state.module_id)

try:
self._state_view.labware.raise_if_labware_in_location(stacker_loc)
except LocationIsOccupiedError:
raise CannotPerformModuleAction(
"Cannot retrieve a labware from Flex Stacker if the carriage is occupied"
)

state_update = update_types.StateUpdate()

# Get the labware dimensions for the labware being retrieved,
# which is the first one in the hopper labware id list
lw_id = stacker_state.hopper_labware_ids[0]
original_location_sequence = self._state_view.geometry.get_location_sequence(
lw_id
retrieve_result, state_update = await self._load_labware_from_pool(
params, stacker_state
)
destination_location_sequence = (
self._state_view.geometry.get_predicted_location_sequence(
ModuleLocation(moduleId=params.moduleId)
)

labware_height = self._state_view.geometry.get_height_of_labware_stack(
definitions=pool_definitions
)
labware = self._state_view.labware.get(lw_id)
labware_height = self._state_view.labware.get_dimensions(labware_id=lw_id).z
if labware.lid_id is not None:
lid_def = self._state_view.labware.get_definition(labware.lid_id)
offset = self._state_view.labware.get_labware_overlap_offsets(
lid_def, labware.loadName
).z
labware_height = labware_height + lid_def.dimensions.zDimension - offset

if stacker_hw is not None:
await stacker_hw.dispense_labware(labware_height=labware_height)

# update the state to reflect the labware is now in the flex stacker slot
state_update.set_labware_location(
labware_id=lw_id,
new_location=ModuleLocation(moduleId=params.moduleId),
new_offset_id=None,
# Update the state to reflect the labware is now in the Flex Stacker slot
# todo(chb, 2025-02-19): This ModuleLocation piece should probably instead be an AddressableAreaLocation
# but that has implications for where labware are set by things like module.load_labware(..) and what
# happens when we move labware.
stacker_area = (
self._state_view.modules.ensure_and_convert_module_fixture_location(
deck_slot=self._state_view.modules.get_location(
params.moduleId
).slotName,
model=self._state_view.modules.get(params.moduleId).model,
)
)
state_update.set_addressable_area_used(stacker_area)

state_update.retrieve_flex_stacker_labware(
module_id=params.moduleId, labware_id=lw_id
module_id=params.moduleId, labware_id=retrieve_result.labwareId
)
return SuccessData(
public=RetrieveResult(
labware_id=lw_id,
originLocationSequence=original_location_sequence,
eventualDestinationLocationSequence=destination_location_sequence,
),
public=retrieve_result,
state_update=state_update,
)

Expand Down
Loading
Loading