diff --git a/api/src/opentrons/hardware_control/motion_utilities.py b/api/src/opentrons/hardware_control/motion_utilities.py index dd59437f7dc..0dc1bd489c1 100644 --- a/api/src/opentrons/hardware_control/motion_utilities.py +++ b/api/src/opentrons/hardware_control/motion_utilities.py @@ -192,6 +192,22 @@ def target_position_from_plunger( return all_axes_pos +def target_positions_from_plunger_tracking( + mount: Union[Mount, OT3Mount], + plunger_delta: float, + z_delta: float, + current_position: Dict[Axis, float], +) -> "OrderedDict[Axis, float]": + """Create a target position for machine axes including plungers. + + The x/y axis remain constant but the plunger and Z move to create a tracking action. + """ + all_axes_pos = target_position_from_plunger(mount, plunger_delta, current_position) + z_ax = Axis.by_mount(mount) + all_axes_pos[z_ax] = current_position[z_ax] + z_delta + return all_axes_pos + + def deck_point_from_machine_point( machine_point: Point, attitude: AttitudeMatrix, offset: Point ) -> Point: diff --git a/api/src/opentrons/hardware_control/ot3api.py b/api/src/opentrons/hardware_control/ot3api.py index 6295757e7ab..9c27af722d9 100644 --- a/api/src/opentrons/hardware_control/ot3api.py +++ b/api/src/opentrons/hardware_control/ot3api.py @@ -126,6 +126,7 @@ target_position_from_absolute, target_position_from_relative, target_position_from_plunger, + target_positions_from_plunger_tracking, offset_for_mount, deck_from_machine, machine_from_deck, @@ -2750,7 +2751,7 @@ async def liquid_probe( # noqa: C901 if not probe_settings: probe_settings = deepcopy(self.config.liquid_sense) - # We need to significatly slow down the 96 channel liquid probe + # We need to significantly slow down the 96 channel liquid probe if self.gantry_load == GantryLoad.HIGH_THROUGHPUT: max_plunger_speed = self.config.motion_settings.max_speed_discontinuity[ GantryLoad.HIGH_THROUGHPUT @@ -2964,6 +2965,93 @@ async def capacitive_sweep( AMKey = TypeVar("AMKey") + async def aspirate_while_tracking( + self, + mount: Union[top_types.Mount, OT3Mount], + z_distance: float, + flow_rate: float, + volume: float, + ) -> None: + """ + Aspirate a volume of liquid (in microliters/uL) using this pipette.""" + realmount = OT3Mount.from_mount(mount) + aspirate_spec = self._pipette_handler.plan_check_aspirate( + realmount, volume, flow_rate + ) + if not aspirate_spec: + return + + target_pos = target_positions_from_plunger_tracking( + realmount, + aspirate_spec.plunger_distance, + z_distance, + self._current_position, + ) + + try: + await self._backend.set_active_current( + {aspirate_spec.axis: aspirate_spec.current} + ) + async with self.restore_system_constrants(): + await self.set_system_constraints_for_plunger_acceleration( + realmount, aspirate_spec.acceleration + ) + await self._move( + target_pos, + speed=aspirate_spec.speed, + home_flagged_axes=False, + ) + except Exception: + self._log.exception("Aspirate failed") + aspirate_spec.instr.set_current_volume(0) + raise + else: + aspirate_spec.instr.add_current_volume(aspirate_spec.volume) + + async def dispense_while_tracking( + self, + mount: Union[top_types.Mount, OT3Mount], + z_distance: float, + flow_rate: float, + volume: float, + push_out: Optional[float], + ) -> None: + """ + Aspirate a volume of liquid (in microliters/uL) using this pipette.""" + realmount = OT3Mount.from_mount(mount) + dispense_spec = self._pipette_handler.plan_check_dispense( + realmount, volume, flow_rate, push_out + ) + if not dispense_spec: + return + + target_pos = target_positions_from_plunger_tracking( + realmount, + dispense_spec.plunger_distance, + z_distance, + self._current_position, + ) + + try: + await self._backend.set_active_current( + {dispense_spec.axis: dispense_spec.current} + ) + async with self.restore_system_constrants(): + await self.set_system_constraints_for_plunger_acceleration( + realmount, dispense_spec.acceleration + ) + await self._move( + target_pos, + speed=dispense_spec.speed, + home_flagged_axes=False, + ) + except Exception: + self._log.exception("dispense failed") + dispense_spec.instr.set_current_volume(0) + raise + else: + dispense_spec.instr.remove_current_volume(dispense_spec.volume) + @property def attached_subsystems(self) -> Dict[SubSystem, SubSystemState]: """Get a view of the state of the currently-attached subsystems.""" diff --git a/api/src/opentrons/hardware_control/protocols/liquid_handler.py b/api/src/opentrons/hardware_control/protocols/liquid_handler.py index 2aea15bd55b..a1fa54db7fe 100644 --- a/api/src/opentrons/hardware_control/protocols/liquid_handler.py +++ b/api/src/opentrons/hardware_control/protocols/liquid_handler.py @@ -215,3 +215,22 @@ async def liquid_probe( max_z_dist : maximum depth to probe for liquid """ ... + + async def aspirate_while_tracking( + self, + mount: MountArgType, + z_distance: float, + flow_rate: float, + volume: float, + ) -> None: + ... + + async def dispense_while_tracking( + self, + mount: MountArgType, + z_distance: float, + flow_rate: float, + volume: float, + push_out: Optional[float], + ) -> None: + ... diff --git a/api/src/opentrons/protocol_api/core/engine/instrument.py b/api/src/opentrons/protocol_api/core/engine/instrument.py index 010f3110fdb..407334bfc45 100644 --- a/api/src/opentrons/protocol_api/core/engine/instrument.py +++ b/api/src/opentrons/protocol_api/core/engine/instrument.py @@ -2,8 +2,14 @@ from __future__ import annotations -from typing import Optional, TYPE_CHECKING, cast, Union, List -from opentrons.types import Location, Mount, NozzleConfigurationType, NozzleMapInterface +from typing import Optional, TYPE_CHECKING, cast, Union, List, Tuple +from opentrons.types import ( + Location, + Mount, + NozzleConfigurationType, + NozzleMapInterface, + MeniscusTracking, +) from opentrons.hardware_control import SyncHardwareAPI from opentrons.hardware_control.dev_types import PipetteDict from opentrons.protocols.api_support.util import FlowRates, find_value_for_api_version @@ -134,7 +140,7 @@ def aspirate( rate: float, flow_rate: float, in_place: bool, - is_meniscus: Optional[bool] = None, + meniscus_tracking: Optional[MeniscusTracking] = None, ) -> None: """Aspirate a given volume of liquid from the specified location. Args: @@ -144,6 +150,7 @@ def aspirate( rate: Not used in this core. flow_rate: The flow rate in µL/s to aspirate at. in_place: whether this is a in-place command. + meniscus_tracking: Optional data about where to aspirate from. """ if well_core is None: if not in_place: @@ -173,10 +180,15 @@ def aspirate( labware_id=labware_id, well_name=well_name, absolute_point=location.point, - is_meniscus=is_meniscus, + meniscus_tracking=meniscus_tracking, ) - if well_location.origin == WellOrigin.MENISCUS: - well_location.volumeOffset = "operationVolume" + dynamic_liquid_tracking = False + # caila bookmark + if meniscus_tracking: + if meniscus_tracking.target == "end": + well_location.volumeOffset = "operationVolume" + elif meniscus_tracking.target == "dynamic_meniscus": + dynamic_liquid_tracking = True pipette_movement_conflict.check_safe_for_pipette_movement( engine_state=self._engine_client.state, pipette_id=self._pipette_id, @@ -184,16 +196,28 @@ def aspirate( well_name=well_name, well_location=well_location, ) - self._engine_client.execute_command( - cmd.AspirateParams( - pipetteId=self._pipette_id, - labwareId=labware_id, - wellName=well_name, - wellLocation=well_location, - volume=volume, - flowRate=flow_rate, + if dynamic_liquid_tracking: + self._engine_client.execute_command( + cmd.AspirateWhileTrackingParams( + pipetteId=self._pipette_id, + labwareId=labware_id, + wellName=well_name, + wellLocation=well_location, + volume=volume, + flowRate=flow_rate, + ) + ) + else: + self._engine_client.execute_command( + cmd.AspirateParams( + pipetteId=self._pipette_id, + labwareId=labware_id, + wellName=well_name, + wellLocation=well_location, + volume=volume, + flowRate=flow_rate, + ) ) - ) self._protocol_core.set_last_location(location=location, mount=self.get_mount()) @@ -206,7 +230,7 @@ def dispense( flow_rate: float, in_place: bool, push_out: Optional[float], - is_meniscus: Optional[bool] = None, + meniscus_tracking: Optional[MeniscusTracking] = None, ) -> None: """Dispense a given volume of liquid into the specified location. Args: @@ -217,7 +241,9 @@ def dispense( flow_rate: The flow rate in µL/s to dispense at. in_place: whether this is a in-place command. push_out: The amount to push the plunger below bottom position. + meniscus_tracking: Optional data about where to dispense from. """ + # raise ValueError(f"well location = {location}") if self._protocol_core.api_version < _DISPENSE_VOLUME_VALIDATION_ADDED_IN: # In older API versions, when you try to dispense more than you can, # it gets clamped. @@ -266,8 +292,14 @@ def dispense( labware_id=labware_id, well_name=well_name, absolute_point=location.point, - is_meniscus=is_meniscus, + meniscus_tracking=meniscus_tracking, ) + dynamic_liquid_tracking = False + if meniscus_tracking: + if meniscus_tracking.target == "end": + well_location.volumeOffset = "operationVolume" + elif meniscus_tracking.target == "dynamic_meniscus": + dynamic_liquid_tracking = True pipette_movement_conflict.check_safe_for_pipette_movement( engine_state=self._engine_client.state, pipette_id=self._pipette_id, @@ -275,17 +307,30 @@ def dispense( well_name=well_name, well_location=well_location, ) - self._engine_client.execute_command( - cmd.DispenseParams( - pipetteId=self._pipette_id, - labwareId=labware_id, - wellName=well_name, - wellLocation=well_location, - volume=volume, - flowRate=flow_rate, - pushOut=push_out, + if dynamic_liquid_tracking: + self._engine_client.execute_command( + cmd.DispenseWhileTrackingParams( + pipetteId=self._pipette_id, + labwareId=labware_id, + wellName=well_name, + wellLocation=well_location, + volume=volume, + flowRate=flow_rate, + pushOut=push_out, + ) + ) + else: + self._engine_client.execute_command( + cmd.DispenseParams( + pipetteId=self._pipette_id, + labwareId=labware_id, + wellName=well_name, + wellLocation=well_location, + volume=volume, + flowRate=flow_rate, + pushOut=push_out, + ) ) - ) if isinstance(location, (TrashBin, WasteChute)): self._protocol_core.set_last_location(location=None, mount=self.get_mount()) @@ -968,6 +1013,43 @@ def liquid_probe_with_recovery(self, well_core: WellCore, loc: Location) -> None self._protocol_core.set_last_location(location=loc, mount=self.get_mount()) + def liquid_probe_testing_data( + self, + well_core: WellCore, + loc: Location, + operation_volume: float, + ) -> Tuple[float, float, float]: + labware_id = well_core.labware_id + well_name = well_core.get_name() + well_location = WellLocation( + origin=WellOrigin.TOP, offset=WellOffset(x=0, y=0, z=2) + ) + result = self._engine_client.execute_command_without_recovery( + cmd.LiquidProbeParams( + labwareId=labware_id, + wellName=well_name, + wellLocation=well_location, + pipetteId=self.pipette_id, + ) + ) + + self._protocol_core.set_last_location(location=loc, mount=self.get_mount()) + projected_current_volume = ( + self._engine_client.state.geometry.get_well_volume_at_height( + labware_id=labware_id, well_name=well_name, height=result.z_position + ) + ) + projected_final_height = ( + self._engine_client.state.geometry.get_well_height_after_volume( + labware_id=labware_id, + well_name=well_name, + initial_height=result.z_position, + volume=operation_volume, + ) + ) + + return result.z_position, projected_current_volume, projected_final_height + def liquid_probe_without_recovery( self, well_core: WellCore, loc: Location ) -> float: diff --git a/api/src/opentrons/protocol_api/core/instrument.py b/api/src/opentrons/protocol_api/core/instrument.py index bc1ec3669df..f76ad2eadc8 100644 --- a/api/src/opentrons/protocol_api/core/instrument.py +++ b/api/src/opentrons/protocol_api/core/instrument.py @@ -3,7 +3,7 @@ from __future__ import annotations from abc import abstractmethod, ABC -from typing import Any, Generic, Optional, TypeVar, Union, List +from typing import Any, Generic, Optional, TypeVar, Union, List, Tuple from opentrons import types from opentrons.hardware_control.dev_types import PipetteDict @@ -41,7 +41,7 @@ def aspirate( rate: float, flow_rate: float, in_place: bool, - is_meniscus: Optional[bool] = None, + meniscus_tracking: Optional[types.MeniscusTracking] = None, ) -> None: """Aspirate a given volume of liquid from the specified location. Args: @@ -51,6 +51,7 @@ def aspirate( rate: The rate for how quickly to aspirate. flow_rate: The flow rate in µL/s to aspirate at. in_place: Whether this is in-place. + meniscus_tracking: Optional data about where to aspirate from. """ ... @@ -64,7 +65,7 @@ def dispense( flow_rate: float, in_place: bool, push_out: Optional[float], - is_meniscus: Optional[bool] = None, + meniscus_tracking: Optional[types.MeniscusTracking] = None, ) -> None: """Dispense a given volume of liquid into the specified location. Args: @@ -75,6 +76,7 @@ def dispense( flow_rate: The flow rate in µL/s to dispense at. in_place: Whether this is in-place. push_out: The amount to push the plunger below bottom position. + meniscus_tracking: Optional data about where to dispense from. """ ... @@ -365,6 +367,15 @@ def liquid_probe_without_recovery( """Do a liquid probe to find the level of the liquid in the well.""" ... + @abstractmethod + def liquid_probe_testing_data( + self, + well_core: types.WellCore, + loc: types.Location, + operation_volume: float, + ) -> Tuple[float, float, float]: + ... + @abstractmethod def nozzle_configuration_valid_for_lld(self) -> bool: """Check if the nozzle configuration currently supports LLD.""" diff --git a/api/src/opentrons/protocol_api/core/legacy/legacy_instrument_core.py b/api/src/opentrons/protocol_api/core/legacy/legacy_instrument_core.py index d2d25051d49..abf0e352e10 100644 --- a/api/src/opentrons/protocol_api/core/legacy/legacy_instrument_core.py +++ b/api/src/opentrons/protocol_api/core/legacy/legacy_instrument_core.py @@ -84,7 +84,7 @@ def aspirate( rate: float, flow_rate: float, in_place: bool, - is_meniscus: Optional[bool] = None, + meniscus_tracking: Optional[types.MeniscusTracking] = None, ) -> None: """Aspirate a given volume of liquid from the specified location. Args: @@ -94,6 +94,7 @@ def aspirate( rate: The rate in µL/s to aspirate at. flow_rate: Not used in this core. in_place: Whether we should move_to location. + meniscus_tracking: Optional data about where to aspirate from. """ if self.get_current_volume() == 0: # Make sure we're at the top of the labware and clear of any @@ -127,7 +128,7 @@ def dispense( flow_rate: float, in_place: bool, push_out: Optional[float], - is_meniscus: Optional[bool] = None, + meniscus_tracking: Optional[types.MeniscusTracking] = None, ) -> None: """Dispense a given volume of liquid into the specified location. Args: @@ -138,6 +139,7 @@ def dispense( flow_rate: Not used in this core. in_place: Whether we should move_to location. push_out: The amount to push the plunger below bottom position. + meniscus_tracking: Optional data about where to dispense from. """ if isinstance(location, (TrashBin, WasteChute)): raise APIVersionError( diff --git a/api/src/opentrons/protocol_api/core/legacy_simulator/legacy_instrument_core.py b/api/src/opentrons/protocol_api/core/legacy_simulator/legacy_instrument_core.py index ec194874528..3e6d02c2ab9 100644 --- a/api/src/opentrons/protocol_api/core/legacy_simulator/legacy_instrument_core.py +++ b/api/src/opentrons/protocol_api/core/legacy_simulator/legacy_instrument_core.py @@ -95,7 +95,6 @@ def aspirate( rate: float, flow_rate: float, in_place: bool, - is_meniscus: Optional[bool] = None, ) -> None: if self.get_current_volume() == 0: # Make sure we're at the top of the labware and clear of any @@ -137,7 +136,6 @@ def dispense( flow_rate: float, in_place: bool, push_out: Optional[float], - is_meniscus: Optional[bool] = None, ) -> None: if isinstance(location, (TrashBin, WasteChute)): raise APIVersionError( diff --git a/api/src/opentrons/protocol_api/instrument_context.py b/api/src/opentrons/protocol_api/instrument_context.py index 9c6338270c7..4cca7d7655e 100644 --- a/api/src/opentrons/protocol_api/instrument_context.py +++ b/api/src/opentrons/protocol_api/instrument_context.py @@ -1,7 +1,7 @@ from __future__ import annotations import logging from contextlib import ExitStack -from typing import Any, List, Optional, Sequence, Union, cast, Dict +from typing import Any, List, Optional, Sequence, Union, cast, Dict, Tuple, Literal from opentrons_shared_data.errors.exceptions import ( CommandPreconditionViolated, CommandParameterLimitViolated, @@ -172,6 +172,7 @@ def aspirate( volume: Optional[float] = None, location: Optional[Union[types.Location, labware.Well]] = None, rate: float = 1.0, + # meniscus_tracking: Optional[Literal["start", "end", "dynamic_meniscus"]] = None ) -> InstrumentContext: """ Draw liquid into a pipette tip. @@ -207,6 +208,11 @@ def aspirate( as ``rate`` multiplied by :py:attr:`flow_rate.aspirate `. If not specified, defaults to 1.0. See :ref:`new-plunger-flow-rates`. + :param meniscus_tracking: Whether to aspirate relative to the liquid meniscus + "start" - aspirate from the current liquid meniscus at the start of the operation + "end" - aspirate from the projected liquid meniscus at the end of the operation + "dynamic_meniscus" - move the pipette down while aspirating to maintain a given depth relative + to the liquid meniscus. :type rate: float :returns: This instance. @@ -226,7 +232,6 @@ def aspirate( move_to_location: types.Location well: Optional[labware.Well] = None - is_meniscus: Optional[bool] = None last_location = self._get_last_location_by_api_version() try: target = validation.validate_location( @@ -244,7 +249,7 @@ def aspirate( raise ValueError( "Trash Bin and Waste Chute are not acceptable location parameters for Aspirate commands." ) - move_to_location, well, is_meniscus = self._handle_aspirate_target( + move_to_location, well, meniscus_tracking = self._handle_aspirate_target( target=target ) if self.api_version >= APIVersion(2, 11): @@ -287,7 +292,259 @@ def aspirate( rate=rate, flow_rate=flow_rate, in_place=target.in_place, - is_meniscus=is_meniscus, + meniscus_tracking=meniscus_tracking, + ) + + return self + + @requires_version(2, 0) + def aspirate_while_tracking( + self, + volume: Optional[float] = None, + location: Optional[Union[types.Location, labware.Well]] = None, + rate: float = 1.0, + ) -> InstrumentContext: + """ + Draw liquid into a pipette tip. + + See :ref:`new-aspirate` for more details and examples. + + :param volume: The volume to aspirate, measured in µL. If unspecified, + defaults to the maximum volume for the pipette and its currently + attached tip. + + If ``aspirate`` is called with a volume of precisely 0, its behavior + depends on the API level of the protocol. On API levels below 2.16, + it will behave the same as a volume of ``None``/unspecified: aspirate + until the pipette is full. On API levels at or above 2.16, no liquid + will be aspirated. + :type volume: int or float + :param location: Tells the robot where to aspirate from. The location can be + a :py:class:`.Well` or a :py:class:`.Location`. + + - If the location is a ``Well``, the robot will aspirate at + or above the bottom center of the well. The distance (in mm) + from the well bottom is specified by + :py:obj:`well_bottom_clearance.aspirate + `. + + - If the location is a ``Location`` (e.g., the result of + :py:meth:`.Well.top` or :py:meth:`.Well.bottom`), the robot + will aspirate from that specified position. + + - If the ``location`` is unspecified, the robot will + aspirate from its current position. + :param rate: A multiplier for the default flow rate of the pipette. Calculated + as ``rate`` multiplied by :py:attr:`flow_rate.aspirate + `. If not specified, defaults to 1.0. See + :ref:`new-plunger-flow-rates`. + :type rate: float + :returns: This instance. + + .. note:: + + If ``aspirate`` is called with a single, unnamed argument, it will treat + that argument as ``volume``. If you want to call ``aspirate`` with only + ``location``, specify it as a keyword argument: + ``pipette.aspirate(location=plate['A1'])`` + + """ + _log.debug( + "aspirate {} from {} at {}".format( + volume, location if location else "current position", rate + ) + ) + + move_to_location: types.Location + well: Optional[labware.Well] = None + last_location = self._get_last_location_by_api_version() + try: + target = validation.validate_location( + location=location, last_location=last_location + ) + except validation.NoLocationError as e: + raise RuntimeError( + "If aspirate is called without an explicit location, another" + " method that moves to a location (such as move_to or " + "dispense) must previously have been called so the robot " + "knows where it is." + ) from e + + if isinstance(target, (TrashBin, WasteChute)): + raise ValueError( + "Trash Bin and Waste Chute are not acceptable location parameters for Aspirate commands." + ) + move_to_location, well, meniscus_tracking = self._handle_aspirate_target( + target=target + ) + if self.api_version >= APIVersion(2, 11): + instrument.validate_takes_liquid( + location=move_to_location, + reject_module=self.api_version >= APIVersion(2, 13), + reject_adapter=self.api_version >= APIVersion(2, 15), + ) + + if self.api_version >= APIVersion(2, 16): + c_vol = self._core.get_available_volume() if volume is None else volume + else: + c_vol = self._core.get_available_volume() if not volume else volume + flow_rate = self._core.get_aspirate_flow_rate(rate) + + if ( + self.api_version >= APIVersion(2, 20) + and well is not None + and self.liquid_presence_detection + and self._core.nozzle_configuration_valid_for_lld() + and self._core.get_current_volume() == 0 + ): + self._raise_if_pressure_not_supported_by_pipette() + self.require_liquid_presence(well=well) + + with publisher.publish_context( + broker=self.broker, + command=cmds.aspirate( + instrument=self, + volume=c_vol, + location=move_to_location, + flow_rate=flow_rate, + rate=rate, + ), + ): + self._core.aspirate( + location=move_to_location, + well_core=well._core if well is not None else None, + volume=c_vol, + rate=rate, + flow_rate=flow_rate, + in_place=target.in_place, + meniscus_tracking=meniscus_tracking, + ) + + return self + + @requires_version(2, 0) + def dispense_while_tracking( + self, + volume: Optional[float] = None, + location: Optional[Union[types.Location, labware.Well]] = None, + rate: float = 1.0, + push_out: Optional[float] = None, + ) -> InstrumentContext: + """ + Draw liquid into a pipette tip. + + See :ref:`new-dispense` for more details and examples. + + :param volume: The volume to dispense, measured in µL. If unspecified, + defaults to the maximum volume for the pipette and its currently + attached tip. + + If ``dispense`` is called with a volume of precisely 0, its behavior + depends on the API level of the protocol. On API levels below 2.16, + it will behave the same as a volume of ``None``/unspecified: dispense + until the pipette is full. On API levels at or above 2.16, no liquid + will be dispensed. + :type volume: int or float + :param location: Tells the robot where to dispense from. The location can be + a :py:class:`.Well` or a :py:class:`.Location`. + + - If the location is a ``Well``, the robot will dispense at + or above the bottom center of the well. The distance (in mm) + from the well bottom is specified by + :py:obj:`well_bottom_clearance.dispense + `. + + - If the location is a ``Location`` (e.g., the result of + :py:meth:`.Well.top` or :py:meth:`.Well.bottom`), the robot + will dispense from that specified position. + + - If the ``location`` is unspecified, the robot will + dispense from its current position. + :param rate: A multiplier for the default flow rate of the pipette. Calculated + as ``rate`` multiplied by :py:attr:`flow_rate.dispense + `. If not specified, defaults to 1.0. See + :ref:`new-plunger-flow-rates`. + :type rate: float + :returns: This instance. + + .. note:: + + If ``dispense`` is called with a single, unnamed argument, it will treat + that argument as ``volume``. If you want to call ``dispense`` with only + ``location``, specify it as a keyword argument: + ``pipette.dispense(location=plate['A1'])`` + + """ + _log.debug( + "dispense {} from {} at {}".format( + volume, location if location else "current position", rate + ) + ) + + move_to_location: types.Location + well: Optional[labware.Well] = None + last_location = self._get_last_location_by_api_version() + try: + target = validation.validate_location( + location=location, last_location=last_location + ) + except validation.NoLocationError as e: + raise RuntimeError( + "If dispense is called without an explicit location, another" + " method that moves to a location (such as move_to or " + "dispense) must previously have been called so the robot " + "knows where it is." + ) from e + + if isinstance(target, (TrashBin, WasteChute)): + raise ValueError( + "Trash Bin and Waste Chute are not acceptable location parameters for dispense commands." + ) + move_to_location, well, meniscus_tracking = self._handle_dispense_target( + target=target + ) + if self.api_version >= APIVersion(2, 11): + instrument.validate_takes_liquid( + location=move_to_location, + reject_module=self.api_version >= APIVersion(2, 13), + reject_adapter=self.api_version >= APIVersion(2, 15), + ) + + if self.api_version >= APIVersion(2, 16): + c_vol = self._core.get_available_volume() if volume is None else volume + else: + c_vol = self._core.get_available_volume() if not volume else volume + flow_rate = self._core.get_dispense_flow_rate(rate) + + if ( + self.api_version >= APIVersion(2, 20) + and well is not None + and self.liquid_presence_detection + and self._core.nozzle_configuration_valid_for_lld() + and self._core.get_current_volume() == 0 + ): + self._raise_if_pressure_not_supported_by_pipette() + self.require_liquid_presence(well=well) + + with publisher.publish_context( + broker=self.broker, + command=cmds.dispense( + instrument=self, + volume=c_vol, + location=move_to_location, + flow_rate=flow_rate, + rate=rate, + ), + ): + self._core.dispense( + location=move_to_location, + well_core=well._core if well is not None else None, + volume=c_vol, + rate=rate, + flow_rate=flow_rate, + in_place=target.in_place, + push_out=push_out, + # meniscus_tracking=meniscus_tracking, ) return self @@ -389,8 +646,7 @@ def dispense( # noqa: C901 volume, location if location else "current position", rate ) ) - well: Optional[labware.Well] = None - is_meniscus: Optional[bool] = None + # well: Optional[labware.Well] = None last_location = self._get_last_location_by_api_version() try: @@ -405,19 +661,9 @@ def dispense( # noqa: C901 "knows where it is." ) from e - if isinstance(target, validation.WellTarget): - well = target.well - if target.location: - move_to_location = target.location - is_meniscus = target.location.is_meniscus - elif well.parent._core.is_fixed_trash(): - move_to_location = target.well.top() - else: - move_to_location = target.well.bottom( - z=self._well_bottom_clearances.dispense - ) - if isinstance(target, validation.PointTarget): - move_to_location = target.location + move_to_location, well, meniscus_tracking = self._handle_dispense_target( + target=target + ) if self.api_version >= APIVersion(2, 11) and not isinstance( target, (TrashBin, WasteChute) @@ -454,6 +700,7 @@ def dispense( # noqa: C901 flow_rate=flow_rate, in_place=False, push_out=push_out, + meniscus_tracking=None, ) return self @@ -475,7 +722,7 @@ def dispense( # noqa: C901 flow_rate=flow_rate, in_place=target.in_place, push_out=push_out, - is_meniscus=is_meniscus, + meniscus_tracking=meniscus_tracking, ) return self @@ -2282,8 +2529,32 @@ def measure_liquid_height(self, well: labware.Well) -> float: self._raise_if_pressure_not_supported_by_pipette() loc = well.top() height = self._core.liquid_probe_without_recovery(well._core, loc) + # return current projected volume, height of liquid found, projected height after aspirate return height + @requires_version(2, 20) + def hw_testing_liquid_probe( + self, well: labware.Well, operation_volume: float + ) -> Tuple[float, float, float]: + """Check the height of the liquid within a well. + + :returns: The height, in mm, of the liquid from the deck. + + :meta private: + + This is intended for Opentrons internal use only and is not a guaranteed API. + """ + self._raise_if_pressure_not_supported_by_pipette() + loc = well.top() + ( + current_height, + estimated_current_volume, + projected_final_height, + ) = self._core.liquid_probe_testing_data( + well_core=well._core, loc=loc, operation_volume=operation_volume + ) + return current_height, estimated_current_volume, projected_final_height + def _raise_if_configuration_not_supported_by_pipette( self, style: NozzleLayout ) -> None: @@ -2308,15 +2579,15 @@ def _raise_if_pressure_not_supported_by_pipette(self) -> None: def _handle_aspirate_target( self, target: validation.ValidTarget - ) -> tuple[types.Location, Optional[labware.Well], Optional[bool]]: + ) -> tuple[types.Location, Optional[labware.Well], Optional[types.MeniscusTracking]]: move_to_location: types.Location well: Optional[labware.Well] = None - is_meniscus: Optional[bool] = None + meniscus_tracking: Optional[types.MeniscusTracking] = None if isinstance(target, validation.WellTarget): well = target.well if target.location: move_to_location = target.location - is_meniscus = target.location.is_meniscus + meniscus_tracking = target.location.meniscus_tracking else: move_to_location = target.well.bottom( @@ -2324,7 +2595,28 @@ def _handle_aspirate_target( ) if isinstance(target, validation.PointTarget): move_to_location = target.location - return (move_to_location, well, is_meniscus) + return move_to_location, well, meniscus_tracking + + def _handle_dispense_target( + self, target: validation.ValidTarget + ) -> tuple[types.Location, Optional[labware.Well], Optional[types.MeniscusTracking]]: + move_to_location: types.Location + well: Optional[labware.Well] = None + meniscus_tracking: Optional[types.MeniscusTracking] = None + if isinstance(target, validation.WellTarget): + well = target.well + if target.location: + move_to_location = target.location + meniscus_tracking = target.location.meniscus_tracking + elif well.parent._core.is_fixed_trash(): + move_to_location = target.well.top() + else: + move_to_location = target.well.bottom( + z=self._well_bottom_clearances.dispense + ) + if isinstance(target, validation.PointTarget): + move_to_location = target.location + return move_to_location, well, meniscus_tracking class AutoProbeDisable: diff --git a/api/src/opentrons/protocol_api/labware.py b/api/src/opentrons/protocol_api/labware.py index bb8a094e4c2..778cf8cf106 100644 --- a/api/src/opentrons/protocol_api/labware.py +++ b/api/src/opentrons/protocol_api/labware.py @@ -24,11 +24,12 @@ cast, Sequence, Mapping, + Literal, ) from opentrons_shared_data.labware.types import LabwareDefinition, LabwareParameters -from opentrons.types import Location, Point, NozzleMapInterface +from opentrons.types import Location, Point, NozzleMapInterface, MeniscusTracking from opentrons.protocols.api_support.types import APIVersion from opentrons.protocols.api_support.util import ( requires_version, @@ -233,16 +234,21 @@ def center(self) -> Location: return Location(self._core.get_center(), self) @requires_version(2, 21) - def meniscus(self, z: float = 0.0) -> Location: + def meniscus( + self, target: Literal["beginning", "end", "dynamic_meniscus"], z: float = 0.0 + ) -> Location: """ :param z: An offset on the z-axis, in mm. Positive offsets are higher and negative offsets are lower. + :param target: The relative position inside the well to target with respect to a liquid handling operation. :return: A :py:class:`~opentrons.types.Location` that indicates location is meniscus and that holds the ``z`` offset in its point.z field. :meta private: """ return Location( - point=Point(x=0, y=0, z=z), labware=self, _ot_internal_is_meniscus=True + point=Point(x=0, y=0, z=z), + labware=self, + _meniscus_tracking=MeniscusTracking(target=target), ) @requires_version(2, 8) diff --git a/api/src/opentrons/protocol_engine/commands/__init__.py b/api/src/opentrons/protocol_engine/commands/__init__.py index 4ad91012b11..acb7d74b69f 100644 --- a/api/src/opentrons/protocol_engine/commands/__init__.py +++ b/api/src/opentrons/protocol_engine/commands/__init__.py @@ -60,6 +60,14 @@ AspirateCommandType, ) +from .aspirate_while_tracking import ( + AspirateWhileTracking, + AspirateWhileTrackingParams, + AspirateWhileTrackingCreate, + AspirateWhileTrackingResult, + AspirateWhileTrackingCommandType, +) + from .aspirate_in_place import ( AspirateInPlace, AspirateInPlaceParams, @@ -92,6 +100,14 @@ DispenseCommandType, ) +from .dispense_while_tracking import ( + DispenseWhileTracking, + DispenseWhileTrackingParams, + DispenseWhileTrackingCreate, + DispenseWhileTrackingResult, + DispenseWhileTrackingCommandType, +) + from .dispense_in_place import ( DispenseInPlace, DispenseInPlaceParams, @@ -413,6 +429,12 @@ "AspirateParams", "AspirateResult", "AspirateCommandType", + # aspirate while tracking command models + "AspirateWhileTracking", + "AspirateWhileTrackingCreate", + "AspirateWhileTrackingParams", + "AspirateWhileTrackingResult", + "AspirateWhileTrackingCommandType", # aspirate in place command models "AspirateInPlace", "AspirateInPlaceCreate", @@ -437,6 +459,12 @@ "DispenseParams", "DispenseResult", "DispenseCommandType", + # dispense while tracking command models + "DispenseWhileTracking", + "DispenseWhileTrackingCreate", + "DispenseWhileTrackingParams", + "DispenseWhileTrackingResult", + "DispenseWhileTrackingCommandType", # dispense in place command models "DispenseInPlace", "DispenseInPlaceCreate", diff --git a/api/src/opentrons/protocol_engine/commands/aspirate.py b/api/src/opentrons/protocol_engine/commands/aspirate.py index 9664d733b8a..b07cd522f93 100644 --- a/api/src/opentrons/protocol_engine/commands/aspirate.py +++ b/api/src/opentrons/protocol_engine/commands/aspirate.py @@ -47,7 +47,10 @@ class AspirateParams( - PipetteIdMixin, AspirateVolumeMixin, FlowRateMixin, LiquidHandlingWellLocationMixin + PipetteIdMixin, + AspirateVolumeMixin, + FlowRateMixin, + LiquidHandlingWellLocationMixin, ): """Parameters required to aspirate from a specific well.""" diff --git a/api/src/opentrons/protocol_engine/commands/aspirate_while_tracking.py b/api/src/opentrons/protocol_engine/commands/aspirate_while_tracking.py new file mode 100644 index 00000000000..5ea29af8f6e --- /dev/null +++ b/api/src/opentrons/protocol_engine/commands/aspirate_while_tracking.py @@ -0,0 +1,249 @@ +"""Aspirate command request, result, and implementation models.""" + +from __future__ import annotations +from typing import TYPE_CHECKING, Optional, Type, Union +from typing_extensions import Literal + +from .pipetting_common import ( + OverpressureError, + PipetteIdMixin, + AspirateVolumeMixin, + FlowRateMixin, + BaseLiquidHandlingResult, + aspirate_while_tracking, + prepare_for_aspirate, +) +from .movement_common import ( + LiquidHandlingWellLocationMixin, + DestinationPositionResult, + StallOrCollisionError, + move_to_well, +) +from .command import ( + AbstractCommandImpl, + BaseCommand, + BaseCommandCreate, + DefinedErrorData, + SuccessData, +) + +from opentrons.hardware_control import HardwareControlAPI + +from ..state.update_types import StateUpdate, CLEAR +from ..types import ( + WellLocation, + WellOrigin, + CurrentWell, +) + +if TYPE_CHECKING: + from ..execution import MovementHandler, PipettingHandler + from ..resources import ModelUtils + from ..state.state import StateView + from ..notes import CommandNoteAdder + + +AspirateWhileTrackingCommandType = Literal["aspirateWhileTracking"] + + +class AspirateWhileTrackingParams( + PipetteIdMixin, + AspirateVolumeMixin, + FlowRateMixin, + LiquidHandlingWellLocationMixin, +): + """Parameters required to aspirate from a specific well.""" + + pass + + +class AspirateWhileTrackingResult(BaseLiquidHandlingResult, DestinationPositionResult): + """Result data from execution of an Aspirate command.""" + + pass + + +_ExecuteReturn = Union[ + SuccessData[AspirateWhileTrackingResult], + DefinedErrorData[OverpressureError] | DefinedErrorData[StallOrCollisionError], +] + + +class AspirateWhileTrackingImplementation( + AbstractCommandImpl[AspirateWhileTrackingParams, _ExecuteReturn] +): + """AspirateWhileTracking command implementation.""" + + def __init__( + self, + pipetting: PipettingHandler, + state_view: StateView, + hardware_api: HardwareControlAPI, + movement: MovementHandler, + command_note_adder: CommandNoteAdder, + model_utils: ModelUtils, + **kwargs: object, + ) -> None: + self._pipetting = pipetting + self._state_view = state_view + self._hardware_api = hardware_api + self._movement = movement + self._command_note_adder = command_note_adder + self._model_utils = model_utils + + async def execute(self, params: AspirateWhileTrackingParams) -> _ExecuteReturn: + """Move to and aspirate from the requested well. + + Raises: + TipNotAttachedError: if no tip is attached to the pipette. + """ + pipette_id = params.pipetteId + labware_id = params.labwareId + well_name = params.wellName + well_location = params.wellLocation + + state_update = StateUpdate() + + final_location = self._state_view.geometry.get_well_position( + labware_id=labware_id, + well_name=well_name, + well_location=well_location, + operation_volume=-params.volume, + pipette_id=pipette_id, + ) + + ready_to_aspirate = self._pipetting.get_is_ready_to_aspirate( + pipette_id=pipette_id + ) + + current_well = None + + if not ready_to_aspirate: + move_result = await move_to_well( + movement=self._movement, + model_utils=self._model_utils, + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, + well_location=WellLocation(origin=WellOrigin.TOP), + ) + state_update.append(move_result.state_update) + if isinstance(move_result, DefinedErrorData): + return DefinedErrorData(move_result.public, state_update=state_update) + + prepare_result = await prepare_for_aspirate( + pipette_id=pipette_id, + pipetting=self._pipetting, + model_utils=self._model_utils, + # Note that the retryLocation is the final location, inside the liquid, + # because that's where we'd want the client to try re-aspirating if this + # command fails and the run enters error recovery. + location_if_error={"retryLocation": final_location}, + ) + state_update.append(prepare_result.state_update) + if isinstance(prepare_result, DefinedErrorData): + return DefinedErrorData( + public=prepare_result.public, state_update=state_update + ) + + # set our current deck location to the well now that we've made + # an intermediate move for the "prepare for aspirate" step + current_well = CurrentWell( + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, + ) + move_result = await move_to_well( + movement=self._movement, + model_utils=self._model_utils, + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, + well_location=well_location, + current_well=current_well, + ) + state_update.append(move_result.state_update) + if isinstance(move_result, DefinedErrorData): + return DefinedErrorData( + public=move_result.public, state_update=state_update + ) + + aspirate_result = await aspirate_while_tracking( + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, + volume=params.volume, + flow_rate=params.flowRate, + location_if_error={ + "retryLocation": ( + move_result.public.position.x, + move_result.public.position.y, + move_result.public.position.z, + ) + }, + command_note_adder=self._command_note_adder, + pipetting=self._pipetting, + model_utils=self._model_utils, + ) + state_update.append(aspirate_result.state_update) + if isinstance(aspirate_result, DefinedErrorData): + state_update.set_liquid_operated( + labware_id=labware_id, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well( + labware_id, + well_name, + params.pipetteId, + ), + volume_added=CLEAR, + ) + return DefinedErrorData( + public=aspirate_result.public, state_update=state_update + ) + + state_update.set_liquid_operated( + labware_id=labware_id, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well( + labware_id, well_name, pipette_id + ), + volume_added=-aspirate_result.public.volume + * self._state_view.geometry.get_nozzles_per_well( + labware_id, + well_name, + params.pipetteId, + ), + ) + + return SuccessData( + public=AspirateWhileTrackingResult( + volume=aspirate_result.public.volume, + position=move_result.public.position, + ), + state_update=state_update, + ) + + +class AspirateWhileTracking( + BaseCommand[ + AspirateWhileTrackingParams, + AspirateWhileTrackingResult, + OverpressureError | StallOrCollisionError, + ] +): + """AspirateWhileTracking command model.""" + + commandType: AspirateWhileTrackingCommandType = "aspirateWhileTracking" + params: AspirateWhileTrackingParams + result: Optional[AspirateWhileTrackingResult] = None + + _ImplementationCls: Type[ + AspirateWhileTrackingImplementation + ] = AspirateWhileTrackingImplementation + + +class AspirateWhileTrackingCreate(BaseCommandCreate[AspirateWhileTrackingParams]): + """Create aspirate command request model.""" + + commandType: AspirateWhileTrackingCommandType = "aspirateWhileTracking" + params: AspirateWhileTrackingParams + + _CommandCls: Type[AspirateWhileTracking] = AspirateWhileTracking diff --git a/api/src/opentrons/protocol_engine/commands/command_unions.py b/api/src/opentrons/protocol_engine/commands/command_unions.py index b04b381ae6b..72151002811 100644 --- a/api/src/opentrons/protocol_engine/commands/command_unions.py +++ b/api/src/opentrons/protocol_engine/commands/command_unions.py @@ -57,6 +57,14 @@ AspirateInPlaceCommandType, ) +from .aspirate_while_tracking import ( + AspirateWhileTracking, + AspirateWhileTrackingParams, + AspirateWhileTrackingCreate, + AspirateWhileTrackingResult, + AspirateWhileTrackingCommandType +) + from .comment import ( Comment, CommentParams, @@ -81,6 +89,14 @@ DispenseCommandType, ) +from .dispense_while_tracking import ( + DispenseWhileTracking, + DispenseWhileTrackingParams, + DispenseWhileTrackingCreate, + DispenseWhileTrackingResult, + DispenseWhileTrackingCommandType +) + from .dispense_in_place import ( DispenseInPlace, DispenseInPlaceParams, @@ -365,10 +381,12 @@ AirGapInPlace, Aspirate, AspirateInPlace, + AspirateWhileTracking, Comment, Custom, Dispense, DispenseInPlace, + DispenseWhileTracking, BlowOut, BlowOutInPlace, ConfigureForVolume, @@ -452,6 +470,7 @@ CommandParams = Union[ AirGapInPlaceParams, AspirateParams, + AspirateWhileTrackingParams, AspirateInPlaceParams, CommentParams, ConfigureForVolumeParams, @@ -459,6 +478,7 @@ CustomParams, DispenseParams, DispenseInPlaceParams, + DispenseWhileTrackingParams, BlowOutParams, BlowOutInPlaceParams, DropTipParams, @@ -538,6 +558,7 @@ CommandType = Union[ AirGapInPlaceCommandType, AspirateCommandType, + AspirateWhileTrackingCommandType, AspirateInPlaceCommandType, CommentCommandType, ConfigureForVolumeCommandType, @@ -545,6 +566,7 @@ CustomCommandType, DispenseCommandType, DispenseInPlaceCommandType, + DispenseWhileTrackingCommandType, BlowOutCommandType, BlowOutInPlaceCommandType, DropTipCommandType, @@ -625,6 +647,7 @@ Union[ AirGapInPlaceCreate, AspirateCreate, + AspirateWhileTrackingCreate, AspirateInPlaceCreate, CommentCreate, ConfigureForVolumeCreate, @@ -632,6 +655,7 @@ CustomCreate, DispenseCreate, DispenseInPlaceCreate, + DispenseWhileTrackingCreate, BlowOutCreate, BlowOutInPlaceCreate, DropTipCreate, @@ -720,6 +744,7 @@ CommandResult = Union[ AirGapInPlaceResult, AspirateResult, + AspirateWhileTrackingResult, AspirateInPlaceResult, CommentResult, ConfigureForVolumeResult, @@ -727,6 +752,7 @@ CustomResult, DispenseResult, DispenseInPlaceResult, + DispenseWhileTrackingResult, BlowOutResult, BlowOutInPlaceResult, DropTipResult, diff --git a/api/src/opentrons/protocol_engine/commands/dispense.py b/api/src/opentrons/protocol_engine/commands/dispense.py index 8ad2365ccb5..3b4b84f6b75 100644 --- a/api/src/opentrons/protocol_engine/commands/dispense.py +++ b/api/src/opentrons/protocol_engine/commands/dispense.py @@ -45,7 +45,10 @@ def _remove_default(s: dict[str, Any]) -> None: class DispenseParams( - PipetteIdMixin, DispenseVolumeMixin, FlowRateMixin, LiquidHandlingWellLocationMixin + PipetteIdMixin, + DispenseVolumeMixin, + FlowRateMixin, + LiquidHandlingWellLocationMixin, ): """Payload required to dispense to a specific well.""" @@ -89,7 +92,6 @@ async def execute(self, params: DispenseParams) -> _ExecuteReturn: well_location = params.wellLocation labware_id = params.labwareId well_name = params.wellName - volume = params.volume # TODO(pbm, 10-15-24): call self._state_view.geometry.validate_dispense_volume_into_well() @@ -105,7 +107,7 @@ async def execute(self, params: DispenseParams) -> _ExecuteReturn: return move_result dispense_result = await dispense_in_place( pipette_id=params.pipetteId, - volume=volume, + volume=params.volume, flow_rate=params.flowRate, push_out=params.pushOut, location_if_error={ diff --git a/api/src/opentrons/protocol_engine/commands/dispense_while_tracking.py b/api/src/opentrons/protocol_engine/commands/dispense_while_tracking.py new file mode 100644 index 00000000000..de4904f5295 --- /dev/null +++ b/api/src/opentrons/protocol_engine/commands/dispense_while_tracking.py @@ -0,0 +1,202 @@ +"""Dispense command request, result, and implementation models.""" + +from __future__ import annotations +from typing import TYPE_CHECKING, Optional, Type, Union, Any +from typing_extensions import Literal + + +from pydantic import Field +from pydantic.json_schema import SkipJsonSchema + +from ..state.update_types import StateUpdate, CLEAR +from .pipetting_common import ( + PipetteIdMixin, + DispenseVolumeMixin, + FlowRateMixin, + BaseLiquidHandlingResult, + OverpressureError, + dispense_while_tracking, +) +from .movement_common import ( + LiquidHandlingWellLocationMixin, + DestinationPositionResult, + StallOrCollisionError, + move_to_well, +) +from .command import ( + AbstractCommandImpl, + BaseCommand, + BaseCommandCreate, + DefinedErrorData, + SuccessData, +) + +if TYPE_CHECKING: + from ..execution import MovementHandler, PipettingHandler + from ..resources import ModelUtils + from ..state.state import StateView + + +DispenseWhileTrackingCommandType = Literal["dispenseWhileTracking"] + + +def _remove_default(s: dict[str, Any]) -> None: + s.pop("default", None) + + +class DispenseWhileTrackingParams( + PipetteIdMixin, + DispenseVolumeMixin, + FlowRateMixin, + LiquidHandlingWellLocationMixin, +): + """Payload required to dispense to a specific well.""" + + pushOut: float | SkipJsonSchema[None] = Field( + None, + description="push the plunger a small amount farther than necessary for accurate low-volume dispensing", + json_schema_extra=_remove_default, + ) + + +class DispenseWhileTrackingResult(BaseLiquidHandlingResult, DestinationPositionResult): + """Result data from the execution of a Dispense command.""" + + pass + + +_ExecuteReturn = Union[ + SuccessData[DispenseWhileTrackingResult], + DefinedErrorData[OverpressureError] | DefinedErrorData[StallOrCollisionError], +] + + +class DispenseWhileTrackingImplementation( + AbstractCommandImpl[DispenseWhileTrackingParams, _ExecuteReturn] +): + """Dispense command implementation.""" + + def __init__( + self, + state_view: StateView, + movement: MovementHandler, + pipetting: PipettingHandler, + model_utils: ModelUtils, + **kwargs: object, + ) -> None: + self._state_view = state_view + self._movement = movement + self._pipetting = pipetting + self._model_utils = model_utils + + async def execute(self, params: DispenseWhileTrackingParams) -> _ExecuteReturn: + """Move to and dispense to the requested well.""" + well_location = params.wellLocation + labware_id = params.labwareId + well_name = params.wellName + + # TODO(pbm, 10-15-24): call self._state_view.geometry.validate_dispense_volume_into_well() + + move_result = await move_to_well( + movement=self._movement, + model_utils=self._model_utils, + pipette_id=params.pipetteId, + labware_id=labware_id, + well_name=well_name, + well_location=well_location, + ) + if isinstance(move_result, DefinedErrorData): + return move_result + dispense_result = await dispense_while_tracking( + pipette_id=params.pipetteId, + labware_id=labware_id, + well_name=well_name, + volume=params.volume, + flow_rate=params.flowRate, + push_out=params.pushOut, + location_if_error={ + "retryLocation": ( + move_result.public.position.x, + move_result.public.position.y, + move_result.public.position.z, + ) + }, + pipetting=self._pipetting, + model_utils=self._model_utils, + ) + + if isinstance(dispense_result, DefinedErrorData): + return DefinedErrorData( + public=dispense_result.public, + state_update=( + StateUpdate.reduce( + move_result.state_update, dispense_result.state_update + ).set_liquid_operated( + labware_id=labware_id, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well( + labware_id, well_name, params.pipetteId + ), + volume_added=CLEAR, + ) + ), + state_update_if_false_positive=StateUpdate.reduce( + move_result.state_update, + dispense_result.state_update_if_false_positive, + ), + ) + else: + volume_added = ( + self._state_view.pipettes.get_liquid_dispensed_by_ejecting_volume( + pipette_id=params.pipetteId, volume=dispense_result.public.volume + ) + ) + if volume_added is not None: + volume_added *= self._state_view.geometry.get_nozzles_per_well( + labware_id, well_name, params.pipetteId + ) + return SuccessData( + public=DispenseWhileTrackingResult( + volume=dispense_result.public.volume, + position=move_result.public.position, + ), + state_update=( + StateUpdate.reduce( + move_result.state_update, dispense_result.state_update + ).set_liquid_operated( + labware_id=labware_id, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well( + labware_id, well_name, params.pipetteId + ), + volume_added=volume_added + if volume_added is not None + else CLEAR, + ) + ), + ) + + +class DispenseWhileTracking( + BaseCommand[ + DispenseWhileTrackingParams, + DispenseWhileTrackingResult, + OverpressureError | StallOrCollisionError, + ] +): + """Dispense command model.""" + + commandType: DispenseWhileTrackingCommandType = "dispenseWhileTracking" + params: DispenseWhileTrackingParams + result: Optional[DispenseWhileTrackingResult] = None + + _ImplementationCls: Type[ + DispenseWhileTrackingImplementation + ] = DispenseWhileTrackingImplementation + + +class DispenseWhileTrackingCreate(BaseCommandCreate[DispenseWhileTrackingParams]): + """Create dispense command request model.""" + + commandType: DispenseWhileTrackingCommandType = "dispenseWhileTracking" + params: DispenseWhileTrackingParams + + _CommandCls: Type[DispenseWhileTracking] = DispenseWhileTracking diff --git a/api/src/opentrons/protocol_engine/commands/movement_common.py b/api/src/opentrons/protocol_engine/commands/movement_common.py index babf70b29d9..5cff8d5f358 100644 --- a/api/src/opentrons/protocol_engine/commands/movement_common.py +++ b/api/src/opentrons/protocol_engine/commands/movement_common.py @@ -61,7 +61,7 @@ class LiquidHandlingWellLocationMixin(BaseModel): ) wellLocation: LiquidHandlingWellLocation = Field( default_factory=LiquidHandlingWellLocation, - description="Relative well location at which to perform the operation", + description="Relative wwellell location at which to perform the operation", ) diff --git a/api/src/opentrons/protocol_engine/commands/pipetting_common.py b/api/src/opentrons/protocol_engine/commands/pipetting_common.py index c373642a02e..6740a4babb3 100644 --- a/api/src/opentrons/protocol_engine/commands/pipetting_common.py +++ b/api/src/opentrons/protocol_engine/commands/pipetting_common.py @@ -217,6 +217,104 @@ async def aspirate_in_place( ) +async def aspirate_while_tracking( + pipette_id: str, + labware_id: str, + well_name: str, + volume: float, + flow_rate: float, + location_if_error: ErrorLocationInfo, + command_note_adder: CommandNoteAdder, + pipetting: PipettingHandler, + model_utils: ModelUtils, +) -> SuccessData[BaseLiquidHandlingResult] | DefinedErrorData[OverpressureError]: + """Execute an aspirate while tracking microoperation.""" + try: + volume_aspirated = await pipetting.aspirate_while_tracking( + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, + volume=volume, + flow_rate=flow_rate, + command_note_adder=command_note_adder, + ) + except PipetteOverpressureError as e: + return DefinedErrorData( + public=OverpressureError( + id=model_utils.generate_id(), + createdAt=model_utils.get_timestamp(), + wrappedErrors=[ + ErrorOccurrence.from_failed( + id=model_utils.generate_id(), + createdAt=model_utils.get_timestamp(), + error=e, + ) + ], + errorInfo=location_if_error, + ), + state_update=StateUpdate().set_fluid_unknown(pipette_id=pipette_id), + ) + else: + return SuccessData( + public=BaseLiquidHandlingResult( + volume=volume_aspirated, + ), + state_update=StateUpdate().set_fluid_aspirated( + pipette_id=pipette_id, + fluid=AspiratedFluid(kind=FluidKind.LIQUID, volume=volume_aspirated), + ), + ) + + +async def dispense_while_tracking( + pipette_id: str, + labware_id: str, + well_name: str, + volume: float, + flow_rate: float, + push_out: float | None, + location_if_error: ErrorLocationInfo, + pipetting: PipettingHandler, + model_utils: ModelUtils, +) -> SuccessData[BaseLiquidHandlingResult] | DefinedErrorData[OverpressureError]: + """Execute an dispense while tracking microoperation.""" + try: + volume_dispensed = await pipetting.dispense_while_tracking( + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, + volume=volume, + flow_rate=flow_rate, + push_out=push_out, + ) + except PipetteOverpressureError as e: + return DefinedErrorData( + public=OverpressureError( + id=model_utils.generate_id(), + createdAt=model_utils.get_timestamp(), + wrappedErrors=[ + ErrorOccurrence.from_failed( + id=model_utils.generate_id(), + createdAt=model_utils.get_timestamp(), + error=e, + ) + ], + errorInfo=location_if_error, + ), + state_update=StateUpdate().set_fluid_unknown(pipette_id=pipette_id), + ) + else: + return SuccessData( + public=BaseLiquidHandlingResult( + volume=volume_dispensed, + ), + state_update=StateUpdate().set_fluid_ejected( + pipette_id=pipette_id, + volume=volume_dispensed, + ), + ) + + async def dispense_in_place( pipette_id: str, volume: float, diff --git a/api/src/opentrons/protocol_engine/execution/pipetting.py b/api/src/opentrons/protocol_engine/execution/pipetting.py index 10d613e4dcf..4f850592b25 100644 --- a/api/src/opentrons/protocol_engine/execution/pipetting.py +++ b/api/src/opentrons/protocol_engine/execution/pipetting.py @@ -1,5 +1,5 @@ """Pipetting command handling.""" -from typing import Optional, Iterator +from typing import Optional, Iterator, Tuple from typing_extensions import Protocol as TypingProtocol from contextlib import contextmanager @@ -45,6 +45,28 @@ async def aspirate_in_place( ) -> float: """Set flow-rate and aspirate.""" + async def aspirate_while_tracking( + self, + pipette_id: str, + labware_id: str, + well_name: str, + volume: float, + flow_rate: float, + command_note_adder: CommandNoteAdder, + ) -> float: + """Set flow-rate and aspirate while tracking.""" + + async def dispense_while_tracking( + self, + pipette_id: str, + labware_id: str, + well_name: str, + volume: float, + flow_rate: float, + push_out: Optional[float], + ) -> float: + """Set flow-rate and dispense while tracking.""" + async def dispense_in_place( self, pipette_id: str, @@ -99,10 +121,48 @@ async def prepare_for_aspirate(self, pipette_id: str) -> None: hw_mount = self._state_view.pipettes.get_mount(pipette_id).to_hw_mount() await self._hardware_api.prepare_for_aspirate(mount=hw_mount) - async def aspirate_in_place( + def get_hw_aspirate_params( self, pipette_id: str, volume: float, + command_note_adder: CommandNoteAdder, + ) -> Tuple[HardwarePipette, float]: + """Get params for hardware aspirate.""" + _adjusted_volume = _validate_aspirate_volume( + state_view=self._state_view, + pipette_id=pipette_id, + aspirate_volume=volume, + command_note_adder=command_note_adder, + ) + _hw_pipette = self._state_view.pipettes.get_hardware_pipette( + pipette_id=pipette_id, + attached_pipettes=self._hardware_api.attached_instruments, + ) + return _hw_pipette, _adjusted_volume + + def get_hw_dispense_params( + self, + pipette_id: str, + volume: float, + ) -> Tuple[HardwarePipette, float]: + """Get params for hardware dispense.""" + _adjusted_volume = _validate_dispense_volume( + state_view=self._state_view, + pipette_id=pipette_id, + dispense_volume=volume, + ) + _hw_pipette = self._state_view.pipettes.get_hardware_pipette( + pipette_id=pipette_id, + attached_pipettes=self._hardware_api.attached_instruments, + ) + return _hw_pipette, _adjusted_volume + + async def aspirate_while_tracking( + self, + pipette_id: str, + labware_id: str, + well_name: str, + volume: float, flow_rate: float, command_note_adder: CommandNoteAdder, ) -> float: @@ -112,15 +172,73 @@ async def aspirate_in_place( PipetteOverpressureError, propagated as-is from the hardware controller. """ # get mount and config data from state and hardware controller - adjusted_volume = _validate_aspirate_volume( - state_view=self._state_view, - pipette_id=pipette_id, - aspirate_volume=volume, - command_note_adder=command_note_adder, + hw_pipette, adjusted_volume = self.get_hw_aspirate_params( + pipette_id, volume, command_note_adder ) - hw_pipette = self._state_view.pipettes.get_hardware_pipette( - pipette_id=pipette_id, - attached_pipettes=self._hardware_api.attached_instruments, + + aspirate_z_distance = self._state_view.geometry.get_liquid_handling_z_change( + labware_id=labware_id, + well_name=well_name, + operation_volume=volume * -1, + ) + with self._set_flow_rate(pipette=hw_pipette, aspirate_flow_rate=flow_rate): + await self._hardware_api.aspirate_while_tracking( + mount=hw_pipette.mount, + z_distance=aspirate_z_distance, + flow_rate=flow_rate, + volume=adjusted_volume, + ) + # raise ValueError(f"adjusted volume = {adjusted_volume}\n volume = {volume}") + return adjusted_volume + + async def dispense_while_tracking( + self, + pipette_id: str, + labware_id: str, + well_name: str, + volume: float, + flow_rate: float, + push_out: Optional[float], + ) -> float: + """Set flow-rate and dispense. + + Raises: + PipetteOverpressureError, propagated as-is from the hardware controller. + """ + # get mount and config data from state and hardware controller + hw_pipette, adjusted_volume = self.get_hw_dispense_params(pipette_id, volume) + + dispense_z_distance = self._state_view.geometry.get_liquid_handling_z_change( + labware_id=labware_id, + well_name=well_name, + operation_volume=volume, + ) + with self._set_flow_rate(pipette=hw_pipette, dispense_flow_rate=flow_rate): + await self._hardware_api.dispense_while_tracking( + mount=hw_pipette.mount, + z_distance=dispense_z_distance, + flow_rate=flow_rate, + volume=adjusted_volume, + push_out=push_out, + ) + + return adjusted_volume + + async def aspirate_in_place( + self, + pipette_id: str, + volume: float, + flow_rate: float, + command_note_adder: CommandNoteAdder, + ) -> float: + """Set flow-rate and aspirate. + + Raises: + PipetteOverpressureError, propagated as-is from the hardware controller. + """ + # get mount and config data from state and hardware controller + hw_pipette, adjusted_volume = self.get_hw_aspirate_params( + pipette_id, volume, command_note_adder ) with self._set_flow_rate(pipette=hw_pipette, aspirate_flow_rate=flow_rate): await self._hardware_api.aspirate( @@ -137,13 +255,7 @@ async def dispense_in_place( push_out: Optional[float], ) -> float: """Dispense liquid without moving the pipette.""" - adjusted_volume = _validate_dispense_volume( - state_view=self._state_view, pipette_id=pipette_id, dispense_volume=volume - ) - hw_pipette = self._state_view.pipettes.get_hardware_pipette( - pipette_id=pipette_id, - attached_pipettes=self._hardware_api.attached_instruments, - ) + hw_pipette, adjusted_volume = self.get_hw_dispense_params(pipette_id, volume) # TODO (tz, 8-23-23): add a check for push_out not larger that the max volume allowed when working on this https://opentrons.atlassian.net/browse/RSS-329 if push_out and push_out < 0: raise InvalidPushOutVolumeError( @@ -272,6 +384,7 @@ async def dispense_in_place( raise InvalidPushOutVolumeError( "push out value cannot have a negative value." ) + raise ValueError("why am i here") self._validate_tip_attached(pipette_id=pipette_id, command_name="dispense") return _validate_dispense_volume( state_view=self._state_view, pipette_id=pipette_id, dispense_volume=volume @@ -303,6 +416,47 @@ def _validate_tip_attached(self, pipette_id: str, command_name: str) -> None: f"Cannot perform {command_name} without a tip attached" ) + async def aspirate_while_tracking( + self, + pipette_id: str, + labware_id: str, + well_name: str, + volume: float, + flow_rate: float, + command_note_adder: CommandNoteAdder, + ) -> float: + """Virtually aspirate (no-op).""" + self._validate_tip_attached(pipette_id=pipette_id, command_name="aspirate") + + return _validate_aspirate_volume( + state_view=self._state_view, + pipette_id=pipette_id, + aspirate_volume=volume, + command_note_adder=command_note_adder, + ) + + async def dispense_while_tracking( + self, + pipette_id: str, + labware_id: str, + well_name: str, + volume: float, + flow_rate: float, + push_out: Optional[float], + ) -> float: + """Virtually dispense (no-op).""" + # TODO (tz, 8-23-23): add a check for push_out not larger that the max volume allowed when working on this https://opentrons.atlassian.net/browse/RSS-329 + if push_out and push_out < 0: + raise InvalidPushOutVolumeError( + "push out value cannot have a negative value." + ) + # raise ValueError("here") + self._validate_tip_attached(pipette_id=pipette_id, command_name="dispense") + return _validate_dispense_volume( + state_view=self._state_view, pipette_id=pipette_id, dispense_volume=volume + ) + # return volume + def create_pipetting_handler( state_view: StateView, hardware_api: HardwareControlAPI diff --git a/api/src/opentrons/protocol_engine/state/frustum_helpers.py b/api/src/opentrons/protocol_engine/state/frustum_helpers.py index b28fb936be7..3c1fa867759 100644 --- a/api/src/opentrons/protocol_engine/state/frustum_helpers.py +++ b/api/src/opentrons/protocol_engine/state/frustum_helpers.py @@ -362,7 +362,9 @@ def find_volume_at_well_height( volumetric_capacity = get_well_volumetric_capacity(well_geometry) max_height = volumetric_capacity[-1][0] if target_height < 0 or target_height > max_height: - raise InvalidLiquidHeightFound("Invalid target height.") + # find where we fail bc height < lld minimum and kick off error recovery + # to allow the pipette to go to well.bottom() rather than probe in the future + raise InvalidLiquidHeightFound(f"Invalid target height {target_height}.") # volumes in volumetric_capacity are relative to each frustum, # so we have to find the volume of all the full sections enclosed # beneath the target height @@ -423,7 +425,9 @@ def find_height_at_well_volume( volumetric_capacity = get_well_volumetric_capacity(well_geometry) max_volume = sum(row[1] for row in volumetric_capacity) if target_volume < 0 or target_volume > max_volume: - raise InvalidLiquidHeightFound("Invalid target volume.") + raise InvalidLiquidHeightFound( + f"Invalid target volume {target_volume}, max vol {max_volume}." + ) sorted_well = sorted(well_geometry.sections, key=lambda section: section.topHeight) # find the section the target volume is in and compute the height diff --git a/api/src/opentrons/protocol_engine/state/geometry.py b/api/src/opentrons/protocol_engine/state/geometry.py index 7c725d88c62..19a34ee3fa4 100644 --- a/api/src/opentrons/protocol_engine/state/geometry.py +++ b/api/src/opentrons/protocol_engine/state/geometry.py @@ -1,13 +1,21 @@ """Geometry state getters.""" import enum + +# import logging from numpy import array, dot, double as npdouble from numpy.typing import NDArray from typing import Optional, List, Tuple, Union, cast, TypeVar, Dict from dataclasses import dataclass from functools import cached_property -from opentrons.types import Point, DeckSlotName, StagingSlotName, MountType +from opentrons.types import ( + Point, + DeckSlotName, + StagingSlotName, + MountType, + MeniscusTracking, +) from opentrons_shared_data.errors.exceptions import InvalidStoredData from opentrons_shared_data.labware.constants import WELL_NAME_PATTERN @@ -463,7 +471,8 @@ def validate_well_position( ) else: raise OperationLocationNotInWellError( - f"Specifying {well_location.origin} with an offset of {well_location.offset} and a volume offset of {well_location.volumeOffset} results in an operation location below the bottom of the well" + # f"Specifying {well_location.origin} with an offset of {well_location.offset} and a volume offset of {well_location.volumeOffset} results in an operation location below the bottom of the well" + f"well = {well_location}" ) def get_well_position( @@ -532,13 +541,13 @@ def get_relative_liquid_handling_well_location( labware_id: str, well_name: str, absolute_point: Point, - is_meniscus: Optional[bool] = None, + meniscus_tracking: Optional[MeniscusTracking] = None, ) -> LiquidHandlingWellLocation: """Given absolute position, get relative location of a well in a labware. If is_meniscus is True, absolute_point will hold the z-offset in its z field. """ - if is_meniscus: + if meniscus_tracking: return LiquidHandlingWellLocation( origin=WellOrigin.MENISCUS, offset=WellOffset(x=0, y=0, z=absolute_point.z), @@ -1404,6 +1413,24 @@ def get_offset_location(self, labware_id: str) -> Optional[LabwareOffsetLocation return None + def get_liquid_handling_z_change( + self, + labware_id: str, + well_name: str, + operation_volume: float, + ) -> float: + """Get the change in height from a liquid handling operation.""" + initial_handling_height = self.get_meniscus_height( + labware_id=labware_id, well_name=well_name + ) + final_height = self.get_well_height_after_volume( + labware_id=labware_id, + well_name=well_name, + initial_height=initial_handling_height, + volume=operation_volume, + ) + return final_height - initial_handling_height + def get_well_offset_adjustment( self, labware_id: str, @@ -1423,6 +1450,11 @@ def get_well_offset_adjustment( well_location=well_location, well_depth=well_depth, ) + if ( + well_location.origin == WellOrigin.MENISCUS + and not well_location.volumeOffset + ): + return initial_handling_height if isinstance(well_location, PickUpTipWellLocation): volume = 0.0 elif isinstance(well_location.volumeOffset, float): @@ -1449,6 +1481,9 @@ def get_meniscus_height( well_liquid = self._wells.get_well_liquid_info( labware_id=labware_id, well_name=well_name ) + # raise ValueError(f"well = {well_liquid}") + # raise ValueError(f"prbed_height not none{well_liquid.probed_height is not None}\n \ + # height.height is not None {well_liquid.probed_height.height is not None}") if ( well_liquid.probed_height is not None and well_liquid.probed_height.height is not None @@ -1491,6 +1526,7 @@ def get_well_handling_height( elif well_location.origin == WellOrigin.CENTER: handling_height = well_depth / 2.0 elif well_location.origin == WellOrigin.MENISCUS: + # baddie here handling_height = self.get_meniscus_height( labware_id=labware_id, well_name=well_name ) diff --git a/api/src/opentrons/protocol_engine/state/motion.py b/api/src/opentrons/protocol_engine/state/motion.py index 855025d01b6..15a8d6a633c 100644 --- a/api/src/opentrons/protocol_engine/state/motion.py +++ b/api/src/opentrons/protocol_engine/state/motion.py @@ -129,7 +129,7 @@ def get_movement_waypoints_to_well( extra_waypoints = self._geometry.get_extra_waypoints( location=location, to_slot=destination_slot ) - + # baddie here try: return motion_planning.get_waypoints( move_type=move_type, diff --git a/api/src/opentrons/types.py b/api/src/opentrons/types.py index 09e138513c1..7551b8a67f2 100644 --- a/api/src/opentrons/types.py +++ b/api/src/opentrons/types.py @@ -11,6 +11,7 @@ Optional, Protocol, Dict, + Literal, ) from opentrons_shared_data.robot.types import RobotType @@ -88,6 +89,20 @@ def magnitude_to(self, other: Any) -> float: ] +class MeniscusTracking: + def __init__( + self, + target: Union[ + Literal["beginning"], Literal["end"], Literal["dynamic_meniscus"] + ] = "end", + ) -> None: + self._target = target + + @property + def target(self) -> str: + return str(self._target) + + class Location: """Location(point: Point, labware: Union["Labware", "Well", str, "ModuleGeometry", LabwareLike, None, "ModuleContext"]) @@ -129,12 +144,12 @@ def __init__( "ModuleContext", ], *, - _ot_internal_is_meniscus: Optional[bool] = None, + _meniscus_tracking: Optional[MeniscusTracking] = None, ): self._point = point self._given_labware = labware self._labware = LabwareLike(labware) - self._is_meniscus = _ot_internal_is_meniscus + self._meniscus_tracking = _meniscus_tracking # todo(mm, 2021-10-01): Figure out how to get .point and .labware to show up # in the rendered docs, and then update the class docstring to use cross-references. @@ -148,8 +163,8 @@ def labware(self) -> LabwareLike: return self._labware @property - def is_meniscus(self) -> Optional[bool]: - return self._is_meniscus + def meniscus_tracking(self) -> Optional[MeniscusTracking]: + return self._meniscus_tracking def __iter__(self) -> Iterator[Union[Point, LabwareLike]]: """Iterable interface to support unpacking. Like a tuple. @@ -167,7 +182,7 @@ def __eq__(self, other: object) -> bool: isinstance(other, Location) and other._point == self._point and other._labware == self._labware - and other._is_meniscus == self._is_meniscus + and other._meniscus_tracking == self._meniscus_tracking ) def move(self, point: Point) -> "Location": @@ -193,7 +208,7 @@ def move(self, point: Point) -> "Location": return Location(point=self.point + point, labware=self._given_labware) def __repr__(self) -> str: - return f"Location(point={repr(self._point)}, labware={self._labware}, is_meniscus={self._is_meniscus if self._is_meniscus is not None else False})" + return f"Location(point={repr(self._point)}, labware={self._labware}, meniscus_tracking={self._meniscus_tracking})" # TODO(mc, 2020-10-22): use MountType implementation for Mount diff --git a/api/tests/opentrons/protocol_api/core/engine/test_instrument_core.py b/api/tests/opentrons/protocol_api/core/engine/test_instrument_core.py index 73f39006299..23d2f4bcf58 100644 --- a/api/tests/opentrons/protocol_api/core/engine/test_instrument_core.py +++ b/api/tests/opentrons/protocol_api/core/engine/test_instrument_core.py @@ -508,7 +508,6 @@ def test_aspirate_from_well( labware_id="123abc", well_name="my cool well", absolute_point=Point(1, 2, 3), - is_meniscus=None, ) ).then_return( LiquidHandlingWellLocation( @@ -607,7 +606,6 @@ def test_aspirate_from_meniscus( labware_id="123abc", well_name="my cool well", absolute_point=Point(1, 2, 3), - is_meniscus=True, ) ).then_return( LiquidHandlingWellLocation( @@ -622,7 +620,6 @@ def test_aspirate_from_meniscus( rate=5.6, flow_rate=7.8, in_place=False, - is_meniscus=True, ) decoy.verify( @@ -805,7 +802,6 @@ def test_dispense_to_well( labware_id="123abc", well_name="my cool well", absolute_point=Point(1, 2, 3), - is_meniscus=None, ) ).then_return( LiquidHandlingWellLocation( diff --git a/api/tests/opentrons/protocol_api/test_instrument_context.py b/api/tests/opentrons/protocol_api/test_instrument_context.py index 3f639aff922..6a72054a02f 100644 --- a/api/tests/opentrons/protocol_api/test_instrument_context.py +++ b/api/tests/opentrons/protocol_api/test_instrument_context.py @@ -342,7 +342,6 @@ def test_aspirate( volume=42.0, rate=1.23, flow_rate=5.67, - is_meniscus=None, ), times=1, ) @@ -380,7 +379,6 @@ def test_aspirate_well_location( volume=42.0, rate=1.23, flow_rate=5.67, - is_meniscus=None, ), times=1, ) @@ -394,9 +392,7 @@ def test_aspirate_meniscus_well_location( ) -> None: """It should aspirate to a well.""" mock_well = decoy.mock(cls=Well) - input_location = Location( - point=Point(2, 2, 2), labware=mock_well, _ot_internal_is_meniscus=True - ) + input_location = Location(point=Point(2, 2, 2), labware=mock_well) last_location = Location(point=Point(9, 9, 9), labware=None) decoy.when(mock_instrument_core.get_mount()).then_return(Mount.RIGHT) @@ -420,7 +416,6 @@ def test_aspirate_meniscus_well_location( volume=42.0, rate=1.23, flow_rate=5.67, - is_meniscus=True, ), times=1, ) @@ -457,7 +452,6 @@ def test_aspirate_from_coordinates( volume=42.0, rate=1.23, flow_rate=5.67, - is_meniscus=None, ), times=1, ) @@ -971,7 +965,6 @@ def test_dispense_with_location( rate=1.23, flow_rate=5.67, push_out=None, - is_meniscus=None, ), times=1, ) @@ -1010,7 +1003,6 @@ def test_dispense_with_well_location( rate=1.23, flow_rate=3.0, push_out=7, - is_meniscus=None, ), times=1, ) @@ -1051,7 +1043,6 @@ def test_dispense_with_well( rate=1.23, flow_rate=5.67, push_out=None, - is_meniscus=None, ), times=1, ) @@ -1306,7 +1297,6 @@ def test_dispense_0_volume_means_dispense_everything( rate=1.23, flow_rate=5.67, push_out=None, - is_meniscus=None, ), times=1, ) @@ -1336,7 +1326,6 @@ def test_dispense_0_volume_means_dispense_nothing( rate=1.23, flow_rate=5.67, push_out=None, - is_meniscus=None, ), times=1, ) @@ -1376,7 +1365,6 @@ def test_aspirate_0_volume_means_aspirate_everything( volume=200, rate=1.23, flow_rate=5.67, - is_meniscus=None, ), times=1, ) @@ -1416,7 +1404,6 @@ def test_aspirate_0_volume_means_aspirate_nothing( volume=0, rate=1.23, flow_rate=5.67, - is_meniscus=None, ), times=1, ) diff --git a/api/tests/opentrons/protocol_api/test_well.py b/api/tests/opentrons/protocol_api/test_well.py index c0ef530289b..fd9555fb3b9 100644 --- a/api/tests/opentrons/protocol_api/test_well.py +++ b/api/tests/opentrons/protocol_api/test_well.py @@ -108,7 +108,6 @@ def test_well_meniscus(decoy: Decoy, mock_well_core: WellCore, subject: Well) -> assert isinstance(result, Location) assert result.point == Point(0, 0, 4.2) - assert result.is_meniscus is True assert result.labware.as_well() is subject diff --git a/api/tests/opentrons/protocol_engine/state/test_geometry_view.py b/api/tests/opentrons/protocol_engine/state/test_geometry_view.py index bf82c17c6bc..805c255fa60 100644 --- a/api/tests/opentrons/protocol_engine/state/test_geometry_view.py +++ b/api/tests/opentrons/protocol_engine/state/test_geometry_view.py @@ -2006,7 +2006,6 @@ def test_get_relative_liquid_handling_well_location( labware_id="labware-id", well_name="B2", absolute_point=Point(x=0, y=0, z=-2), - is_meniscus=True, ) assert result == LiquidHandlingWellLocation( diff --git a/api/tests/opentrons/protocol_engine/state/test_motion_view.py b/api/tests/opentrons/protocol_engine/state/test_motion_view.py index 3e9d60da79a..18e765cb438 100644 --- a/api/tests/opentrons/protocol_engine/state/test_motion_view.py +++ b/api/tests/opentrons/protocol_engine/state/test_motion_view.py @@ -310,7 +310,7 @@ def test_get_movement_waypoints_to_well_for_y_center( decoy.when( geometry_view.get_well_position( - "labware-id", "well-name", WellLocation(), None, "pipette-id" + "labware-id", "well-name", WellLocation(), None, "pipette-id", False ) ).then_return(Point(x=4, y=5, z=6)) @@ -394,7 +394,7 @@ def test_get_movement_waypoints_to_well_for_xy_center( decoy.when( geometry_view.get_well_position( - "labware-id", "well-name", WellLocation(), None, "pipette-id" + "labware-id", "well-name", WellLocation(), None, "pipette-id", False ) ).then_return(Point(x=4, y=5, z=6)) diff --git a/api/tests/opentrons/test_types.py b/api/tests/opentrons/test_types.py index 77249fa0492..6cd93dce125 100644 --- a/api/tests/opentrons/test_types.py +++ b/api/tests/opentrons/test_types.py @@ -29,7 +29,7 @@ def test_location_repr_labware(min_lw: Labware) -> None: loc = Location(point=Point(x=1.1, y=2.1, z=3.5), labware=min_lw) assert ( f"{loc}" - == "Location(point=Point(x=1.1, y=2.1, z=3.5), labware=minimal labware on deck, is_meniscus=False)" + == "Location(point=Point(x=1.1, y=2.1, z=3.5), labware=minimal labware on deck)" ) @@ -38,17 +38,14 @@ def test_location_repr_well(min_lw: Labware) -> None: loc = Location(point=Point(x=1, y=2, z=3), labware=min_lw.wells()[0]) assert ( f"{loc}" - == "Location(point=Point(x=1, y=2, z=3), labware=A1 of minimal labware on deck, is_meniscus=False)" + == "Location(point=Point(x=1, y=2, z=3), labware=A1 of minimal labware on deck)" ) def test_location_repr_slot() -> None: """It should represent labware as a slot""" loc = Location(point=Point(x=-1, y=2, z=3), labware="1") - assert ( - f"{loc}" - == "Location(point=Point(x=-1, y=2, z=3), labware=1, is_meniscus=False)" - ) + assert f"{loc}" == "Location(point=Point(x=-1, y=2, z=3), labware=1)" @pytest.mark.parametrize( diff --git a/hardware-testing/hardware_testing/liquid_relative_pipetting/__init__.py b/hardware-testing/hardware_testing/liquid_relative_pipetting/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/hardware-testing/hardware_testing/liquid_relative_pipetting/__main__.py b/hardware-testing/hardware_testing/liquid_relative_pipetting/__main__.py new file mode 100644 index 00000000000..084f99eadee --- /dev/null +++ b/hardware-testing/hardware_testing/liquid_relative_pipetting/__main__.py @@ -0,0 +1,393 @@ +"""Liquid sense testing.""" +import argparse +from dataclasses import dataclass +from json import load as json_load +from pathlib import Path +import subprocess +from time import sleep +import os +from typing import List, Any, Optional, Dict +import traceback +import sys + +from hardware_testing.opentrons_api import helpers_ot3 +from hardware_testing.gravimetric import helpers, workarounds +from hardware_testing.data.csv_report import CSVReport +from hardware_testing.gravimetric.measurement.record import GravimetricRecorder +from hardware_testing.gravimetric.measurement.scale import Scale +from hardware_testing.drivers import ( + asair_sensor, + mitutoyo_digimatic_indicator, + list_ports_and_select, +) +from hardware_testing.data import ( + ui, + create_run_id_and_start_time, + get_git_description, + get_testing_data_directory, +) +from opentrons_hardware.hardware_control.motion_planning import move_utils + +from opentrons.protocol_api import InstrumentContext, ProtocolContext +from opentrons.protocol_engine.types import LabwareOffset + +from hardware_testing.liquid_sense import execute +from .report import build_ls_report, store_config, store_serial_numbers +from .post_process import process_csv_directory, process_google_sheet + +from hardware_testing.protocols.liquid_sense_lpc import ( + liquid_sense_ot3_p50_single_vial, + liquid_sense_ot3_p50_multi_vial, + liquid_sense_ot3_p1000_96_vial, + liquid_sense_ot3_p1000_single_vial, + liquid_sense_ot3_p1000_multi_vial, +) + +try: + from abr_testing.automation import google_sheets_tool, google_drive_tool +except ImportError: + ui.print_error( + "Unable to import abr repo if this isn't a simulation push the abr_testing package" + ) + pass + +CREDENTIALS_PATH = "/var/lib/jupyter/notebooks/abr.json" + +API_LEVEL = "2.18" + +LABWARE_OFFSETS: List[LabwareOffset] = [] + +# NOTE: (sigler) plunger on 1ch/8ch won't move faster than ~20mm second +# which means it take ~3.5 seconds to reach full plunger travel. +# Therefore, there is no need for any probing in this test script to +# take longer than 3.5 seconds. +# NOTE: (sigler) configuring the starting height of each probing sequence +# not based on millimeters but instead on the number seconds it takes +# before the tip contacts the meniscus will help make sure that adjusting +# the Z-speed will inadvertently affect the pressure's rate-of-change +# (which could happen if the meniscus seal is formed at wildly different +# positions along the plunger travel). +MAX_PROBE_SECONDS = 3.5 + + +LIQUID_SENSE_CFG: Dict[int, Dict[int, Any]] = { + 50: { + 1: liquid_sense_ot3_p50_single_vial, + 8: liquid_sense_ot3_p50_multi_vial, + }, + 1000: { + 1: liquid_sense_ot3_p1000_single_vial, + 8: liquid_sense_ot3_p1000_multi_vial, + 96: liquid_sense_ot3_p1000_96_vial, + }, +} + +PIPETTE_MODEL_NAME = { + 50: { + 1: "p50_single_flex", + 8: "p50_multi_flex", + }, + 1000: { + 1: "p1000_single_flex", + 8: "p1000_multi_flex", + 96: "p1000_96_flex", + }, +} + + +@dataclass +class RunArgs: + """Common resources across multiple runs.""" + + tip_volumes: List[int] + run_id: str + pipette: InstrumentContext + pipette_tag: str + git_description: str + recorder: GravimetricRecorder + pipette_volume: int + pipette_channels: int + name: str + environment_sensor: asair_sensor.AsairSensorBase + trials: int + z_speed: float + return_tip: bool + ctx: ProtocolContext + protocol_cfg: Any + test_report: CSVReport + aspirate: bool + dial_indicator: Optional[mitutoyo_digimatic_indicator.Mitutoyo_Digimatic_Indicator] + plunger_speed: float + trials_before_jog: int + no_multi_pass: int + test_well: str + wet: bool + dial_front_channel: bool + + @classmethod + def _get_protocol_context(cls, args: argparse.Namespace) -> ProtocolContext: + if not args.simulate and not args.skip_labware_offsets: + # getting labware offsets must be done before creating the protocol context + # because it requires the robot-server to be running + ui.print_title("SETUP") + ui.print_info( + "Starting opentrons-robot-server, so we can http GET labware offsets" + ) + LABWARE_OFFSETS.extend(workarounds.http_get_all_labware_offsets()) + ui.print_info(f"found {len(LABWARE_OFFSETS)} offsets:") + for offset in LABWARE_OFFSETS: + ui.print_info(f"\t{offset.createdAt}:") + ui.print_info(f"\t\t{offset.definitionUri}") + ui.print_info(f"\t\t{offset.vector}") + # gather the custom labware (for simulation) + custom_defs = {} + if args.simulate: + labware_dir = Path(__file__).parent.parent / "labware" + custom_def_uris = [ + "radwag_pipette_calibration_vial", + "dial_indicator", + ] + for def_uri in custom_def_uris: + with open(labware_dir / def_uri / "1.json", "r") as f: + custom_def = json_load(f) + custom_defs[def_uri] = custom_def + _ctx = helpers.get_api_context( + API_LEVEL, # type: ignore[attr-defined] + is_simulating=args.simulate, + pipette_left=PIPETTE_MODEL_NAME[args.pipette][args.channels], + extra_labware=custom_defs, + ) + for offset in LABWARE_OFFSETS: + engine = _ctx._core._engine_client._transport._engine # type: ignore[attr-defined] + engine.state_view._labware_store._add_labware_offset(offset) + return _ctx + + @classmethod + def build_run_args(cls, args: argparse.Namespace) -> "RunArgs": + """Build.""" + _ctx = RunArgs._get_protocol_context(args) + run_id, start_time = create_run_id_and_start_time() + environment_sensor = asair_sensor.BuildAsairSensor(simulate=True) + git_description = get_git_description() + protocol_cfg = LIQUID_SENSE_CFG[args.pipette][args.channels] + name = protocol_cfg.metadata["protocolName"] # type: ignore[union-attr] + ui.print_header("LOAD PIPETTE") + pipette = _ctx.load_instrument( + f"flex_{args.channels}channel_{args.pipette}", args.mount + ) + loaded_labwares = _ctx.loaded_labwares + if 12 in loaded_labwares.keys(): + trash = loaded_labwares[12] + else: + trash = _ctx.load_labware("opentrons_1_trash_3200ml_fixed", "A3") + pipette.trash_container = trash + pipette_tag = helpers._get_tag_from_pipette(pipette, False, False) + + trials = args.trials + + if args.tip == 0: + if args.pipette == 1000: + tip_volumes: List[int] = [50, 200, 1000] + else: + tip_volumes = [50] + else: + tip_volumes = [args.tip] + + scale = Scale.build(simulate=True) + recorder: GravimetricRecorder = execute._load_scale( + name, + scale, + run_id, + pipette_tag, + start_time, + simulating=True, + ) + dial: Optional[mitutoyo_digimatic_indicator.Mitutoyo_Digimatic_Indicator] = None + if not _ctx.is_simulating(): + dial_port = list_ports_and_select("Dial Indicator") + dial = mitutoyo_digimatic_indicator.Mitutoyo_Digimatic_Indicator( + port=dial_port + ) + dial.connect() + ui.print_info(f"pipette_tag {pipette_tag}") + report = build_ls_report(name, run_id, trials, tip_volumes) + report.set_tag(name) + # go ahead and store the meta data now + store_serial_numbers( + report, + pipette_tag, + scale.read_serial_number(), + environment_sensor.get_serial(), + git_description, + ) + + store_config( + report, + name, + args.pipette, + tip_volumes, + trials, + "aspirate" if args.aspirate else "dispense", + args.liquid, + protocol_cfg.LABWARE_ON_SCALE, # type: ignore[union-attr] + args.z_speed, + ) + return RunArgs( + tip_volumes=tip_volumes, + run_id=run_id, + pipette=pipette, + pipette_tag=pipette_tag, + git_description=git_description, + recorder=recorder, + pipette_volume=args.pipette, + pipette_channels=args.channels, + name=name, + environment_sensor=environment_sensor, + trials=trials, + z_speed=args.z_speed, + return_tip=args.return_tip, + ctx=_ctx, + protocol_cfg=protocol_cfg, + test_report=report, + aspirate=args.aspirate, + dial_indicator=dial, + plunger_speed=args.plunger_speed, + trials_before_jog=args.trials_before_jog, + no_multi_pass=args.no_multi_pass, + test_well=args.test_well, + wet=args.wet, + dial_front_channel=args.dial_front_channel, + ) + + +if __name__ == "__main__": + move_utils.MINIMUM_DISPLACEMENT = 0.01 + + parser = argparse.ArgumentParser("Pipette Testing") + parser.add_argument("--simulate", action="store_true") + parser.add_argument("--pipette", type=int, choices=[50, 1000], required=True) + parser.add_argument("--mount", type=str, choices=["left", "right"], default="left") + parser.add_argument("--channels", type=int, choices=[1, 8, 96], default=1) + parser.add_argument("--tip", type=int, choices=[0, 50, 200, 1000], default=0) + parser.add_argument("--return-tip", action="store_true") + parser.add_argument("--trials", type=int, default=7) + parser.add_argument("--trials-before-jog", type=int, default=7) + parser.add_argument("--z-speed", type=float, default=5) + parser.add_argument("--aspirate", action="store_true") + parser.add_argument("--plunger-speed", type=float, default=15) + parser.add_argument("--no-multi-pass", action="store_true") + parser.add_argument("--wet", action="store_true") + parser.add_argument("--starting-tip", type=str, default="A1") + parser.add_argument("--test-well", type=str, default="A1") + parser.add_argument("--p-solo-time", type=float, default=0) + parser.add_argument("--google-sheet-name", type=str, default="LLD-Shared-Data") + parser.add_argument( + "--gd-parent-folder", type=str, default="1b2V85fDPA0tNqjEhyHOGCWRZYgn8KsGf" + ) + parser.add_argument("--liquid", type=str, default="unknown") + parser.add_argument("--skip-labware-offsets", action="store_true") + parser.add_argument("--dial-front-channel", action="store_true") + + args = parser.parse_args() + + run_args = RunArgs.build_run_args(args) + exit_error = 0 + serial_logger: Optional[subprocess.Popen] = None + data_dir = get_testing_data_directory() + data_file = f"/{data_dir}/{run_args.name}/{run_args.run_id}/serial.log" + try: + if not run_args.ctx.is_simulating(): + ui.print_info(f"logging can data to {data_file}") + serial_logger = subprocess.Popen( + [f"python3 -m opentrons_hardware.scripts.can_mon > {data_file}"], + shell=True, + ) + sleep(1) + # Connect to Google Sheet + ui.print_info(f"robot has credentials: {os.path.exists(CREDENTIALS_PATH)}") + google_sheet: Optional[ + google_sheets_tool.google_sheet + ] = google_sheets_tool.google_sheet( + CREDENTIALS_PATH, args.google_sheet_name, 0 + ) + sheet_id = google_sheet.create_worksheet(run_args.run_id) # type: ignore[union-attr] + try: + sys.path.insert(0, "/var/lib/jupyter/notebooks/") + + google_drive: Optional[ + google_drive_tool.google_drive + ] = google_drive_tool.google_drive( + CREDENTIALS_PATH, + args.gd_parent_folder, + "rhyann.clarke@opentrons.com", + ) + except ImportError: + raise ImportError( + "Run on robot. Make sure google_drive_tool.py is in jupyter notebook." + ) + else: + google_sheet = None + sheet_id = None + google_drive = None + hw = run_args.ctx._core.get_hardware() + ui.print_info("homing...") + run_args.ctx.home() + for tip in run_args.tip_volumes: + execute.run(tip, run_args, google_sheet, sheet_id, args.starting_tip) + except Exception as e: + ui.print_error(f"got error {e}") + ui.print_error(traceback.format_exc()) + exit_error = 1 + finally: + if run_args.recorder is not None: + ui.print_info("ending recording") + if not run_args.ctx.is_simulating() and serial_logger: + ui.print_info("killing serial log") + serial_logger.terminate() + if run_args.dial_indicator is not None: + run_args.dial_indicator.disconnect() + run_args.test_report.save_to_disk() + run_args.test_report.print_results() + ui.print_info("done\n\n") + if not run_args.ctx.is_simulating(): + new_folder_name = ( + f"MS{args.z_speed}_PS{args.plunger_speed}_{run_args.run_id}" + ) + try: + process_csv_directory( + f"{data_dir}/{run_args.name}/{run_args.run_id}", + run_args.tip_volumes, + run_args.trials, + google_sheet, + google_drive, + run_args.run_id, + sheet_id, + new_folder_name, + make_graph=False, + ) + # Log to Google Sheet + if args.aspirate is False: + plunger_direction = "dispense" + else: + plunger_direction = "aspirate" + test_info = [ + run_args.run_id, + run_args.pipette_tag, + args.pipette, + args.tip, + args.z_speed, + args.plunger_speed, + "threshold", + plunger_direction, + ] + process_google_sheet(google_sheet, run_args, test_info, sheet_id) + except Exception as e: + ui.print_error("error making graphs or logging data on google sheet") + ui.print_error(f"got error {e}") + ui.print_error(traceback.format_exc()) + exit_error = 2 + + run_args.ctx.cleanup() + if not args.simulate: + helpers_ot3.restart_server_ot3() + os._exit(exit_error) diff --git a/shared-data/pipette/definitions/2/general/single_channel/p1000/3_6.json b/shared-data/pipette/definitions/2/general/single_channel/p1000/3_6.json index f0ebd0e00a3..7c6db0dfa1c 100644 --- a/shared-data/pipette/definitions/2/general/single_channel/p1000/3_6.json +++ b/shared-data/pipette/definitions/2/general/single_channel/p1000/3_6.json @@ -30,7 +30,7 @@ }, "v1": { "default": 10.5, - "opentrons/opentrons_flex_96_tiprack_1000ul/1": 9.65, + "opentrons/opentrons_flex_96_tiprack_1000ul/1": 10.5, "opentrons/opentrons_flex_96_tiprack_200ul/1": 9.76, "opentrons/opentrons_flex_96_tiprack_50ul/1": 10.09, "opentrons/opentrons_flex_96_filtertiprack_1000ul/1": 9.65, diff --git a/shared-data/pipette/definitions/2/geometry/single_channel/p1000/3_6.json b/shared-data/pipette/definitions/2/geometry/single_channel/p1000/3_6.json index 8eb3dfb9e97..9c0d1b6b8e1 100644 --- a/shared-data/pipette/definitions/2/geometry/single_channel/p1000/3_6.json +++ b/shared-data/pipette/definitions/2/geometry/single_channel/p1000/3_6.json @@ -13,15 +13,15 @@ }, "lldSettings": { "t50": { - "minHeight": 1.0, + "minHeight": 0.0, "minVolume": 0 }, "t200": { - "minHeight": 1.0, + "minHeight": 0.0, "minVolume": 0 }, "t1000": { - "minHeight": 1.5, + "minHeight": 0.0, "minVolume": 0 } }