Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Allow recovering from errors that happen during the preparation part of an aspirate command #16896

Draft
wants to merge 3 commits into
base: edge
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 72 additions & 40 deletions api/src/opentrons/protocol_engine/commands/aspirate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from typing_extensions import Literal

from .pipetting_common import (
ErrorLocationInfo,
OverpressureError,
PipetteIdMixin,
AspirateVolumeMixin,
FlowRateMixin,
BaseLiquidHandlingResult,
aspirate_in_place,
prepare_for_aspirate,
)
from .movement_common import (
LiquidHandlingWellLocationMixin,
Expand Down Expand Up @@ -94,6 +96,17 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipette_id = params.pipetteId
labware_id = params.labwareId
well_name = params.wellName
well_location = params.wellLocation

state_update = StateUpdate()

final_location = self._state_view.geometry.get_well_position(
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
operation_volume=-params.volume,
pipette_id=pipette_id,
)

ready_to_aspirate = self._pipetting.get_is_ready_to_aspirate(
pipette_id=pipette_id
Expand All @@ -102,14 +115,34 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
current_well = None

if not ready_to_aspirate:
await self._movement.move_to_well(
move_result = await move_to_well(
movement=self._movement,
model_utils=self._model_utils,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=WellLocation(origin=WellOrigin.TOP),
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
return DefinedErrorData(move_result.public, state_update=state_update)

# TODO: Figure out what to do for state_update_if_false_positive
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo passthrough and union with previous successes


await self._pipetting.prepare_for_aspirate(pipette_id=pipette_id)
prepare_result = await prepare_for_aspirate(
pipette_id=pipette_id,
pipetting=self._pipetting,
model_utils=self._model_utils,
# Note that the retryLocation is the final location, inside the liquid,
# because that's where we'd want the client to try re-aspirating if this
# command fails and the run enters error recovery.
location_if_error={"retryLocation": final_location},
)
state_update.append(prepare_result.state_update)
if isinstance(prepare_result, DefinedErrorData):
return DefinedErrorData(
public=prepare_result.public, state_update=state_update
)

# set our current deck location to the well now that we've made
# an intermediate move for the "prepare for aspirate" step
Expand All @@ -125,12 +158,15 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=params.wellLocation,
well_location=well_location,
current_well=current_well,
operation_volume=-params.volume,
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
return move_result
return DefinedErrorData(
public=move_result.public, state_update=state_update
)

aspirate_result = await aspirate_in_place(
pipette_id=pipette_id,
Expand All @@ -147,46 +183,42 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipetting=self._pipetting,
model_utils=self._model_utils,
)
state_update.append(aspirate_result.state_update)
if isinstance(aspirate_result, DefinedErrorData):
return DefinedErrorData(
public=aspirate_result.public,
state_update=StateUpdate.reduce(
move_result.state_update, aspirate_result.state_update
).set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id,
well_name,
params.pipetteId,
),
volume_added=CLEAR,
),
state_update_if_false_positive=StateUpdate.reduce(
move_result.state_update,
aspirate_result.state_update_if_false_positive,
state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id,
well_name,
params.pipetteId,
),
volume_added=CLEAR,
)
else:
return SuccessData(
public=AspirateResult(
volume=aspirate_result.public.volume,
position=move_result.public.position,
),
state_update=StateUpdate.reduce(
move_result.state_update, aspirate_result.state_update
).set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
labware_id,
well_name,
params.pipetteId,
),
),
return DefinedErrorData(
public=aspirate_result.public, state_update=state_update
)
# TODO: state_update_if_false_positive?

state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
labware_id,
well_name,
params.pipetteId,
),
)

return SuccessData(
public=AspirateResult(
volume=aspirate_result.public.volume,
position=move_result.public.position,
),
state_update=state_update,
)


class Aspirate(
Expand Down
6 changes: 5 additions & 1 deletion api/src/opentrons/protocol_engine/execution/pipetting.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ def get_is_ready_to_aspirate(self, pipette_id: str) -> bool:
)

async def prepare_for_aspirate(self, pipette_id: str) -> None:
"""Prepare for pipette aspiration."""
"""Prepare for pipette aspiration.

Raises:
PipetteOverpressureError, propagated as-is from the hardware controller.
"""
hw_mount = self._state_view.pipettes.get_mount(pipette_id).to_hw_mount()
await self._hardware_api.prepare_for_aspirate(mount=hw_mount)

Expand Down
13 changes: 13 additions & 0 deletions api/src/opentrons/protocol_engine/state/update_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,19 @@ class StateUpdate:

liquid_class_loaded: LiquidClassLoadedUpdate | NoChangeType = NO_CHANGE

def append(self, other: Self) -> Self:
"""Apply another `StateUpdate` "on top of" this one.

This object is mutated in-place, taking values from `other`.
If an attribute in `other` is `NO_CHANGE`, the value in this object is kept.
"""
fields = dataclasses.fields(other)
for field in fields:
other_value = other.__dict__[field.name]
if other_value != NO_CHANGE:
self.__dict__[field.name] = other_value
return self

Comment on lines +302 to +314
Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is append() a good name for this? Is there a naming scheme that harmonizes better with reduce()?

@classmethod
def reduce(cls: typing.Type[Self], *args: Self) -> Self:
"""Fuse multiple state updates into a single one.
Comment on lines 315 to 317
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: Can probably implement reduce() in terms of append().

Expand Down
11 changes: 0 additions & 11 deletions api/tests/opentrons/protocol_engine/commands/test_aspirate.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ def subject(
async def test_aspirate_implementation_no_prep(
decoy: Decoy,
state_view: StateView,
hardware_api: HardwareControlAPI,
movement: MovementHandler,
pipetting: PipettingHandler,
subject: AspirateImplementation,
Expand Down Expand Up @@ -151,7 +150,6 @@ async def test_aspirate_implementation_no_prep(
async def test_aspirate_implementation_with_prep(
decoy: Decoy,
state_view: StateView,
hardware_api: HardwareControlAPI,
movement: MovementHandler,
pipetting: PipettingHandler,
mock_command_note_adder: CommandNoteAdder,
Expand Down Expand Up @@ -416,15 +414,6 @@ async def test_overpressure_error(
pipette_id=pipette_id
),
),
state_update_if_false_positive=update_types.StateUpdate(
pipette_location=update_types.PipetteLocationUpdate(
pipette_id=pipette_id,
new_location=update_types.Well(
labware_id=labware_id, well_name=well_name
),
new_deck_point=DeckPoint(x=position.x, y=position.y, z=position.z),
),
),
)


Expand Down
Loading