diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/action_client.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/action_client.py index 8df5697..0ae6214 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/action_client.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/action_client.py @@ -96,7 +96,7 @@ def send_goal_async_handle( Returns: ActionHandle: An object to manage the asynchronous lifecycle of the action request """ - handle = ActionHandle(action_name=action_name, logger=self._node.get_logger()) + handle = ActionHandle(action_name=action_name, logger=self._node.get_logger(), context=self._node.context) if result_callback is not None: handle.set_result_callback(result_callback) diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/action_handle.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/action_handle.py index 0fb3618..da299c2 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/action_handle.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/action_handle.py @@ -4,8 +4,10 @@ from action_msgs.msg import GoalStatus from rclpy.action.client import ClientGoalHandle +from rclpy.context import Context from rclpy.impl.rcutils_logger import RcutilsLogger from rclpy.task import Future +from rclpy.utilities import get_default_context from bdai_ros2_wrappers.type_hints import Action @@ -15,20 +17,24 @@ class ActionHandle(object): as holding the various callbacks for sending an ActionGoal (cancel, failure, feedback, result) """ - def __init__(self, action_name: str, logger: Optional[RcutilsLogger] = None): + def __init__(self, action_name: str, logger: Optional[RcutilsLogger] = None, context: Optional[Context] = None): """Constructor Args: action_name (str): The name of the action (for logging purposes) logger (Optional[RcutilsLogger]): An optional logger to use. If none is provided, one is created """ + if context is None: + context = get_default_context() self._action_name = action_name self._send_goal_future: Optional[Future] = None self._goal_handle: Optional[ClientGoalHandle] = None self._wait_for_acceptance_event: Event = Event() + context.on_shutdown(self._wait_for_acceptance_event.set) self._get_result_future: Optional[Future] = None self._feedback_callback: Optional[Callable[[Action.Feedback], None]] = None self._wait_for_result_event: Event = Event() + context.on_shutdown(self._wait_for_result_event.set) self._result_callback: Optional[Callable[[Action.Result], None]] = None self._on_failure_callback: Optional[Callable] = None self._cancel_future: Optional[Future] = None diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/futures.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/futures.py index 4feeaba..02c7805 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/futures.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/futures.py @@ -1,25 +1,27 @@ # Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved. from threading import Event -from typing import Any, Optional +from typing import Optional +from rclpy.context import Context from rclpy.task import Future +from rclpy.utilities import get_default_context -def wait_for_future(future: Future, timeout_sec: Optional[float] = None) -> bool: +def wait_for_future(future: Future, timeout_sec: Optional[float] = None, *, context: Optional[Context] = None) -> bool: """Blocks while waiting for a future to become done Args: future (Future): The future to be waited on timeout_sec (Optional[float]): An optional timeout for how long to wait + context (Optional[Context]): Current context (will use the default if none is given) Returns: bool: True if successful, False if the timeout was triggered """ + if context is None: + context = get_default_context() event = Event() - - def done_callback(_: Any) -> None: - nonlocal event - event.set() - - future.add_done_callback(done_callback) - return event.wait(timeout=timeout_sec) + context.on_shutdown(event.set) + future.add_done_callback(lambda _: event.set()) + event.wait(timeout=timeout_sec) + return future.done() diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/service_handle.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/service_handle.py index abc60b6..d6b0f50 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/service_handle.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/service_handle.py @@ -2,8 +2,10 @@ import threading from typing import Callable, Optional +from rclpy.context import Context from rclpy.impl.rcutils_logger import RcutilsLogger from rclpy.task import Future +from rclpy.utilities import get_default_context from bdai_ros2_wrappers.type_hints import SrvTypeResponse @@ -13,10 +15,13 @@ class ServiceHandle(object): as holding the various callbacks for sending an ServiceRequest (result, failure) """ - def __init__(self, service_name: str, logger: Optional[RcutilsLogger] = None): + def __init__(self, service_name: str, logger: Optional[RcutilsLogger] = None, context: Optional[Context] = None): + if context is None: + context = get_default_context() self._service_name = service_name self._send_service_future: Optional[Future] = None self._future_ready_event = threading.Event() + context.on_shutdown(self._future_ready_event.set) self._result_callback: Optional[Callable] = None self._on_failure_callback: Optional[Callable] = None self._result: Optional[SrvTypeResponse] = None diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/subscription.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/subscription.py index 3d619e0..e1d00fb 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/subscription.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/subscription.py @@ -1,5 +1,4 @@ # Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved. -import threading import typing import rclpy.node @@ -7,6 +6,7 @@ import rclpy.task import bdai_ros2_wrappers.scope as scope +from bdai_ros2_wrappers.futures import wait_for_future MessageT = typing.TypeVar("MessageT") @@ -49,7 +49,12 @@ def callback(msg: MessageT) -> None: def wait_for_message( - msg_type: MessageT, topic_name: str, timeout_sec: typing.Optional[float] = None, **kwargs: typing.Any + msg_type: MessageT, + topic_name: str, + timeout_sec: typing.Optional[float] = None, + *, + node: typing.Optional[rclpy.node.Node] = None, + **kwargs: typing.Any, ) -> typing.Optional[MessageT]: """ Wait for message on a given topic synchronously. @@ -65,9 +70,11 @@ def wait_for_message( Returns: The message received, or None on timeout. """ - event = threading.Event() - future = wait_for_message_async(msg_type, topic_name, **kwargs) - future.add_done_callback(lambda future: event.set()) - if not event.wait(timeout_sec): + node = node or scope.node() + if node is None: + raise ValueError("no ROS 2 node available (did you use bdai_ros2_wrapper.process.main?)") + future = wait_for_message_async(msg_type, topic_name, node=node, **kwargs) + if not wait_for_future(future, timeout_sec, context=node.context): + future.cancel() return None return future.result() diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/tf_listener_wrapper.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/tf_listener_wrapper.py index fec002c..32a8db6 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/tf_listener_wrapper.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/tf_listener_wrapper.py @@ -1,154 +1,186 @@ # Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved. -import time -from typing import Optional, Tuple -import rclpy +import warnings +from typing import Optional + from geometry_msgs.msg import TransformStamped +from rclpy.clock import ClockType from rclpy.node import Node +from rclpy.task import Future from rclpy.time import Duration, Time -from tf2_ros import ExtrapolationException, TransformException +from tf2_ros import TransformException from tf2_ros.buffer import Buffer from tf2_ros.transform_listener import TransformListener import bdai_ros2_wrappers.scope as scope +from bdai_ros2_wrappers.futures import wait_for_future -class TFListenerWrapper(object): - def __init__( - self, - node: Optional[Node] = None, - wait_for_transform: Optional[Tuple[str, str]] = None, - cache_time_s: Optional[int] = None, - ) -> None: +class TFListenerWrapper: + """ + A `tf2_ros` lookup device, wrapping both a buffer and a listener. + + When using process-wide machinery: + + tf_listener = TFListenerWrapper() + tf_listener.wait_for_a_tform_b(target_frame, source_frame) + tf_listener.lookup_a_tform_b(target_frame, source_frame) + + When composed by a ROS 2 node: + + class MyNode(Node): + + def __init__(self, **kwargs: Any) -> None: + super().__init__("my_node", **kwargs) + self.tf_listener = TFListenerWrapper(self) + # do not wait synchronously here or it will block forever! + # ... + + def callback(self): + if tf_listener.wait_for_a_tform_b(target_frame, source_frame, timeout_sec=0.0): + self.tf_listener.lookup_a_tform_b(target_frame, source_frame) + # or you can lookup and handle exceptions yourself + """ + + def __init__(self, node: Optional[Node] = None, cache_time_s: Optional[float] = None) -> None: + """ + Initializes the wrapper. + + Args: + node: optional node for transform listening, defaults to the current scope node. + cache_time_s: optional transform buffer size, in seconds. + """ node = node or scope.node() if node is None: raise ValueError("no ROS 2 node available (did you use bdai_ros2_wrapper.process.main?)") self._node = node - + cache_time_py = None if cache_time_s is not None: - cache_time_py = Duration(seconds=cache_time_s) - else: - cache_time_py = None + cache_time_py = Duration(nanoseconds=cache_time_s * 1e9) self._tf_buffer = Buffer(cache_time=cache_time_py) - self._tf_listener = TransformListener(self._tf_buffer, self._node, spin_thread=False) - - if wait_for_transform is not None and len(wait_for_transform) == 2: - self.wait_for_init(wait_for_transform[0], wait_for_transform[1]) + self._tf_listener = TransformListener(self._tf_buffer, self._node) @property def buffer(self) -> Buffer: return self._tf_buffer - def wait_for_init(self, from_frame: str, to_frame: str) -> None: - """ - Waits for transform from from_frame to to_frame to become available and prints initializing statements. - """ - # Wait for the buffer to fill to avoid annoying warnings. - self._node.get_logger().info( - "Waiting for TF to contain transform from " + str(from_frame) + " to " + str(to_frame) - ) - self.wait_for_transform(from_frame, to_frame) - self._node.get_logger().info("TF initialized") - - def wait_for_transform(self, from_frame: str, to_frame: str) -> None: - """ - Wait for the transform from from_frame to to_frame to become available. - """ - # We need a while instead of the usual wait because we use a different node and a thread for our listener - while rclpy.ok(context=self._node.context): - try: - self.lookup_a_tform_b(from_frame, to_frame) - break - except TransformException: - time.sleep(0.1) + def shutdown(self) -> None: + self._tf_listener.unregister() - def _internal_lookup_a_tform_b( - self, - frame_a: str, - frame_b: str, - transform_time: Optional[float] = None, - timeout: Optional[float] = None, - wait_for_frames: bool = False, - ) -> TransformStamped: + def wait_for_a_tform_b_async(self, frame_a: str, frame_b: str, transform_time: Optional[Time] = None) -> Future: """ + Wait asynchronously for the transform from from_frame to to_frame to become available. + Parameters: - frame_a: Base frame for transform. The transform returned will be frame_a_t_frame_b - frame_b: Tip frame for transform. The transform returned will be frame_a_t_frame_b - transform_time: The time at which to look up the transform. If left at None, will be the most recent - transform available - timeout: The time to wait for the transform to become available if the transform_time is beyond the most - recent transform in the buffer. - wait_for_frames: If true, waits timeout amount of time for a path to exist from frame_a to frame_b in the - buffer. If false, this will return immediately if a path does not exist even if timeout is not - None. Note that wait_for_transform can be used to wait indefinitely for a transform to become - available. + frame_a: Base frame for transform. The transform returned will be frame_a_t_frame_b + frame_b: Tip frame for transform. The transform returned will be frame_a_t_frame_b + transform_time: The time at which to look up the transform. If left at None, the most + recent transform available will used. Returns: - The transform frame_a_t_frame_b at the time specified. - Raises: - All the possible TransformExceptions. + A future that will tell if a transform between frame_a and frame_b is available at the time specified. """ if transform_time is None: transform_time = Time() - if timeout is None or not wait_for_frames: - timeout_py = Duration() - else: - timeout_py = Duration(seconds=timeout) - start_time = time.time() - while rclpy.ok(context=self._node.context): - try: - return self._tf_buffer.lookup_transform(frame_a, frame_b, time=transform_time, timeout=timeout_py) - except ExtrapolationException as e: - if "future" not in str(e) or timeout is None: - raise e # Waiting won't help with this - now = time.time() - if now - start_time > timeout: - raise e - time.sleep(0.01) + return self._tf_buffer.wait_for_transform_async(frame_a, frame_b, transform_time) + + def wait_for_a_tform_b( + self, frame_a: str, frame_b: str, transform_time: Optional[Time] = None, timeout_sec: Optional[float] = None + ) -> bool: + """ + Wait for a transform from frame_a to frame_b to become available. + + Note this is a blocking call. If the underlying node is not spinning, an indefinite wait may block forever. + + Parameters: + frame_a: Base frame for transform. The transform returned will be frame_a_t_frame_b + frame_b: Tip frame for transform. The transform returned will be frame_a_t_frame_b + transform_time: The time at which to look up the transform. If left at None, the most + recent transform available will used. + timeout_sec: The time to wait for the transform to become available if the requested time is beyond + the most recent transform in the buffer. If set to 0, it will not wait. If left at None, it will + wait indefinitely. + Returns: + Whether a transform between frame_a and frame_b is available at the specified time. + """ + if self._node.executor is None: + if timeout_sec is None: + warnings.warn("Node is not spinning yet, wait may block forever") + elif timeout_sec > 0.0: + warnings.warn("Node is not spinning yet, wait may be futile") + future = self.wait_for_a_tform_b_async(frame_a, frame_b, transform_time) + if not wait_for_future(future, timeout_sec, context=self._node.context): + future.cancel() + return False + return True def lookup_a_tform_b( self, frame_a: str, frame_b: str, - transform_time: Optional[float] = None, - timeout: Optional[float] = None, + transform_time: Optional[Time] = None, + timeout_sec: Optional[float] = None, wait_for_frames: bool = False, ) -> TransformStamped: """ + Looks up the transform from frame_a to frame_b at the specified time. + Parameters: - frame_a: Base frame for transform. The transform returned will be frame_a_t_frame_b - frame_b: Tip frame for transform. The transform returned will be frame_a_t_frame_b - transform_time: The time at which to look up the transform. If left at None, will be the most recent - transform available - timeout: The time to wait for the transform to become available if the transform_time is beyond the most - recent transform in the buffer. - wait_for_frames: If true, waits timeout amount of time for a path to exist from frame_a to frame_b in the - buffer. If false, this will return immediately if a path does not exist even if timeout is not - None. Note that wait_for_transform can be used to wait indefinitely for a transform to become - available. + frame_a: Base frame for transform. The transform returned will be frame_a_t_frame_b + frame_b: Tip frame for transform. The transform returned will be frame_a_t_frame_b + transform_time: The time at which to look up the transform. If left at None, the most + recent transform available will used. + timeout_sec: The time to wait for the transform to become available if the requested time is beyond + the most recent transform in the buffer. If set to 0, it will not wait. If left at None, it will + wait indefinitely. + wait_for_frames: If true, it will wait for a path to exist from frame_a to frame_b in the + buffer. If false, lookup will fail immediately if a path between frames does not exist, + regardless of what timeout was set. Note that wait_for_a_tform_b can also be used to + wait for a transform to become available. Returns: The transform frame_a_t_frame_b at the time specified. Raises: All the possible TransformExceptions. """ - return self._internal_lookup_a_tform_b(frame_a, frame_b, transform_time, timeout, wait_for_frames) + if not wait_for_frames: + try: + latest_time = self._tf_buffer.get_latest_common_time(frame_a, frame_b) + # tf2 Python bindings yield system clock timestamps rather than ROS clock + # timestamps, regardless of where these timestamps came from (as there is no + # clock notion in tf2) + latest_time = Time(nanoseconds=latest_time.nanoseconds, clock_type=ClockType.ROS_TIME) + if transform_time is not None and latest_time > transform_time: + # no need to wait, either lookup can be satisfied or an extrapolation + # to the past exception will ensue + timeout_sec = 0.0 + except TransformException: + # either frames do not exist or a path between them doesn't, do not wait + timeout_sec = 0.0 + if transform_time is None: + transform_time = Time() + if timeout_sec != 0.0: + self.wait_for_a_tform_b(frame_a, frame_b, transform_time, timeout_sec) + return self._tf_buffer.lookup_transform(frame_a, frame_b, transform_time) def lookup_latest_timestamp( - self, frame_a: str, frame_b: str, timeout: Optional[float] = None, wait_for_frames: bool = False + self, frame_a: str, frame_b: str, timeout_sec: Optional[float] = None, wait_for_frames: bool = False ) -> Time: """ + Looks up the latest time at which a transform from frame_a to frame_b is available. + Parameters: frame_a: Base frame for transform. The transform returned will be frame_a_t_frame_b frame_b: Tip frame for transform. The transform returned will be frame_a_t_frame_b - timeout: The time to wait for the transform to become available if the transform_time is beyond the most - recent transform in the buffer. - wait_for_frames: If true, waits timeout amount of time for a path to exist from frame_a to frame_b in the - buffer. If false, this will return immediately if a path does not exist even if timeout is not - None. Note that wait_for_transform can be used to wait indefinitely for a transform to become - available. + timeout_sec: The time to wait for the transform to become available if the requested time is beyond + the most recent transform in the buffer. If set to 0, it will not wait. If left at None, it will + wait indefinitely. + wait_for_frames: If true, it will wait for a path to exist from frame_a to frame_b in the + buffer. If false, lookup will fail immediately if a path between frames does not exist, + regardless of what timeout was set. Note that wait_for_a_tform_b can also be used to + wait for a transform to become available. Returns: The timestamp from the latest recorded transform frame_a_t_frame_b Raises: All the possible TransformExceptions. """ - transform = self._internal_lookup_a_tform_b(frame_a, frame_b, None, timeout, wait_for_frames) + transform = self.lookup_a_tform_b(frame_a, frame_b, Time(), timeout_sec, wait_for_frames) return Time.from_msg(transform.header.stamp) diff --git a/bdai_ros2_wrappers/test/test_executors.py b/bdai_ros2_wrappers/test/test_executors.py index edf35f8..2033447 100644 --- a/bdai_ros2_wrappers/test/test_executors.py +++ b/bdai_ros2_wrappers/test/test_executors.py @@ -203,5 +203,5 @@ def deferred() -> bool: with pytest.raises(RuntimeError): executor.spin_until_future_complete(future) - assert wait_for_future(future, timeout_sec=10.0) + assert wait_for_future(future, timeout_sec=10.0, context=ros_context) assert future.result() diff --git a/bdai_ros2_wrappers/test/test_tf_listener_wrapper.py b/bdai_ros2_wrappers/test/test_tf_listener_wrapper.py index 282dcb4..3a6e6a9 100644 --- a/bdai_ros2_wrappers/test/test_tf_listener_wrapper.py +++ b/bdai_ros2_wrappers/test/test_tf_listener_wrapper.py @@ -9,7 +9,7 @@ from tf2_ros import ExtrapolationException, LookupException, TransformBroadcaster from bdai_ros2_wrappers.node import Node -from bdai_ros2_wrappers.process import ROSAwareScope +from bdai_ros2_wrappers.scope import ROSAwareScope from bdai_ros2_wrappers.tf_listener_wrapper import TFListenerWrapper ROBOT = "test_robot" @@ -75,7 +75,7 @@ def test_non_existant_transform_timeout( timestamp = ros.node.get_clock().now() start = time.time() with pytest.raises(LookupException): - tf_listener.lookup_a_tform_b(FRAME_ID, CHILD_FRAME_ID, timestamp, timeout=20.0) + tf_listener.lookup_a_tform_b(FRAME_ID, CHILD_FRAME_ID, timestamp, timeout_sec=20.0) assert time.time() - start < 10.0 @@ -101,7 +101,7 @@ def test_future_transform_extrapolation_exception( time.sleep(0.2) timestamp = ros.node.get_clock().now() with pytest.raises(ExtrapolationException): - tf_listener.lookup_a_tform_b(FRAME_ID, CHILD_FRAME_ID, timestamp) + tf_listener.lookup_a_tform_b(FRAME_ID, CHILD_FRAME_ID, timestamp, timeout_sec=0.0) def test_future_transform_insufficient_wait( @@ -127,7 +127,7 @@ def delayed_publish() -> None: time.sleep(0.2) timestamp = ros.node.get_clock().now() + Duration(seconds=delay) with pytest.raises(ExtrapolationException): - tf_listener.lookup_a_tform_b(FRAME_ID, CHILD_FRAME_ID, timestamp, timeout=0.5) + tf_listener.lookup_a_tform_b(FRAME_ID, CHILD_FRAME_ID, timestamp, timeout_sec=0.5) def test_future_transform_wait(ros: ROSAwareScope, tf_pair: Tuple[MockTfPublisherNode, TFListenerWrapper]) -> None: @@ -148,5 +148,26 @@ def delayed_publish() -> None: ros.executor.create_task(delayed_publish) timestamp += Duration(seconds=delay) - t = tf_listener.lookup_a_tform_b(FRAME_ID, CHILD_FRAME_ID, timestamp, wait_for_frames=True, timeout=2) + t = tf_listener.lookup_a_tform_b(FRAME_ID, CHILD_FRAME_ID, timestamp, timeout_sec=2.0, wait_for_frames=True) assert equal_transform(t.transform, trans) + + +def test_future_timestamp(ros: ROSAwareScope, tf_pair: Tuple[MockTfPublisherNode, TFListenerWrapper]) -> None: + tf_publisher, tf_listener = tf_pair + timestamp = tf_publisher.get_clock().now() + + with pytest.raises(LookupException): + tf_listener.lookup_latest_timestamp(FRAME_ID, CHILD_FRAME_ID) + + def delayed_publish() -> None: + time.sleep(1.0) + trans = Transform(rotation=Quaternion(w=1.0)) + tf_publisher.publish_transform(trans, timestamp) + + assert ros.executor is not None + ros.executor.create_task(delayed_publish) + + latest_timestamp = tf_listener.lookup_latest_timestamp( + FRAME_ID, CHILD_FRAME_ID, timeout_sec=2.0, wait_for_frames=True + ) + assert latest_timestamp == timestamp