diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/action.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/action.py
new file mode 100644
index 0000000..5d769d5
--- /dev/null
+++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/action.py
@@ -0,0 +1,583 @@
+# Copyright (c) 2024 Boston Dynamics AI Institute Inc. All rights reserved.
+
+import collections
+import inspect
+import queue
+import threading
+import warnings
+from collections.abc import Sequence
+from typing import Any, Callable, Iterable, List, Optional, Type, Union
+
+import action_msgs.msg
+from rclpy.action.client import ActionClient, ClientGoalHandle
+from rclpy.node import Node
+from rclpy.task import Future
+
+import bdai_ros2_wrappers.scope as scope
+from bdai_ros2_wrappers.futures import wait_for_future
+
+
+class ActionException(Exception):
+ """Base action exception."""
+
+ pass
+
+
+class ActionTimeout(ActionException):
+ """Exception raised on action timeout."""
+
+ pass
+
+
+class ActionRejected(ActionException):
+ """Exception raised when the action is rejected."""
+
+ pass
+
+
+class ActionAborted(ActionException):
+ """Exception raised when the action is aborted."""
+
+ pass
+
+
+class ActionCancelled(ActionException):
+ """Exception raised when the action is cancelled."""
+
+ pass
+
+
+class Tape:
+ """A thread-safe data tape that can be written and iterated safely."""
+
+ class Stream:
+ """A synchronized data stream."""
+
+ def __init__(self, max_size: Optional[int] = None, label: Optional[str] = None) -> None:
+ """
+ Initializes the stream.
+
+ Args:
+ max_size: optional maximum stream size.
+ """
+ self._queue: queue.Queue = queue.Queue(max_size or 0)
+ self._label = label
+
+ @property
+ def label(self) -> Optional[str]:
+ return self._label
+
+ def write(self, data: Any) -> bool:
+ """
+ Write data to the stream.
+
+ Returns:
+ True if the write operation succeeded, False if the
+ stream has grown to its specified maximum size and
+ the write operation cannot be carried out.
+ """
+ try:
+ self._queue.put_nowait(data)
+ except queue.Full:
+ return False
+ return True
+
+ def read(self, timeout_sec: Optional[float] = None) -> Optional[Any]:
+ """
+ Read data from the stream.
+
+ Args:
+ timeout_sec: optional read timeout, in seconds.
+
+ Returns:
+ data if the read is successful and ``None``
+ if the read times out or is interrupted.
+ """
+ try:
+ data = self._queue.get(timeout=timeout_sec)
+ except queue.Empty:
+ return None
+ self._queue.task_done()
+ return data
+
+ def interrupt(self) -> None:
+ """
+ Interrupt the stream and wake up the reader.
+ """
+ try:
+ self._queue.put_nowait(None)
+ except queue.Full:
+ pass
+
+ @property
+ def consumed(self) -> bool:
+ """Check if all stream data has been consumed."""
+ return self._queue.empty()
+
+ def __init__(self, max_length: Optional[int] = None) -> None:
+ """
+ Initializes the data tape.
+
+ Args:
+ max_length: optional maximum tape length.
+ """
+ self._lock = threading.Lock()
+ self._streams: List[Tape.Stream] = []
+ self._content: collections.deque = collections.deque(maxlen=max_length)
+ self._closed = False
+
+ def write(self, data: Any) -> None:
+ """Write the data tape."""
+ with self._lock:
+ for stream in self._streams:
+ if not stream.write(data):
+ message = "Stream is filled up, dropping message"
+ if stream.label:
+ message = f"{stream.label}: {message}"
+ warnings.warn(message, RuntimeWarning)
+ self._content.append(data)
+
+ def content(
+ self,
+ *,
+ follow: bool = False,
+ follow_buffer_size: Optional[int] = None,
+ timeout_sec: Optional[float] = None,
+ label: Optional[str] = None,
+ ) -> Iterable:
+ """
+ Iterate over the data tape.
+
+ When following the data tape, iteration stops when the given timeout
+ expires and when the data tape is closed.
+
+ Args:
+ follow: whether to follow the data tape as it gets written or not.
+ follow_buffer_size: optional buffer size when following the data tape.
+ If none is provided, the data tape size will be used as default.
+ timeout_sec: optional timeout, in seconds, when following the data tape.
+ label: optional label to qualify logs and warnings.
+
+ Returns:
+ a lazy iterable over the data tape.
+ """
+ with self._lock:
+ content = self._content.copy()
+ stream: Optional[Tape.Stream] = None
+ if follow and not self._closed:
+ stream = Tape.Stream(follow_buffer_size, label)
+ self._streams.append(stream)
+ try:
+ yield from content
+ if stream is not None:
+ while not self._closed:
+ feedback = stream.read(timeout_sec)
+ if feedback is None:
+ break
+ yield feedback
+ while not stream.consumed:
+ # This is safe as long as there is
+ # a single reader for the stream,
+ # which is currently the case.
+ feedback = stream.read(timeout_sec)
+ if feedback is None:
+ continue
+ yield feedback
+ finally:
+ if stream is not None:
+ with self._lock:
+ self._streams.remove(stream)
+
+ def close(self) -> None:
+ """
+ Close the data tape.
+
+ This will interrupt all following content iterators.
+ """
+ self._closed = True
+ with self._lock:
+ for stream in self._streams:
+ stream.interrupt()
+
+
+class ActionFuture:
+ """
+ A proxy to a ROS 2 action invocation.
+
+ Action futures are to actions what plain futures are to services, with a bit more functionality
+ to cover action specific semantics such feedback and cancellation.
+
+ Action futures are rarely instantiated explicitly, but implicitly through `Actionable` APIs.
+ """
+
+ def __init__(self, goal_handle_future: Future, feedback_tape: Optional[Tape] = None) -> None:
+ """
+ Initializes the action future.
+
+ Args:
+ goal_handle_future: action goal handle future, as returned by the
+ `rclpy.action.ActionClient.send_goal_async` method.
+ feedback_tape: optional feedback tape, assumed to be tracking action feedback.
+ """
+ self._executor_ref = goal_handle_future._executor
+ self._goal_handle_future = goal_handle_future
+ if feedback_tape is not None:
+ executor = self._executor_ref()
+ if executor is not None:
+ executor.context.on_shutdown(feedback_tape.close)
+ self._feedback_tape = feedback_tape
+ self._result_future: Optional[Future] = None
+ self._acknowledgement_future = Future(executor=self._executor_ref())
+ self._finalization_future = Future(executor=self._executor_ref())
+ self._goal_handle_future.add_done_callback(self._goal_handle_callback)
+
+ def _goal_handle_callback(self, goal_handle_future: Future) -> None:
+ """Callback on action goal handle future resolution."""
+ exception = goal_handle_future.exception()
+ if exception is not None:
+ self._acknowledgement_future.set_exception(exception)
+ # An exception during acknowledgement is a rejection
+ self._finalization_future.set_result(None)
+ return
+ goal_handle = goal_handle_future.result()
+ self._acknowledgement_future.set_result(goal_handle.accepted)
+ if goal_handle.accepted:
+ self._result_future = goal_handle.get_result_async()
+ self._result_future.add_done_callback(self._result_callback)
+ else:
+ self._finalization_future.set_result(None)
+
+ def _result_callback(self, result_future: Future) -> None:
+ """Callback on action result future resolution."""
+ exception = result_future.exception()
+ if exception is None:
+ self._finalization_future.set_result(result_future.result().status)
+ else:
+ self._finalization_future.set_exception(exception)
+ if self._feedback_tape is not None:
+ self._feedback_tape.close()
+
+ @property
+ def acknowledgement(self) -> Future:
+ """Get future to wait for action acknowledgement (either acceptance or rejection)."""
+ return self._acknowledgement_future
+
+ @property
+ def acknowledged(self) -> bool:
+ """Check if action was acknowledged (i.e. accepted or rejected)"""
+ return self._acknowledgement_future.done()
+
+ @property
+ def goal_handle(self) -> ClientGoalHandle:
+ """
+ Get action goal handle.
+
+ Raises:
+ RuntimeError: if action has not been accepted or rejected yet.
+ """
+ if not self._goal_handle_future.done():
+ raise RuntimeError("Action not acknowledged yet")
+ return self._goal_handle_future.result()
+
+ @property
+ def finalization(self) -> Future:
+ """
+ Get future to wait for action finalization.
+
+ Action rejection also counts as finalization.
+ """
+ return self._finalization_future
+
+ @property
+ def finalized(self) -> bool:
+ """Check if action was finalized."""
+ return self._finalization_future.done()
+
+ @property
+ def result(self) -> Any:
+ """
+ Get action result.
+
+ Raises:
+ RuntimeError: if action is still executing.
+ """
+ if self._result_future is None or not self._result_future.done():
+ raise RuntimeError("Action still executing")
+ return self._result_future.result().result
+
+ @property
+ def tracks_feedback(self) -> bool:
+ """Check if action future tracks action feedback."""
+ return self._feedback_tape is not None
+
+ @property
+ def feedback(self) -> Sequence:
+ """
+ Get all buffered action feedback.
+
+ Action must have been accepted before any feedback can be fetched.
+
+ Raises:
+ RuntimeError: if action feedback tracking is not enabled,
+ action acknowledgement has not been received yet, or
+ action was not accepted.
+ """
+ if self._feedback_tape is None:
+ raise RuntimeError("Action feedback tracking is disabled")
+ if not self._goal_handle_future.done():
+ raise RuntimeError("Action not acknowledged yet")
+ goal_handle = self._goal_handle_future.result()
+ if not goal_handle.accepted:
+ raise RuntimeError("Action not accepted")
+ return list(self._feedback_tape.content())
+
+ def feedback_stream(self, *, buffer_size: Optional[int] = None, timeout_sec: Optional[float] = None) -> Iterable:
+ """
+ Iterate over action feedback as it comes.
+
+ This includes buffered action feedback. Iteration stops when the given timeout
+ expires or when the action is done executing. Note that iterating over action
+ feedback to come is a blocking operation.
+
+ Action must have been accepted before feedback streaming is allowed.
+
+ Args:
+ buffer_size: optional maximum size for the incoming feedback buffer.
+ If none is provided, the configured feedback tracking buffer size will
+ be used.
+ timeout_sec: optional timeout, in seconds, for new action feedback.
+
+ Returns:
+ a lazy iterable over action feedback.
+
+ Raises:
+ RuntimeError: if action feedback tracking is not enabled,
+ action acknowledgement has not been received yet, or
+ action was not accepted.
+ """
+ if self._feedback_tape is None:
+ raise RuntimeError("Action feedback tracking is not enabled")
+ if not self._goal_handle_future.done():
+ raise RuntimeError("Action not acknowledged yet")
+ goal_handle = self._goal_handle_future.result()
+ if not goal_handle.accepted:
+ raise RuntimeError("Action not accepted")
+ outerframe = inspect.stack()[1]
+ yield from self._feedback_tape.content(
+ follow=True, timeout_sec=timeout_sec, label=f"{outerframe.filename}:{outerframe.lineno}"
+ )
+
+ @property
+ def accepted(self) -> bool:
+ """Check if action was accepted."""
+ return not self._goal_handle_future.done() or self._goal_handle_future.result().accepted
+
+ @property
+ def cancelled(self) -> bool:
+ """Check if action was cancelled."""
+ return (
+ self._result_future is not None
+ and self._result_future.done()
+ and (self._result_future.result().status == action_msgs.msg.GoalStatus.STATUS_CANCELED)
+ )
+
+ @property
+ def aborted(self) -> bool:
+ """Check if action was aborted."""
+ return (
+ self._result_future is not None
+ and self._result_future.done()
+ and (self._result_future.result().status == action_msgs.msg.GoalStatus.STATUS_ABORTED)
+ )
+
+ @property
+ def succeeded(self) -> bool:
+ """Check if action was succeeded."""
+ return (
+ self._result_future is not None
+ and self._result_future.done()
+ and (self._result_future.result().status == action_msgs.msg.GoalStatus.STATUS_SUCCEEDED)
+ )
+
+ @property
+ def status(self) -> Optional[int]:
+ """
+ Get action status code.
+
+ See `action_msgs.msg.GoalStatus` documentation for status codes.
+ Rejected actions are conventially assigned `None` for status.
+
+ Raises:
+ RuntimeError: if action has not been acknowledged or is still executing.
+ """
+ if not self._goal_handle_future.done():
+ raise RuntimeError("Action not acknowledged yet")
+ if not self._goal_handle_future.result().accepted:
+ return None
+ if self._result_future is None or not self._result_future.done():
+ raise RuntimeError("Action still executing")
+ return self._result_future.result().status
+
+ def as_future(self) -> Future:
+ """
+ Get action future as a plain future.
+
+ Useful to pass action futures to APIs that expect plain futures.
+ """
+ future = Future(executor=self._executor_ref())
+
+ def _done_callback(_: Future) -> None:
+ if future.cancelled():
+ self.cancel()
+
+ future.add_done_callback(_done_callback)
+
+ def _bridge_callback(finalization_future: Future) -> None:
+ if not self.accepted:
+ future.set_exception(ActionRejected())
+ return
+ if self.aborted:
+ future.set_exception(ActionRejected())
+ return
+ if self.cancelled:
+ future.set_exception(ActionCancelled())
+ return
+ exception = finalization_future.exception()
+ if exception:
+ future.set_exception(exception)
+ return
+ future.set_result(self.result)
+
+ self._finalization_future.add_done_callback(_bridge_callback)
+
+ return future
+
+ def cancel(self) -> Future:
+ """
+ Cancel action.
+
+ Returns:
+ a future for the cancellation status code.
+ See `action_msgs.srv.CancelGoal` documentation for
+ the list of possible status codes.
+ """
+ cancellation_future = Future(executor=self._executor_ref())
+
+ def _response_callback(cancel_goal_future: Future) -> None:
+ exception = cancel_goal_future.exception()
+ if exception is None:
+ response = cancel_goal_future.result()
+ cancellation_future.set_result(response.return_code)
+ else:
+ cancellation_future.set_exception(exception)
+
+ cancel_goal_future = self.goal_handle.cancel_goal_async()
+ cancel_goal_future.add_done_callback(_response_callback)
+ return cancellation_future
+
+
+class Actionable:
+ """
+ An ergonomic interface to call actions in ROS 2.
+
+ Actionables wrap `rclpy.action.ActionClient` instances to allow for synchronous
+ and asynchronous action invocation, in a way that resembles remote procedure calls.
+ """
+
+ def __init__(self, action_type: Type, action_name: str, node: Optional[Node] = None, **kwargs: Any) -> None:
+ """
+ Initializes the actionable.
+
+ Args:
+ action_type: Target action type class (as generated by ``rosidl``).
+ action_name: Name of the target action in the ROS 2 graph.
+ node: optional node for the underlying action client, defaults to
+ the current process node.
+
+ All other keyword arguments are forward to the underlying action client.
+ See `rclpy.action.ActionClient` documentation for further reference.
+ """
+ node = node or scope.node()
+ self._action_type = action_type
+ self._action_name = action_name
+ self._action_client = ActionClient(node, action_type, action_name, **kwargs)
+
+ @property
+ def action_name(self) -> str:
+ """Get the target action name."""
+ return self._action_name
+
+ @property
+ def action_type(self) -> Type:
+ """Get the target action type."""
+ return self._action_type
+
+ @property
+ def action_client(self) -> ActionClient:
+ """Get the underlying action client."""
+ return self._action_client
+
+ def __call__(self, *args: Any, **kwargs: Any) -> Any:
+ """Forward invocation to `Actionable.synchronously`."""
+ return self.synchronously(*args, **kwargs)
+
+ def synchronously(
+ self, goal: Any, *, feedback_callback: Optional[Callable] = None, timeout_sec: Optional[float] = None
+ ) -> Any:
+ """
+ Invoke action synchronously.
+
+ Args:
+ goal: goal to invoke action with.
+ feedback_callback: optional action feedback callback.
+ timeout_sec: optional action timeout, in seconds. If a timeout is specified and it
+ expires, the action will be cancelled and the call will raise. Note this timeout is
+ local to the caller. It may take some time for the action to be cancelled, that is
+ if cancellation is not rejected by the action server.
+
+ Returns:
+ the action result.
+
+ Raises:
+ ActionTimeout: if the action timed out.
+ ActionRejected: if the action was not accepted.
+ ActionCancelled: if the action was cancelled.
+ ActionAborted: if the action was aborted.
+ """
+ action = ActionFuture(self._action_client.send_goal_async(goal, feedback_callback))
+ if not wait_for_future(action.finalization, timeout_sec=timeout_sec):
+ action.cancel() # proactively cancel
+ raise ActionTimeout()
+ if not action.accepted:
+ raise ActionRejected()
+ if action.cancelled:
+ raise ActionCancelled()
+ if action.aborted:
+ raise ActionAborted()
+ return action.result
+
+ def asynchronously(self, goal: Any, *, track_feedback: Union[int, bool] = False) -> ActionFuture:
+ """
+ Invoke action asynchronously.
+
+ Args:
+ goal: goal to invoke action with.
+ track_feedback: whether and how to track action feedback. Other
+ than a boolean to enable or disable tracking, a positive integer
+ may be provided to cap feedback buffer size.
+
+ Returns:
+ the action future.
+ """
+ feedback_tape: Optional[Tape] = None
+ if track_feedback:
+ feedback_tape_length = None
+ if track_feedback is not True:
+ feedback_tape_length = track_feedback
+ feedback_tape = Tape(feedback_tape_length)
+ goal_handle_future = self._action_client.send_goal_async(
+ goal, lambda feedback: feedback_tape.write(feedback.feedback)
+ )
+ else:
+ goal_handle_future = self._action_client.send_goal_async(goal)
+ return ActionFuture(goal_handle_future, feedback_tape)
diff --git a/bdai_ros2_wrappers/package.xml b/bdai_ros2_wrappers/package.xml
index 26c0b7c..c3f95e8 100644
--- a/bdai_ros2_wrappers/package.xml
+++ b/bdai_ros2_wrappers/package.xml
@@ -7,6 +7,10 @@
The AI Institute
MIT
+ action_msgs
+ rclpy
+
+ example_interfaces
python3-pytest
diff --git a/bdai_ros2_wrappers/test/test_action.py b/bdai_ros2_wrappers/test/test_action.py
new file mode 100644
index 0000000..6aa3015
--- /dev/null
+++ b/bdai_ros2_wrappers/test/test_action.py
@@ -0,0 +1,220 @@
+# Copyright (c) 2024 Boston Dynamics AI Institute Inc. All rights reserved.
+import array
+import collections
+import time
+from typing import Iterable
+from unittest.mock import Mock
+
+import pytest
+from action_msgs.msg import GoalStatus
+from action_msgs.srv import CancelGoal
+from example_interfaces.action import Fibonacci
+from rclpy.action.server import ActionServer, CancelResponse, GoalResponse, ServerGoalHandle
+
+from bdai_ros2_wrappers.action import Actionable, ActionAborted, ActionCancelled, ActionRejected
+from bdai_ros2_wrappers.futures import wait_for_future
+from bdai_ros2_wrappers.scope import ROSAwareScope
+
+
+def fibonacci_sequence(order: int) -> Iterable[int]:
+ """Generate a Fibonacci sequence for the given `order`."""
+ yield 0
+ if order > 0:
+ yield 1
+ if order > 1:
+ sequence: collections.deque = collections.deque([0, 1], maxlen=2)
+ for partial_order in range(2, order + 1):
+ sequence.append(sequence[-1] + sequence[-2])
+ yield sequence[-1]
+
+
+def default_execute_callback(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
+ """Execute default Fibonacci action behavior."""
+ feedback = Fibonacci.Feedback()
+
+ for number in fibonacci_sequence(goal_handle.request.order):
+ feedback.sequence.append(number)
+ goal_handle.publish_feedback(feedback)
+ time.sleep(0.01)
+
+ goal_handle.succeed()
+
+ result = Fibonacci.Result()
+ result.sequence = feedback.sequence
+ return result
+
+
+def test_successful_synchronous_action_invocation(ros: ROSAwareScope) -> None:
+ ActionServer(ros.node, Fibonacci, "fibonacci/compute", default_execute_callback)
+ compute_fibonacci = Actionable(Fibonacci, "fibonacci/compute", ros.node)
+ result = compute_fibonacci(Fibonacci.Goal(order=5), timeout_sec=10.0)
+ expected_result = array.array("i", [0, 1, 1, 2, 3, 5])
+ assert result.sequence == expected_result
+
+
+def test_successful_synchronous_action_invocation_with_feedback(ros: ROSAwareScope) -> None:
+ mock_callback = Mock()
+ ActionServer(ros.node, Fibonacci, "fibonacci/compute", default_execute_callback)
+ compute_fibonacci = Actionable(Fibonacci, "fibonacci/compute", ros.node)
+ result = compute_fibonacci(Fibonacci.Goal(order=5), feedback_callback=mock_callback, timeout_sec=10.0)
+ expected_result = array.array("i", [0, 1, 1, 2, 3, 5])
+ assert result.sequence == expected_result
+ assert mock_callback.call_count > 0
+
+
+def test_successful_asynchronous_action_invocation(ros: ROSAwareScope) -> None:
+ ActionServer(ros.node, Fibonacci, "fibonacci/compute", default_execute_callback)
+ compute_fibonacci = Actionable(Fibonacci, "fibonacci/compute", ros.node)
+ action = compute_fibonacci.asynchronously(Fibonacci.Goal(order=5))
+ assert wait_for_future(action.finalization, timeout_sec=10.0)
+ assert action.acknowledged
+ assert action.accepted
+ assert action.finalized
+ assert not action.aborted
+ assert not action.cancelled
+ assert action.succeeded
+ expected_result = array.array("i", [0, 1, 1, 2, 3, 5])
+ assert action.result.sequence == expected_result
+
+
+def test_successful_asynchronous_action_invocation_with_feedback(ros: ROSAwareScope) -> None:
+ ActionServer(ros.node, Fibonacci, "fibonacci/compute", default_execute_callback)
+ compute_fibonacci = Actionable(Fibonacci, "fibonacci/compute", ros.node)
+ action = compute_fibonacci.asynchronously(Fibonacci.Goal(order=5), track_feedback=True)
+ assert wait_for_future(action.acknowledgement, timeout_sec=10.0)
+ streamed_sequences = []
+ for order, feedback in enumerate(action.feedback_stream(timeout_sec=2.0)):
+ assert feedback.sequence == array.array("i", fibonacci_sequence(order))
+ streamed_sequences.append(feedback.sequence)
+ assert action.finalized
+ assert action.succeeded
+ assert not action.aborted
+ assert not action.cancelled
+ expected_result = array.array("i", [0, 1, 1, 2, 3, 5])
+ assert action.result.sequence == expected_result
+ stored_sequences = [feedback.sequence for feedback in action.feedback]
+ assert stored_sequences == streamed_sequences
+
+
+def test_successful_asynchronous_action_invocation_with_limited_feedback(ros: ROSAwareScope) -> None:
+ ActionServer(ros.node, Fibonacci, "fibonacci/compute", default_execute_callback)
+ compute_fibonacci = Actionable(Fibonacci, "fibonacci/compute", ros.node)
+ action = compute_fibonacci.asynchronously(Fibonacci.Goal(order=5), track_feedback=1)
+ assert wait_for_future(action.acknowledgement, timeout_sec=10.0)
+ *_, last_feedback = action.feedback_stream(buffer_size=10, timeout_sec=2.0)
+ assert action.finalized
+ assert action.succeeded
+ assert not action.aborted
+ assert not action.cancelled
+ expected_result = array.array("i", [0, 1, 1, 2, 3, 5])
+ assert action.result.sequence == expected_result
+ assert len(action.feedback) == 1
+ assert action.feedback[-1].sequence == last_feedback.sequence
+
+
+def test_rejected_synchronous_action_invocation(ros: ROSAwareScope) -> None:
+ ActionServer(
+ ros.node, Fibonacci, "fibonacci/compute", default_execute_callback, goal_callback=lambda _: GoalResponse.REJECT
+ )
+ compute_fibonacci = Actionable(Fibonacci, "fibonacci/compute", ros.node)
+ with pytest.raises(ActionRejected):
+ compute_fibonacci(Fibonacci.Goal(order=5), timeout_sec=10.0)
+
+
+def test_rejected_asynchronous_action_invocation(ros: ROSAwareScope) -> None:
+ ActionServer(
+ ros.node, Fibonacci, "fibonacci/compute", default_execute_callback, goal_callback=lambda _: GoalResponse.REJECT
+ )
+ compute_fibonacci = Actionable(Fibonacci, "fibonacci/compute", ros.node)
+ action = compute_fibonacci.asynchronously(Fibonacci.Goal(order=5))
+ assert wait_for_future(action.finalization, timeout_sec=10.0)
+ assert action.acknowledged
+ assert action.finalized
+ assert not action.accepted
+ assert not action.aborted
+ assert not action.succeeded
+ assert not action.cancelled
+
+
+def test_aborted_synchronous_action_invocation(ros: ROSAwareScope) -> None:
+ def execute_callback(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
+ goal_handle.abort()
+ result = Fibonacci.Result()
+ return result
+
+ ActionServer(ros.node, Fibonacci, "fibonacci/compute", execute_callback)
+ compute_fibonacci = Actionable(Fibonacci, "fibonacci/compute", ros.node)
+ with pytest.raises(ActionAborted):
+ compute_fibonacci(Fibonacci.Goal(order=5), timeout_sec=10.0)
+
+
+def test_aborted_asynchronous_action_invocation(ros: ROSAwareScope) -> None:
+ def execute_callback(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
+ goal_handle.abort()
+ result = Fibonacci.Result()
+ return result
+
+ ActionServer(ros.node, Fibonacci, "fibonacci/compute", execute_callback)
+ compute_fibonacci = Actionable(Fibonacci, "fibonacci/compute", ros.node)
+ action = compute_fibonacci.asynchronously(Fibonacci.Goal(order=5))
+ assert wait_for_future(action.finalization, timeout_sec=10.0)
+ assert action.status == GoalStatus.STATUS_ABORTED
+ assert action.acknowledged
+ assert action.accepted
+ assert action.finalized
+ assert not action.cancelled
+ assert action.aborted
+ assert not action.succeeded
+
+
+def test_cancelled_synchronous_action_invocation(ros: ROSAwareScope) -> None:
+ def execute_callback(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
+ while True:
+ if goal_handle.is_cancel_requested:
+ goal_handle.canceled()
+ return Fibonacci.Result()
+ time.sleep(0.01)
+
+ ActionServer(
+ ros.node, Fibonacci, "fibonacci/compute", execute_callback, cancel_callback=lambda _: CancelResponse.ACCEPT
+ )
+ compute_fibonacci = Actionable(Fibonacci, "fibonacci/compute", ros.node)
+
+ assert ros.node is not None
+ cancel_goal_client = ros.node.create_client(CancelGoal, "fibonacci/compute/_action/cancel_goal")
+
+ def deferred_cancel() -> None:
+ time.sleep(0.5)
+ cancel_goal_client.call(CancelGoal.Request())
+
+ assert ros.executor is not None
+ ros.executor.create_task(deferred_cancel)
+ with pytest.raises(ActionCancelled):
+ compute_fibonacci(Fibonacci.Goal(order=5), timeout_sec=10.0)
+
+
+def test_cancelled_asynchronous_action_invocation(ros: ROSAwareScope) -> None:
+ def execute_callback(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
+ while True:
+ if goal_handle.is_cancel_requested:
+ goal_handle.canceled()
+ return Fibonacci.Result()
+ time.sleep(0.01)
+
+ ActionServer(
+ ros.node, Fibonacci, "fibonacci/compute", execute_callback, cancel_callback=lambda _: CancelResponse.ACCEPT
+ )
+ compute_fibonacci = Actionable(Fibonacci, "fibonacci/compute", ros.node)
+ action = compute_fibonacci.asynchronously(Fibonacci.Goal(order=5))
+ assert wait_for_future(action.acknowledgement, timeout_sec=10.0)
+ assert action.acknowledged
+ assert action.accepted
+ cancellation_future = action.cancel()
+ assert wait_for_future(cancellation_future, timeout_sec=10.0)
+ assert cancellation_future.result() == CancelGoal.Response.ERROR_NONE
+ assert wait_for_future(action.finalization, timeout_sec=10.0)
+ assert action.finalized
+ assert action.status == GoalStatus.STATUS_CANCELED
+ assert action.cancelled
+ assert not action.aborted
+ assert not action.succeeded
diff --git a/bdai_ros2_wrappers/test/test_action_client.py b/bdai_ros2_wrappers/test/test_action_client.py
index 54897fe..680f5ba 100644
--- a/bdai_ros2_wrappers/test/test_action_client.py
+++ b/bdai_ros2_wrappers/test/test_action_client.py
@@ -3,7 +3,7 @@
import time
from typing import Any
-from action_tutorials_interfaces.action import Fibonacci
+from example_interfaces.action import Fibonacci
from rclpy.action.server import ActionServer, GoalResponse, ServerGoalHandle
from bdai_ros2_wrappers.action_client import ActionClientWrapper
diff --git a/bdai_ros2_wrappers/test/test_action_handle.py b/bdai_ros2_wrappers/test/test_action_handle.py
index eb815a3..1eb2893 100644
--- a/bdai_ros2_wrappers/test/test_action_handle.py
+++ b/bdai_ros2_wrappers/test/test_action_handle.py
@@ -5,7 +5,7 @@
from typing import Any, Callable, Optional, Tuple
from unittest.mock import Mock
-from action_tutorials_interfaces.action import Fibonacci
+from example_interfaces.action import Fibonacci
from rclpy.action import ActionClient, ActionServer
from rclpy.action.server import CancelResponse, GoalResponse, ServerGoalHandle
from rclpy.node import Node
@@ -35,19 +35,19 @@ def _execute_callback_abort(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
def _execute_callback_feedback(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
feedback = Fibonacci.Feedback()
- feedback.partial_sequence = [0, 1]
+ feedback.sequence = [0, 1]
goal_handle.publish_feedback(feedback)
time.sleep(0.001)
for i in range(1, goal_handle.request.order):
- feedback.partial_sequence.append(feedback.partial_sequence[i] + feedback.partial_sequence[i - 1])
+ feedback.sequence.append(feedback.sequence[i] + feedback.sequence[i - 1])
goal_handle.publish_feedback(feedback)
time.sleep(0.01)
goal_handle.succeed()
result = Fibonacci.Result()
- result.sequence = feedback.partial_sequence
+ result.sequence = feedback.sequence
return result
diff --git a/bdai_ros2_wrappers/test/test_single_goal_action_server.py b/bdai_ros2_wrappers/test/test_single_goal_action_server.py
index 0c075b6..f5df2b4 100644
--- a/bdai_ros2_wrappers/test/test_single_goal_action_server.py
+++ b/bdai_ros2_wrappers/test/test_single_goal_action_server.py
@@ -1,7 +1,7 @@
# Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved.
import array
-from action_tutorials_interfaces.action import Fibonacci
+from example_interfaces.action import Fibonacci
from rclpy.action.server import ServerGoalHandle
from bdai_ros2_wrappers.action_client import ActionClientWrapper
diff --git a/bdai_ros2_wrappers/test/test_single_goal_multiple_action_servers.py b/bdai_ros2_wrappers/test/test_single_goal_multiple_action_servers.py
index aae3abf..1717e79 100644
--- a/bdai_ros2_wrappers/test/test_single_goal_multiple_action_servers.py
+++ b/bdai_ros2_wrappers/test/test_single_goal_multiple_action_servers.py
@@ -4,7 +4,7 @@
from typing import Tuple
import pytest
-from action_tutorials_interfaces.action import Fibonacci
+from example_interfaces.action import Fibonacci
from rclpy.action.server import GoalStatus, ServerGoalHandle
from bdai_ros2_wrappers.action_client import ActionClientWrapper