Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified cleanup using scoped behaviors #103

Merged
merged 17 commits into from
Sep 26, 2023
Merged
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Updated eventually_swiss and verified that it passes tests
  • Loading branch information
amalnanavati committed Sep 22, 2023
commit db5019d72e2258e51fb2d5e50a0180935ec0c76c
3 changes: 3 additions & 0 deletions ada_feeding/ada_feeding/decorators/__init__.py
Original file line number Diff line number Diff line change
@@ -23,5 +23,8 @@
from .set_position_path_constraint import SetPositionPathConstraint
from .set_orientation_path_constraint import SetOrientationPathConstraint

# Force Status
from .force_status import ForceStatus

# On Preempt
from .on_preempt import OnPreempt
68 changes: 68 additions & 0 deletions ada_feeding/ada_feeding/decorators/force_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
This module defines the ForceStatus decorator, which ignores the child's
current status and always returns the status specified by the user.
"""

# Standard imports
import typing

# Third-party imports
from py_trees import behaviour, common
from py_trees.decorators import Decorator

# Local imports


class ForceStatus(Decorator):
"""
This decorator ignores the child's current status and always returns
the status specified by the user. This can be useful e.g., to force a
composite without memory to move on to the next child while still ticking
the child of this decorator.

While this decorator's behavior can be achieved with a chain of "X is Y"
style decorators, it is often conceptually easier to reason about a decorator
always returning one status, as opposed to a chain of decorators that each
flip their child's status.
"""

def __init__(self, name: str, child: behaviour.Behaviour, status: common.Status):
"""
Initialise the decorator.

Args:
name: the decorator name
child: the child to be decorated
status: the status to return on :meth:`update`
"""
super().__init__(name=name, child=child)
self.const_status = status

def tick(self) -> typing.Iterator[behaviour.Behaviour]:
"""
Don't stop the child even if this decorator's status is non-RUNNING.

Yields:
a reference to itself
"""
self.logger.debug(f"{self.__class__.__name__}.tick()")
# initialise just like other behaviours/composites
if self.status != common.Status.RUNNING:
self.initialise()
# interrupt proceedings and process the child node
# (including any children it may have as well)
for node in self.decorated.tick():
yield node
# resume normal proceedings for a Behaviour's tick
self.status = self.update()
amalnanavati marked this conversation as resolved.
Show resolved Hide resolved
# do not stop even if this decorator's status is non-RUNNING
yield self

def update(self) -> common.Status:
"""
Return the status specified when creating this decorator.

