Skip to content

Commit

Permalink
fix(api): cancellation bug on legacy core api (#13767)
Browse files Browse the repository at this point in the history
* added taskification of hardware control movement commands to execution manager wrapper
  • Loading branch information
sanni-t authored Oct 11, 2023
1 parent 8b0eb3f commit 227a742
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 9 deletions.
11 changes: 10 additions & 1 deletion api/src/opentrons/hardware_control/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,14 +509,23 @@ async def stop(self, home_after: bool = True) -> None:
robot. After this call, no further recovery is necessary.
"""
await self._backend.halt() # calls smoothie_driver.kill()
await self._execution_manager.cancel()
await self.cancel_execution_and_running_tasks()
self._log.info("Recovering from halt")
await self.reset()
await self.cache_instruments()

if home_after:
await self.home()

def is_movement_execution_taskified(self) -> bool:
    """Report whether movement execution is currently taskified.

    Returns:
        The current value of ``self.taskify_movement_execution``.
    """
    return self.taskify_movement_execution

def should_taskify_movement_execution(self, taskify: bool) -> None:
    """Set whether movement execution should be taskified.

    Despite the query-like name, this is a setter: it stores ``taskify`` in
    ``self.taskify_movement_execution`` and returns nothing.

    Args:
        taskify: The new value of the flag.
    """
    self.taskify_movement_execution = taskify

async def cancel_execution_and_running_tasks(self) -> None:
    """Cancel execution by awaiting the execution manager's ``cancel()``."""
    await self._execution_manager.cancel()

async def reset(self) -> None:
"""Reset the stored state of the system."""
self._pause_manager.reset()
Expand Down
22 changes: 21 additions & 1 deletion api/src/opentrons/hardware_control/execution_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ class ExecutionManagerProvider:
def __init__(self, simulator: bool) -> None:
    """Create the provider's execution manager and initialise its flags.

    Args:
        simulator: Stored as ``_em_simulate``.
    """
    self._em_simulate = simulator
    self._execution_manager = ExecutionManager()
    # Taskified movement execution is off until a caller enables it
    # through the ``taskify_movement_execution`` setter.
    self._taskify_movement_execution: bool = False

@property
def taskify_movement_execution(self) -> bool:
    """Whether movement execution is taskified.

    Returns:
        The stored ``_taskify_movement_execution`` flag.
    """
    return self._taskify_movement_execution

@taskify_movement_execution.setter
def taskify_movement_execution(self, taskify: bool) -> None:
    """Enable or disable taskified movement execution.

    Args:
        taskify: New value for the flag. Named to match the property and
            the hardware API's ``should_taskify_movement_execution``.
    """
    self._taskify_movement_execution = taskify

@property
def execution_manager(self) -> ExecutionManager:
Expand Down Expand Up @@ -125,7 +134,18 @@ async def replace(
) -> DecoratedReturn:
if not inst._em_simulate:
await inst.execution_manager.wait_for_is_running()
return await decorated(inst, *args, **kwargs)
if inst.taskify_movement_execution:
# Running these functions inside cancellable tasks makes it easier and
# faster to cancel protocol runs. In the higher, runner & engine layers,
# a cancellation request triggers cancellation of the running move task
# and hence, prevents any further communication with hardware.
decorated_task: "asyncio.Task[DecoratedReturn]" = asyncio.create_task(
decorated(inst, *args, **kwargs)
)
inst.execution_manager.register_cancellable_task(decorated_task)
return await decorated_task
else:
return await decorated(inst, *args, **kwargs)

return cast(DecoratedMethodReturningValue, replace)

Expand Down
11 changes: 8 additions & 3 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,12 +732,17 @@ async def _chained_calls() -> None:

asyncio.run_coroutine_threadsafe(_chained_calls(), self._loop)

def is_movement_execution_taskified(self) -> bool:
    """Report whether movement execution is currently taskified.

    Returns:
        The current value of ``self.taskify_movement_execution``.
    """
    return self.taskify_movement_execution

def should_taskify_movement_execution(self, taskify: bool) -> None:
    """Set whether movement execution should be taskified.

    Despite the query-like name, this is a setter: it stores ``taskify`` in
    ``self.taskify_movement_execution`` and returns nothing.

    Args:
        taskify: The new value of the flag.
    """
    self.taskify_movement_execution = taskify

async def _stop_motors(self) -> None:
    """Immediately stop motors by awaiting the backend's ``halt()``."""
    await self._backend.halt()

async def _cancel_execution_and_running_tasks(self) -> None:
"""Cancel execution manager and all running (hardware module) tasks."""
async def cancel_execution_and_running_tasks(self) -> None:
    """Cancel execution by awaiting the execution manager's ``cancel()``."""
    await self._execution_manager.cancel()

async def halt(self, disengage_before_stopping: bool = False) -> None:
Expand All @@ -751,7 +756,7 @@ async def halt(self, disengage_before_stopping: bool = False) -> None:
async def stop(self, home_after: bool = True) -> None:
"""Stop motion as soon as possible, reset, and optionally home."""
await self._stop_motors()
await self._cancel_execution_and_running_tasks()
await self.cancel_execution_and_running_tasks()
self._log.info("Resetting OT3API")
await self.reset()
if home_after:
Expand Down
12 changes: 12 additions & 0 deletions api/src/opentrons/hardware_control/protocols/motion_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,15 @@ async def retract(self, mount: Mount, margin: float = 10) -> None:
async def retract_axis(self, axis: Axis) -> None:
    """Retract the specified axis to its home position.

    Args:
        axis: The axis to retract.
    """
    ...

def is_movement_execution_taskified(self) -> bool:
    """Get whether move functions are being executed inside cancellable tasks.

    Returns:
        True when move functions run inside cancellable tasks, False otherwise.
    """
    ...

def should_taskify_movement_execution(self, taskify: bool) -> None:
    """Specify whether move functions should be executed inside cancellable tasks.

    Args:
        taskify: True to execute move functions inside cancellable tasks,
            False otherwise.
    """
    ...

async def cancel_execution_and_running_tasks(self) -> None:
    """Cancel all tasks and set execution manager state to Cancelled.

    This is the public entry point for stopping runs whose movement
    execution has been taskified.
    """
    ...
11 changes: 9 additions & 2 deletions api/src/opentrons/protocol_engine/protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,15 @@ async def stop(self) -> None:
action = self._state_store.commands.validate_action_allowed(StopAction())
self._action_dispatcher.dispatch(action)
self._queue_worker.cancel()
if self._hardware_api.is_movement_execution_taskified():
# We 'taskify' hardware controller movement functions when running protocols
# that are not backed by the engine. Such runs cannot be stopped by cancelling
# the queue worker and hence need to be stopped via the execution manager.
# `cancel_execution_and_running_tasks()` sets the execution manager in a CANCELLED state
# and cancels the running tasks, which raises an error and gets us out of the
# run function execution, just like `_queue_worker.cancel()` does for
# engine-backed runs.
await self._hardware_api.cancel_execution_and_running_tasks()

async def wait_until_complete(self) -> None:
"""Wait until there are no more commands to execute.
Expand Down Expand Up @@ -380,7 +389,6 @@ async def finish(
# order will be backwards because the stack is first-in-last-out.
exit_stack = AsyncExitStack()
exit_stack.push_async_callback(self._plugin_starter.stop) # Last step.

exit_stack.push_async_callback(
# Cleanup after hardware halt and reset the hardware controller
self._hardware_stopper.do_stop_and_recover,
Expand All @@ -400,7 +408,6 @@ async def finish(
disengage_before_stopping=disengage_before_stopping,
)
exit_stack.push_async_callback(self._queue_worker.join) # First step.

try:
# If any teardown steps failed, this will raise something.
await exit_stack.aclose()
Expand Down
6 changes: 5 additions & 1 deletion api/src/opentrons/protocol_runner/protocol_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,14 @@ async def load(

if protocol.api_level < LEGACY_PYTHON_API_VERSION_CUTOFF:
equipment_broker = Broker[LegacyLoadInfo]()

self._protocol_engine.add_plugin(
LegacyContextPlugin(
broker=self._broker, equipment_broker=equipment_broker
)
)
self._hardware_api.should_taskify_movement_execution(taskify=True)
else:
self._hardware_api.should_taskify_movement_execution(taskify=False)

context = self._legacy_context_creator.create(
protocol=protocol,
Expand Down Expand Up @@ -220,6 +222,7 @@ def __init__(
# TODO(mc, 2022-01-11): replace task queue with specific implementations
# of runner interface
self._task_queue = task_queue or TaskQueue(cleanup_func=protocol_engine.finish)
self._hardware_api.should_taskify_movement_execution(taskify=False)

async def load(self, protocol_source: ProtocolSource) -> None:
"""Load a JSONv6+ ProtocolSource into managed ProtocolEngine."""
Expand Down Expand Up @@ -307,6 +310,7 @@ def __init__(
# of runner interface
self._hardware_api = hardware_api
self._task_queue = task_queue or TaskQueue(cleanup_func=protocol_engine.finish)
self._hardware_api.should_taskify_movement_execution(taskify=False)

def prepare(self) -> None:
"""Set the task queue to wait until all commands are executed."""
Expand Down
29 changes: 28 additions & 1 deletion api/tests/opentrons/protocol_engine/test_protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ async def test_stop(
state_store: StateStore,
subject: ProtocolEngine,
) -> None:
"""It should be able to stop the engine and halt the hardware."""
"""It should be able to stop the engine and run execution."""
expected_action = StopAction()

decoy.when(
Expand All @@ -685,6 +685,33 @@ async def test_stop(
)


async def test_stop_for_legacy_core_protocols(
    decoy: Decoy,
    action_dispatcher: ActionDispatcher,
    queue_worker: QueueWorker,
    hardware_stopper: HardwareStopper,
    hardware_api: HardwareControlAPI,
    state_store: StateStore,
    subject: ProtocolEngine,
) -> None:
    """It should stop the engine and cancel taskified movement execution."""
    stop_action = StopAction()

    # The state store validates the stop action and hands it back unchanged.
    validation_stub = decoy.when(
        state_store.commands.validate_action_allowed(stop_action)
    )
    validation_stub.then_return(stop_action)

    # The hardware API reports that movement execution is taskified.
    taskified_stub = decoy.when(hardware_api.is_movement_execution_taskified())
    taskified_stub.then_return(True)

    await subject.stop()

    # All three calls are expected, in this order.
    decoy.verify(
        action_dispatcher.dispatch(stop_action),
        queue_worker.cancel(),
        await hardware_api.cancel_execution_and_running_tasks(),
    )


@pytest.mark.parametrize("maintenance_run", [True, False])
async def test_estop_during_command(
decoy: Decoy,
Expand Down

0 comments on commit 227a742

Please sign in to comment.