Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): disable pressure sensor capabilities for PEEK pipettes #16921

Open
wants to merge 10 commits into
base: edge
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ def restore_system_constraints(self) -> AsyncIterator[None]:
def grab_pressure(self, channels: int, mount: OT3Mount) -> AsyncIterator[None]:
...

def set_pressure_sensor_available(
self, pipette_axis: Axis, available: bool
) -> None:
...

def get_pressure_sensor_available(self, pipette_axis: Axis) -> bool:
...

def update_constraints_for_gantry_load(self, gantry_load: GantryLoad) -> None:
...

Expand Down
26 changes: 24 additions & 2 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@
PipetteLiquidNotFoundError,
CommunicationError,
PythonException,
UnsupportedHardwareCommand,
)

from .subsystem_manager import SubsystemManager
Expand Down Expand Up @@ -362,6 +363,7 @@ def __init__(
self._configuration.motion_settings, GantryLoad.LOW_THROUGHPUT
)
)
self._pressure_sensor_available: Dict[NodeId, bool] = {}

@asynccontextmanager
async def restore_system_constraints(self) -> AsyncIterator[None]:
Expand All @@ -380,6 +382,16 @@ async def grab_pressure(
async with grab_pressure(channels, tool, self._messenger):
yield

def set_pressure_sensor_available(
self, pipette_axis: Axis, available: bool
) -> None:
pip_node = axis_to_node(pipette_axis)
self._pressure_sensor_available[pip_node] = available

def get_pressure_sensor_available(self, pipette_axis: Axis) -> bool:
pip_node = axis_to_node(pipette_axis)
return self._pressure_sensor_available[pip_node]

def update_constraints_for_calibration_with_gantry_load(
self,
gantry_load: GantryLoad,
Expand Down Expand Up @@ -762,7 +774,8 @@ async def _runner_coroutine(
for runner, is_gear_move in maybe_runners
if runner
]
async with self._monitor_overpressure(pipettes_moving):
checked_moving_pipettes = self._pipettes_to_monitor_pressure(pipettes_moving)
async with self._monitor_overpressure(checked_moving_pipettes):
all_positions = await asyncio.gather(*coros)

for positions, handle_gear_move in all_positions:
Expand Down Expand Up @@ -871,7 +884,8 @@ async def home(
moving_pipettes = [
axis_to_node(ax) for ax in checked_axes if ax in Axis.pipette_axes()
]
async with self._monitor_overpressure(moving_pipettes):
checked_moving_pipettes = self._pipettes_to_monitor_pressure(moving_pipettes)
async with self._monitor_overpressure(checked_moving_pipettes):
positions = await asyncio.gather(*coros)
# TODO(CM): default gear motor homing routine to have some acceleration
if Axis.Q in checked_axes:
Expand All @@ -886,6 +900,9 @@ async def home(
self._handle_motor_status_response(position)
return axis_convert(self._position, 0.0)

def _pipettes_to_monitor_pressure(self, pipettes: List[NodeId]) -> List[NodeId]:
return [pip for pip in pipettes if self._pressure_sensor_available[pip]]

def _filter_move_group(self, move_group: MoveGroup) -> MoveGroup:
new_group: MoveGroup = []
for step in move_group:
Expand Down Expand Up @@ -1473,6 +1490,11 @@ async def liquid_probe(
) -> float:
head_node = axis_to_node(Axis.by_mount(mount))
tool = sensor_node_for_pipette(OT3Mount(mount.value))
if tool not in self._pipettes_to_monitor_pressure([tool]):
raise UnsupportedHardwareCommand(
"Liquid Presence Detection not available on this pipette."
)

positions = await liquid_probe(
messenger=self._messenger,
tool=tool,
Expand Down
20 changes: 20 additions & 0 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
)
from opentrons_shared_data.pipette import (
pipette_load_name_conversions as pipette_load_name,
pipette_definition,
)
from opentrons_shared_data.robot.types import RobotType

Expand Down Expand Up @@ -634,10 +635,29 @@ async def cache_pipette(
self._feature_flags.use_old_aspiration_functions,
)
self._pipette_handler.hardware_instruments[mount] = p
if config is not None:
self._set_pressure_sensor_available(mount, instrument_config=config)
# TODO (lc 12-5-2022) Properly support backwards compatibility
# when applicable
return skipped

def get_pressure_sensor_available(self, mount: OT3Mount) -> bool:
pip_axis = Axis.of_main_tool_actuator(mount)
return self._backend.get_pressure_sensor_available(pip_axis)

def _set_pressure_sensor_available(
self,
mount: OT3Mount,
instrument_config: pipette_definition.PipetteConfigurations,
) -> None:
pressure_sensor_available = (
"pressure" in instrument_config.available_sensors.sensors
)
pip_axis = Axis.of_main_tool_actuator(mount)
self._backend.set_pressure_sensor_available(
pipette_axis=pip_axis, available=pressure_sensor_available
)

async def cache_gripper(self, instrument_data: AttachedGripper) -> bool:
"""Set up gripper based on scanned information."""
grip_cal = load_gripper_calibration_offset(instrument_data.get("id"))
Expand Down
5 changes: 5 additions & 0 deletions api/src/opentrons/protocol_api/core/engine/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,11 @@ def retract(self) -> None:
self._engine_client.execute_command(cmd.HomeParams(axes=[z_axis]))

def detect_liquid_presence(self, well_core: WellCore, loc: Location) -> bool:
if not self._sync_hardware_api.pressure_sensor_available(
ryanthecoder marked this conversation as resolved.
Show resolved Hide resolved
mount=self.get_mount()
):
raise ValueError("Liquid Presence Detection not available.")

labware_id = well_core.labware_id
well_name = well_core.get_name()
well_location = WellLocation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ async def test_home_execute(
**config
) as mock_runner:
present_axes = set(ax for ax in axes if controller.axis_is_present(ax))
controller.set_pressure_sensor_available(Axis.P_L, True)
controller.set_pressure_sensor_available(Axis.P_R, True)

# nothing has been homed
assert not controller._motor_status
Expand Down Expand Up @@ -485,6 +487,8 @@ async def test_home_only_present_devices(
homed_position = {}

controller._position = starting_position
controller.set_pressure_sensor_available(Axis.P_L, True)
controller.set_pressure_sensor_available(Axis.P_R, True)

mock_move_group_run.side_effect = move_group_run_side_effect_home(controller, axes)

Expand Down Expand Up @@ -1413,3 +1417,34 @@ async def test_controller_move(

assert position == expected_pos
assert gear_position == gear_position


@pytest.mark.parametrize(
argnames=["axes", "pipette_has_sensor"],
argvalues=[[[Axis.P_L, Axis.P_R], True], [[Axis.P_L, Axis.P_R], False]],
)
async def test_pressure_disable(
controller: OT3Controller,
axes: List[Axis],
mock_present_devices: None,
mock_check_overpressure: None,
pipette_has_sensor: bool,
) -> None:
config = {"run.side_effect": move_group_run_side_effect_home(controller, axes)}
with mock.patch( # type: ignore [call-overload]
"opentrons.hardware_control.backends.ot3controller.MoveGroupRunner",
spec=MoveGroupRunner,
**config
):
with mock.patch.object(controller, "_monitor_overpressure") as monitor:
controller.set_pressure_sensor_available(Axis.P_L, pipette_has_sensor)
controller.set_pressure_sensor_available(Axis.P_R, True)

await controller.home(axes, GantryLoad.LOW_THROUGHPUT)

if pipette_has_sensor:
monitor.assert_called_once_with(
[NodeId.pipette_left, NodeId.pipette_right]
)
else:
monitor.assert_called_once_with([NodeId.pipette_right])
Loading