Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(engine): consolidate_liquid engine core implementation #17458

Open
wants to merge 8 commits into
base: edge
Choose a base branch
from
149 changes: 145 additions & 4 deletions api/src/opentrons/protocol_api/core/engine/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from opentrons_shared_data.errors.exceptions import (
UnsupportedHardwareCommand,
)
from opentrons_shared_data.liquid_classes.liquid_class_definition import BlowoutLocation
from opentrons.protocol_api._nozzle_layout import NozzleLayout
from . import overlap_versions, pipette_movement_conflict
from . import transfer_components_executor as tx_comps_executor
Expand Down Expand Up @@ -1227,7 +1228,7 @@ def distribute_liquid(
) -> None:
pass

def consolidate_liquid(
def consolidate_liquid( # noqa: C901
self,
liquid_class: LiquidClass,
volume: float,
Expand All @@ -1237,7 +1238,146 @@ def consolidate_liquid(
tip_racks: List[Tuple[Location, LabwareCore]],
trash_location: Union[Location, TrashBin, WasteChute],
) -> None:
pass
if not tip_racks:
raise RuntimeError(
"No tipracks found for pipette in order to perform transfer"
)
tiprack_uri_for_transfer_props = tip_racks[0][1].get_uri()
transfer_props = liquid_class.get_for(
pipette=self.get_pipette_name(), tip_rack=tiprack_uri_for_transfer_props
)
blow_out_properties = transfer_props.dispense.retract.blowout
if (
blow_out_properties.enabled
and blow_out_properties.location == BlowoutLocation.SOURCE
):
raise RuntimeError(
'Blowout location "source" incompatible with consolidate liquid.'
' Please choose "destination" or "trash".'
)
Comment on lines +1250 to +1257
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should highlight this and mix, pre-wet behavior in docs. I guess it's okay to raise this error instead of simply skipping the blowout, as long as our built-in classes don't have blowout with source location enabled by default, because otherwise a lot of protocols will run into errors due to no fault of the users. Liquid classes are just supposed to work without having to modify for each transfer.


# TODO: use the ID returned by load_liquid_class in command annotations
self.load_liquid_class(
name=liquid_class.name,
transfer_properties=transfer_props,
tiprack_uri=tiprack_uri_for_transfer_props,
)

max_volume = min(
self.get_max_volume(),
self._engine_client.state.geometry.get_nominal_tip_geometry(
pipette_id=self.pipette_id,
labware_id=tip_racks[0][1].labware_id,
well_name=None,
).volume,
)

# TODO: add multi-channel pipette handling here
source_per_volume_step = tx_commons.expand_for_volume_constraints(
volumes=[volume for _ in range(len(source))],
targets=source,
max_volume=max_volume,
)

def _drop_tip() -> None:
if isinstance(trash_location, (TrashBin, WasteChute)):
self.drop_tip_in_disposal_location(
disposal_location=trash_location,
home_after=False,
alternate_tip_drop=True,
)
elif isinstance(trash_location, Location):
self.drop_tip(
location=trash_location,
well_core=trash_location.labware.as_well()._core, # type: ignore[arg-type]
home_after=False,
alternate_drop_location=True,
)

def _pick_up_tip() -> None:
next_tip = self.get_next_tip(
tip_racks=[core for loc, core in tip_racks],
starting_well=None,
)
if next_tip is None:
raise RuntimeError(
f"No tip available among {tip_racks} for this transfer."
)
(
tiprack_loc,
tiprack_uri,
tip_well,
) = self._get_location_and_well_core_from_next_tip_info(next_tip, tip_racks)
if tiprack_uri != tiprack_uri_for_transfer_props:
raise RuntimeError(
f"Tiprack {tiprack_uri} does not match the tiprack designated "
f"for this transfer- {tiprack_uri_for_transfer_props}."
)
self.pick_up_tip(
location=tiprack_loc,
well_core=tip_well,
presses=None,
increment=None,
)

if new_tip == TransferTipPolicyV2.ONCE:
_pick_up_tip()

prev_src: Optional[Tuple[Location, WellCore]] = None
tip_contents = [
tx_comps_executor.LiquidAndAirGapPair(
liquid=0,
air_gap=0,
)
]
next_step_volume, next_source = next(source_per_volume_step)
is_last_step = False
while not is_last_step:
total_dispense_volume = 0.0
vol_aspirate_combo = []
# Take air gap into account because there will be a final air gap before the dispense
while total_dispense_volume + next_step_volume <= max_volume:
total_dispense_volume += next_step_volume
vol_aspirate_combo.append((next_step_volume, next_source))
try:
next_step_volume, next_source = next(source_per_volume_step)
except StopIteration:
is_last_step = True
break

if new_tip == TransferTipPolicyV2.ALWAYS:
if prev_src is not None:
_drop_tip()
_pick_up_tip()
tip_contents = [
tx_comps_executor.LiquidAndAirGapPair(
liquid=0,
air_gap=0,
)
]
for step_volume, step_source in vol_aspirate_combo:
tip_contents = self.aspirate_liquid_class(
volume=step_volume,
source=step_source,
transfer_properties=transfer_props,
transfer_type=tx_comps_executor.TransferType.MANY_TO_ONE,
tip_contents=tip_contents,
)
tip_contents = self.dispense_liquid_class(
volume=total_dispense_volume,
dest=dest,
source=None, # Cannot have source as location for blowout so hardcoded to None
transfer_properties=transfer_props,
transfer_type=tx_comps_executor.TransferType.MANY_TO_ONE,
tip_contents=tip_contents,
add_final_air_gap=False
if is_last_step and new_tip == TransferTipPolicyV2.NEVER
else True,
trash_location=trash_location,
)
prev_src = next_source
if new_tip != TransferTipPolicyV2.NEVER:
_drop_tip()

def _get_location_and_well_core_from_next_tip_info(
self,
Expand Down Expand Up @@ -1280,9 +1420,10 @@ def aspirate_liquid_class(
"""
aspirate_props = transfer_properties.aspirate
# TODO (spp, 2025-01-30): check if check_valid_volume_parameters is necessary and is enough.
tx_commons.check_valid_volume_parameters(
disposal_volume=0, # No disposal volume for 1-to-1 transfer
tx_commons.check_valid_liquid_class_volume_parameters(
aspirate_volume=volume,
air_gap=aspirate_props.retract.air_gap_by_volume.get_for_volume(volume),
disposal_volume=0,
max_volume=self.get_working_volume(),
)
source_loc, source_well = source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def submerge(
minimum_z_height=None,
speed=None,
)
if self._transfer_type == TransferType.ONE_TO_ONE:
if self._transfer_type != TransferType.ONE_TO_MANY:
self._remove_air_gap(location=submerge_start_location)
self._instrument.move_to(
location=self._target_location,
Expand Down Expand Up @@ -217,7 +217,10 @@ def mix(self, mix_properties: MixProperties, last_dispense_push_out: bool) -> No
NOTE: For most of our built-in definitions, we will keep _mix_ off because it is a very application specific thing.
We should mention in our docs that users should adjust this property according to their application.
"""
if not mix_properties.enabled:
if (
not mix_properties.enabled
or self._transfer_type == TransferType.MANY_TO_ONE
):
Comment on lines +220 to +223
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, this is my fault; the docstring I wrote is misleading. I think we do want to run the post-dispense mix, while skipping the pre-aspirate mix. One way to do that is do the gating inside InstrumentCore.aspirate_liquid() and InstrumentCore.dispense_liquid() and the other is to somehow pass this function (or the class) the info that this mix is a part of aspirate/dispense.

return
# Assertion only for mypy purposes
assert (
Expand Down Expand Up @@ -248,7 +251,10 @@ def pre_wet(
- No push out
- No pre-wet for consolidation
"""
if not self._transfer_properties.aspirate.pre_wet:
if (
not self._transfer_properties.aspirate.pre_wet
or self._transfer_type == TransferType.MANY_TO_ONE
):
return
mix_props = MixProperties(_enabled=True, _repetitions=1, _volume=volume)
self.mix(mix_properties=mix_props, last_dispense_push_out=False)
Expand Down Expand Up @@ -313,9 +319,15 @@ def retract_after_aspiration(self, volume: float) -> None:
# Full speed because the tip will already be out of the liquid
speed=None,
)
# For consolidate, we need to know the total amount that is in the pipette since this
# may not be the first aspirate
if self._transfer_type == TransferType.MANY_TO_ONE:
volume_for_air_gap = self._instrument.get_current_volume()
else:
volume_for_air_gap = volume
self._add_air_gap(
air_gap_volume=self._transfer_properties.aspirate.retract.air_gap_by_volume.get_for_volume(
volume
volume_for_air_gap
)
)

Expand Down
4 changes: 4 additions & 0 deletions api/src/opentrons/protocol_api/instrument_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,10 @@ def consolidate_liquid(
)
else:
tip_racks = [self._last_tip_picked_up_from.parent]
elif valid_new_tip == TransferTipPolicyV2.PER_SOURCE:
raise RuntimeError(
'Tip transfer policy "per source" incompatible with consolidate.'
)
else:
tip_racks = self._tip_racks
if self.current_volume != 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,23 @@ def check_valid_volume_parameters(
)


def check_valid_liquid_class_volume_parameters(
aspirate_volume: float, air_gap: float, disposal_volume: float, max_volume: float
) -> None:
if air_gap + aspirate_volume > max_volume:
raise ValueError(
f"Cannot have an air gap of {air_gap} µL for an aspiration of {aspirate_volume} µL"
f" with a max volume of {max_volume} µL. Please adjust the retract air gap to fit within"
f" the bounds of the tip."
)
elif disposal_volume + aspirate_volume > max_volume:
raise ValueError(
f"Cannot have a dispense volume of {disposal_volume} µL for an aspiration of {aspirate_volume} µL"
f" with a max volume of {max_volume} µL. Please adjust the dispense volume to fit within"
f" the bounds of the tip."
)


def expand_for_volume_constraints(
volumes: Iterable[float],
targets: Iterable[Target],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ def patch_mock_pipette_movement_safety_check(


@pytest.fixture(autouse=True)
def patch_mock_check_valid_volume_parameters(
def patch_mock_check_valid_liquid_class_volume_parameters(
decoy: Decoy, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Replace tx_commons.check_valid_volume_parameters() with a mock."""
mock = decoy.mock(func=tx_commons.check_valid_volume_parameters)
monkeypatch.setattr(tx_commons, "check_valid_volume_parameters", mock)
"""Replace tx_commons.check_valid_liquid_class_volume_parameters() with a mock."""
mock = decoy.mock(func=tx_commons.check_valid_liquid_class_volume_parameters)
monkeypatch.setattr(tx_commons, "check_valid_liquid_class_volume_parameters", mock)


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -1879,7 +1879,8 @@ def test_aspirate_liquid_class_raises_for_more_than_max_volume(
mock_engine_client.state.pipettes.get_working_volume("abc123")
).then_return(100)
decoy.when(
tx_commons.check_valid_volume_parameters(
tx_commons.check_valid_liquid_class_volume_parameters(
aspirate_volume=123,
disposal_volume=0,
air_gap=test_transfer_properties.aspirate.retract.air_gap_by_volume.get_for_volume(
123
Expand Down
41 changes: 31 additions & 10 deletions api/tests/opentrons/protocol_api/test_instrument_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2350,16 +2350,6 @@ def test_consolidate_liquid_raises_if_tip_has_liquid(
decoy.when(mock_validation.ensure_new_tip_policy("never")).then_return(
TransferTipPolicyV2.ONCE
)
decoy.when(mock_instrument_core.get_nozzle_map()).then_return(MOCK_MAP)
decoy.when(mock_instrument_core.get_active_channels()).then_return(2)
decoy.when(
labware.next_available_tip(
starting_tip=None,
tip_racks=tip_racks,
channels=2,
nozzle_map=MOCK_MAP,
)
).then_return((decoy.mock(cls=Labware), decoy.mock(cls=Well)))
decoy.when(mock_instrument_core.get_current_volume()).then_return(1000)
with pytest.raises(RuntimeError, match="liquid already in the tip"):
subject.consolidate_liquid(
Expand All @@ -2371,6 +2361,37 @@ def test_consolidate_liquid_raises_if_tip_has_liquid(
)


@pytest.mark.parametrize("robot_type", ["OT-2 Standard", "OT-3 Standard"])
def test_consolidate_liquid_raises_if_tip_policy_per_source(
decoy: Decoy,
mock_protocol_core: ProtocolCore,
mock_instrument_core: InstrumentCore,
subject: InstrumentContext,
robot_type: RobotType,
minimal_liquid_class_def2: LiquidClassSchemaV1,
) -> None:
"""It should raise errors if the tip policy is "per source"."""
test_liq_class = LiquidClass.create(minimal_liquid_class_def2)
mock_well = decoy.mock(cls=Well)

decoy.when(
mock_validation.ensure_valid_flat_wells_list_for_transfer_v2([mock_well])
).then_return([mock_well])
decoy.when(mock_validation.ensure_new_tip_policy("per source")).then_return(
TransferTipPolicyV2.PER_SOURCE
)
with pytest.raises(
RuntimeError, match='"per source" incompatible with consolidate.'
):
subject.consolidate_liquid(
liquid_class=test_liq_class,
volume=10,
source=[mock_well],
dest=mock_well,
new_tip="per source",
)


@pytest.mark.parametrize("robot_type", ["OT-2 Standard", "OT-3 Standard"])
def test_consolidate_liquid_delegates_to_engine_core(
decoy: Decoy,
Expand Down
Loading