Returns:
the behaviour's new status :class:`~py_trees.common.Status`
"""
return self.const_status
106 changes: 90 additions & 16 deletions ada_feeding/ada_feeding/idioms/eventually_swiss.py
Original file line number Diff line number Diff line change
@@ -8,13 +8,11 @@
import typing

from py_trees import behaviour, behaviours, common, composites
from py_trees.decorators import Inverter, StatusToBlackboard, SuccessIsFailure

from ada_feeding.decorators import OnPreempt
from ada_feeding.decorators import ForceStatus, OnPreempt


# pylint: disable=too-many-arguments
# This is acceptable, to give users maximum control over how the swiss knife
# idiom behaves.
def eventually_swiss(
name: str,
workers: typing.List[behaviour.Behaviour],
@@ -24,6 +22,7 @@ def eventually_swiss(
on_preempt_single_tick: bool = True,
on_preempt_period_ms: int = 0,
on_preempt_timeout: typing.Optional[float] = None,
status_blackboard_key: typing.Optional[str] = None,
) -> behaviour.Behaviour:
"""
Implement a multi-tick, general purpose 'try-except-else'-like pattern.
@@ -53,40 +52,115 @@ def eventually_swiss(
on_preempt_timeout: how long (sec) to wait for the on_preempt behaviour
to reach a status other than :data:`~py_trees.common.Status.RUNNING`
if `on_preempt_single_tick` is False. If None, then do not timeout.
status_blackboard_key: the key to use for the status blackboard variable.
If None, use "/{name}/eventually_swiss_status".

Returns:
:class:`~py_trees.behaviour.Behaviour`: the root of the oneshot subtree

.. seealso:: :meth:`py_trees.idioms.eventually`, :ref:`py-trees-demo-eventually-swiss-program`
"""
# pylint: disable=too-many-arguments, too-many-locals
# This is acceptable, to give users maximum control over how the swiss knife
# idiom behaves.

# Create the subtree to handle `on_success`
if status_blackboard_key is None:
status_blackboard_key = f"/{name}/eventually_swiss_status"
unset_status = behaviours.UnsetBlackboardVariable(
name="Unset Status", key=status_blackboard_key
)
save_success_status = StatusToBlackboard(
name="Save Success Status",
child=on_success,
variable_name=status_blackboard_key,
)
on_success_sequence = composites.Sequence(
name="Work to Success", memory=True, children=workers + [on_success]
name="Work to Success",
memory=True,
children=[unset_status] + workers + [save_success_status],
)

# Create the subtree to handle `on_failure`. This subtree must start with
# `return_status_if_set` so that `on_failure` is not ticked if `on_success`
# fails.
check_status_exists = behaviours.CheckBlackboardVariableExists(
name="Wait for Status",
variable_name=status_blackboard_key,
)
# Note that we can get away with using an Inverter here because the only way
# we get to this branch is if either the `workers` or `on_success` fails.
# So the status either doesn't exist or is FAILURE.
return_status_if_set = Inverter(
name="Return Status if Set",
child=check_status_exists,
)
on_failure_always_fail = SuccessIsFailure(
name="On Failure Always Failure",
child=on_failure,
)
save_failure_status = StatusToBlackboard(
name="Save Failure Status",
child=on_failure_always_fail,
variable_name=status_blackboard_key,
)
on_failure_sequence = composites.Sequence(
name="On Failure",
memory=True,
children=[on_failure, behaviours.Failure(name="Failure")],
children=[return_status_if_set, save_failure_status],
)

# Create the combined subtree to handle `on_success` and `on_failure`
combined = composites.Selector(
name="On Non-Preemption",
name="On Non-Preemption Subtree",
memory=True,
children=[on_success_sequence, on_failure_sequence],
)
subtree_root = composites.Selector(
name=name,
memory=False,
children=[],
# We force the outcome of this tree to FAILURE so that the Selector always
# goes on to tick the `on_preempt_subtree`, which is necessary to ensure
# that the `on_preempt` behavior will get run if the tree is preempted.
on_success_or_failure_subtree = ForceStatus(
name="On Non-Preemption",
child=combined,
status=common.Status.FAILURE,
)
decorator = OnPreempt(

# Create the subtree to handle `on_preempt`
on_preempt_subtree = OnPreempt(
name="On Preemption",
child=on_preempt,
# Returning FAILURE is necessary so (a) the Selector always moves on to
# `combined`; and (b) the decorator's status is not INVALID (which would
# cause the Selector to not pass on a `stop(INVALID)` call to it).
# `set_status_subtree`; and (b) the decorator's status is not INVALID (which would
# prevent the Selector from passing on a `stop(INVALID)` call to it).
update_status=common.Status.FAILURE,
single_tick=on_preempt_single_tick,
period_ms=on_preempt_period_ms,
timeout=on_preempt_timeout,
)
subtree_root.add_children([decorator, combined])
return subtree_root

# Create the subtree to output the status once `on_success_or_failure_subtree`
# is done.
wait_for_status = behaviours.WaitForBlackboardVariable(
name="Wait for Status",
variable_name=status_blackboard_key,
)
blackboard_to_status = behaviours.BlackboardToStatus(
name="Blackboard to Status",
variable_name=status_blackboard_key,
)
set_status_subtree = composites.Sequence(
name="Set Status",
memory=True,
children=[wait_for_status, blackboard_to_status],
)

root = composites.Selector(
name=name,
memory=False,
children=[
on_success_or_failure_subtree,
on_preempt_subtree,
set_status_subtree,
],
)
return root