Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework tf listener wrapper #35

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bdai_ros2_wrappers/bdai_ros2_wrappers/action_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def send_goal_async_handle(
Returns:
ActionHandle: An object to manage the asynchronous lifecycle of the action request
"""
handle = ActionHandle(action_name=action_name, logger=self._node.get_logger())
handle = ActionHandle(action_name=action_name, logger=self._node.get_logger(), context=self._node.context)
if result_callback is not None:
handle.set_result_callback(result_callback)

Expand Down
8 changes: 7 additions & 1 deletion bdai_ros2_wrappers/bdai_ros2_wrappers/action_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

from action_msgs.msg import GoalStatus
from rclpy.action.client import ClientGoalHandle
from rclpy.context import Context
from rclpy.impl.rcutils_logger import RcutilsLogger
from rclpy.task import Future
from rclpy.utilities import get_default_context

from bdai_ros2_wrappers.type_hints import Action

Expand All @@ -15,20 +17,24 @@ class ActionHandle(object):
as holding the various callbacks for sending an ActionGoal (cancel, failure, feedback, result)
"""

def __init__(self, action_name: str, logger: Optional[RcutilsLogger] = None):
def __init__(self, action_name: str, logger: Optional[RcutilsLogger] = None, context: Optional[Context] = None):
"""Constructor

Args:
action_name (str): The name of the action (for logging purposes)
logger (Optional[RcutilsLogger]): An optional logger to use. If none is provided, one is created
"""
if context is None:
context = get_default_context()
self._action_name = action_name
self._send_goal_future: Optional[Future] = None
self._goal_handle: Optional[ClientGoalHandle] = None
self._wait_for_acceptance_event: Event = Event()
context.on_shutdown(self._wait_for_acceptance_event.set)
self._get_result_future: Optional[Future] = None
self._feedback_callback: Optional[Callable[[Action.Feedback], None]] = None
self._wait_for_result_event: Event = Event()
context.on_shutdown(self._wait_for_result_event.set)
self._result_callback: Optional[Callable[[Action.Result], None]] = None
self._on_failure_callback: Optional[Callable] = None
self._cancel_future: Optional[Future] = None
Expand Down
20 changes: 11 additions & 9 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/futures.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
# Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved.
from threading import Event
from typing import Any, Optional
from typing import Optional

from rclpy.context import Context
from rclpy.task import Future
from rclpy.utilities import get_default_context


def wait_for_future(future: Future, timeout_sec: Optional[float] = None) -> bool:
def wait_for_future(future: Future, timeout_sec: Optional[float] = None, *, context: Optional[Context] = None) -> bool:
"""Blocks while waiting for a future to become done

Args:
future (Future): The future to be waited on
timeout_sec (Optional[float]): An optional timeout for how long to wait
context (Optional[Context]): Current context (will use the default if none is given)

Returns:
bool: True if successful, False if the timeout was triggered
"""
if context is None:
context = get_default_context()
event = Event()

def done_callback(_: Any) -> None:
nonlocal event
event.set()

future.add_done_callback(done_callback)
return event.wait(timeout=timeout_sec)
context.on_shutdown(event.set)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the context has multiple events set and shuts down? Is there a possibility where two events can be waiting on each other and deadlock?

Copy link
Contributor Author

@mhidalgo-bdai mhidalgo-bdai Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the context has multiple events, all of them will be set. Events do not wait on each other. Callees wait on events. Callees could run into deadlock if, for example, they have to complete each other's futures, or if you somehow manage to wait on a future within an on-context-shutdown handler (which I'm not even sure the Context API will allow).

So I would say deadlocks are a non-issue here inasmuch as it won't happen by accident unless code is flawed.

future.add_done_callback(lambda _: event.set())
event.wait(timeout=timeout_sec)
return future.done()
7 changes: 6 additions & 1 deletion bdai_ros2_wrappers/bdai_ros2_wrappers/service_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import threading
from typing import Callable, Optional

from rclpy.context import Context
from rclpy.impl.rcutils_logger import RcutilsLogger
from rclpy.task import Future
from rclpy.utilities import get_default_context

from bdai_ros2_wrappers.type_hints import SrvTypeResponse

Expand All @@ -13,10 +15,13 @@ class ServiceHandle(object):
as holding the various callbacks for sending an ServiceRequest (result, failure)
"""

def __init__(self, service_name: str, logger: Optional[RcutilsLogger] = None):
def __init__(self, service_name: str, logger: Optional[RcutilsLogger] = None, context: Optional[Context] = None):
if context is None:
context = get_default_context()
self._service_name = service_name
self._send_service_future: Optional[Future] = None
self._future_ready_event = threading.Event()
context.on_shutdown(self._future_ready_event.set)
self._result_callback: Optional[Callable] = None
self._on_failure_callback: Optional[Callable] = None
self._result: Optional[SrvTypeResponse] = None
Expand Down
19 changes: 13 additions & 6 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/subscription.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved.
import threading
import typing

import rclpy.node
import rclpy.qos
import rclpy.task

import bdai_ros2_wrappers.scope as scope
from bdai_ros2_wrappers.futures import wait_for_future

MessageT = typing.TypeVar("MessageT")

Expand Down Expand Up @@ -49,7 +49,12 @@ def callback(msg: MessageT) -> None:


def wait_for_message(
msg_type: MessageT, topic_name: str, timeout_sec: typing.Optional[float] = None, **kwargs: typing.Any
msg_type: MessageT,
topic_name: str,
timeout_sec: typing.Optional[float] = None,
*,
node: typing.Optional[rclpy.node.Node] = None,
**kwargs: typing.Any,
) -> typing.Optional[MessageT]:
"""
Wait for message on a given topic synchronously.
Expand All @@ -65,9 +70,11 @@ def wait_for_message(
Returns:
The message received, or None on timeout.
"""
event = threading.Event()
future = wait_for_message_async(msg_type, topic_name, **kwargs)
future.add_done_callback(lambda future: event.set())
if not event.wait(timeout_sec):
node = node or scope.node()
if node is None:
raise ValueError("no ROS 2 node available (did you use bdai_ros2_wrapper.process.main?)")
future = wait_for_message_async(msg_type, topic_name, node=node, **kwargs)
if not wait_for_future(future, timeout_sec, context=node.context):
future.cancel()
return None
return future.result()
166 changes: 66 additions & 100 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/tf_listener_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,154 +1,120 @@
# Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved.
import time
from typing import Optional, Tuple

import rclpy
from typing import Optional

from geometry_msgs.msg import TransformStamped
from rclpy.clock import ClockType
from rclpy.node import Node
from rclpy.task import Future
from rclpy.time import Duration, Time
from tf2_ros import ExtrapolationException, TransformException
from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener

import bdai_ros2_wrappers.scope as scope
from bdai_ros2_wrappers.futures import wait_for_future


class TFListenerWrapper(object):
def __init__(
self,
node: Optional[Node] = None,
wait_for_transform: Optional[Tuple[str, str]] = None,
cache_time_s: Optional[int] = None,
) -> None:
class TFListenerWrapper:
def __init__(self, node: Optional[Node] = None, cache_time_s: Optional[float] = None) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened to the wait for init part? I do know people were using that...

Copy link
Contributor Author

@mhidalgo-bdai mhidalgo-bdai Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, quite a bit in fact (and not always correctly e.g. here we pass three frames). It is a problematic pattern, though. When the wrapper is bound to a node that is spinning, all is fine with waiting on a constructor. When the wrapper gets instantiated from within the constructor of the node it is bound to, it hangs, because the node can't be put to spin until it's fully constructed.

Eliminating that pattern was the original motivation behind this PR. You can still wait, but you have to do so explicitly after construction. Migrated code already accounts for this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Could we add a comment here explaining how to do that? The reason we added this was because people were constantly getting exceptions thrown from trying to use the listener too soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in fc71daa. I added documentation, including basic examples, and it will also now generate warnings if you are about to shoot yourself in the foot. PTAL!

node = node or scope.node()
if node is None:
raise ValueError("no ROS 2 node available (did you use bdai_ros2_wrapper.process.main?)")
self._node = node

cache_time_py = None
if cache_time_s is not None:
cache_time_py = Duration(seconds=cache_time_s)
else:
cache_time_py = None
cache_time_py = Duration(nanoseconds=cache_time_s * 1e9)
self._tf_buffer = Buffer(cache_time=cache_time_py)
self._tf_listener = TransformListener(self._tf_buffer, self._node, spin_thread=False)

if wait_for_transform is not None and len(wait_for_transform) == 2:
self.wait_for_init(wait_for_transform[0], wait_for_transform[1])
self._tf_listener = TransformListener(self._tf_buffer, self._node)
amessing-bdai marked this conversation as resolved.
Show resolved Hide resolved

@property
def buffer(self) -> Buffer:
    """The underlying tf2 buffer holding the received transforms.

    Exposed so callers can perform advanced queries directly on the buffer.
    """
    return self._tf_buffer

def wait_for_init(self, from_frame: str, to_frame: str) -> None:
"""
Waits for transform from from_frame to to_frame to become available and prints initializing statements.
"""
# Wait for the buffer to fill to avoid annoying warnings.
self._node.get_logger().info(
"Waiting for TF to contain transform from " + str(from_frame) + " to " + str(to_frame)
)
self.wait_for_transform(from_frame, to_frame)
self._node.get_logger().info("TF initialized")
def shutdown(self) -> None:
    """Stops the wrapper by unregistering the underlying tf2 transform listener.

    After this call the buffer will no longer receive transform updates.
    """
    self._tf_listener.unregister()

def wait_for_a_tform_b_async(self, frame_a: str, frame_b: str, transform_time: Optional[Time] = None) -> Future:
    """Asynchronously waits for the transform frame_a -> frame_b to become available.

    Args:
        frame_a: Base frame of the transform.
        frame_b: Tip frame of the transform.
        transform_time: The time at which to look up the transform. If left at None,
            a zero Time() is used, which tf2 treats as the latest available transform.

    Returns:
        Future: A future that completes once the requested transform can be looked up.
    """
    if transform_time is None:
        transform_time = Time()
    return self._tf_buffer.wait_for_transform_async(frame_a, frame_b, transform_time)

def wait_for_transform(self, from_frame: str, to_frame: str) -> None:
def wait_for_a_tform_b(
self, frame_a: str, frame_b: str, transform_time: Optional[Time] = None, timeout_sec: Optional[float] = None
) -> bool:
"""
Wait for the transform from from_frame to to_frame to become available.
"""
# We need a while instead of the usual wait because we use a different node and a thread for our listener
while rclpy.ok(context=self._node.context):
try:
self.lookup_a_tform_b(from_frame, to_frame)
break
except TransformException:
time.sleep(0.1)
future = self.wait_for_a_tform_b_async(frame_a, frame_b, transform_time)
if not wait_for_future(future, timeout_sec, context=self._node.context):
future.cancel()
return False
return True

def _internal_lookup_a_tform_b(
def lookup_a_tform_b(
self,
frame_a: str,
frame_b: str,
transform_time: Optional[float] = None,
timeout: Optional[float] = None,
transform_time: Optional[Time] = None,
timeout_sec: Optional[float] = None,
wait_for_frames: bool = False,
) -> TransformStamped:
"""
Parameters:
frame_a: Base frame for transform. The transform returned will be frame_a_t_frame_b
frame_b: Tip frame for transform. The transform returned will be frame_a_t_frame_b
transform_time: The time at which to look up the transform. If left at None, will be the most recent
transform available
timeout: The time to wait for the transform to become available if the transform_time is beyond the most
recent transform in the buffer.
wait_for_frames: If true, waits timeout amount of time for a path to exist from frame_a to frame_b in the
buffer. If false, this will return immediately if a path does not exist even if timeout is not
None. Note that wait_for_transform can be used to wait indefinitely for a transform to become
available.
frame_a: Base frame for transform. The transform returned will be frame_a_t_frame_b
frame_b: Tip frame for transform. The transform returned will be frame_a_t_frame_b
transform_time: The time at which to look up the transform. If left at None, the most
recent transform available will be used.
timeout_sec: The time to wait for the transform to become available if the requested time is beyond
the most recent transform in the buffer. If set to 0, it will not wait. If left at None, it will
wait indefinitely.
wait_for_frames: If true, it will wait for a path to exist from frame_a to frame_b in the
buffer. If false, lookup will fail immediately if a path between frames does not exist,
regardless of what timeout was set. Note that wait_for_a_tform_b can also be used to
wait for a transform to become available.
Returns:
The transform frame_a_t_frame_b at the time specified.
Raises:
All the possible TransformExceptions.
"""
if not wait_for_frames:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic now looks correct to me

try:
latest_time = self._tf_buffer.get_latest_common_time(frame_a, frame_b)
# tf2 Python bindings yield system clock timestamps rather than ROS clock
# timestamps, regardless of where these timestamps came from (as there is no
# clock notion in tf2)
latest_time = Time(nanoseconds=latest_time.nanoseconds, clock_type=ClockType.ROS_TIME)
if transform_time is not None and latest_time > transform_time:
# no need to wait, either lookup can be satisfied or an extrapolation
# to the past exception will ensue
timeout_sec = 0.0
except TransformException:
# either frames do not exist or a path between them doesn't, do not wait
timeout_sec = 0.0
if transform_time is None:
transform_time = Time()
if timeout is None or not wait_for_frames:
timeout_py = Duration()
else:
timeout_py = Duration(seconds=timeout)
start_time = time.time()
while rclpy.ok(context=self._node.context):
try:
return self._tf_buffer.lookup_transform(frame_a, frame_b, time=transform_time, timeout=timeout_py)
except ExtrapolationException as e:
if "future" not in str(e) or timeout is None:
raise e # Waiting won't help with this
now = time.time()
if now - start_time > timeout:
raise e
time.sleep(0.01)

def lookup_a_tform_b(
self,
frame_a: str,
frame_b: str,
transform_time: Optional[float] = None,
timeout: Optional[float] = None,
wait_for_frames: bool = False,
) -> TransformStamped:
"""
Parameters:
frame_a: Base frame for transform. The transform returned will be frame_a_t_frame_b
frame_b: Tip frame for transform. The transform returned will be frame_a_t_frame_b
transform_time: The time at which to look up the transform. If left at None, will be the most recent
transform available
timeout: The time to wait for the transform to become available if the transform_time is beyond the most
recent transform in the buffer.
wait_for_frames: If true, waits timeout amount of time for a path to exist from frame_a to frame_b in the
buffer. If false, this will return immediately if a path does not exist even if timeout is not
None. Note that wait_for_transform can be used to wait indefinitely for a transform to become
available.
Returns:
The transform frame_a_t_frame_b at the time specified.
Raises:
All the possible TransformExceptions.
"""
return self._internal_lookup_a_tform_b(frame_a, frame_b, transform_time, timeout, wait_for_frames)
if timeout_sec != 0.0:
self.wait_for_a_tform_b(frame_a, frame_b, transform_time, timeout_sec)
return self._tf_buffer.lookup_transform(frame_a, frame_b, transform_time)

def lookup_latest_timestamp(
self, frame_a: str, frame_b: str, timeout: Optional[float] = None, wait_for_frames: bool = False
self, frame_a: str, frame_b: str, timeout_sec: Optional[float] = None, wait_for_frames: bool = False
) -> Time:
"""
Parameters:
frame_a: Base frame for transform. The transform returned will be frame_a_t_frame_b
frame_b: Tip frame for transform. The transform returned will be frame_a_t_frame_b
timeout: The time to wait for the transform to become available if the transform_time is beyond the most
recent transform in the buffer.
wait_for_frames: If true, waits timeout amount of time for a path to exist from frame_a to frame_b in the
buffer. If false, this will return immediately if a path does not exist even if timeout is not
None. Note that wait_for_transform can be used to wait indefinitely for a transform to become
available.
timeout_sec: The time to wait for the transform to become available if the requested time is beyond
the most recent transform in the buffer. If set to 0, it will not wait. If left at None, it will
wait indefinitely.
wait_for_frames: If true, it will wait for a path to exist from frame_a to frame_b in the
buffer. If false, lookup will fail immediately if a path between frames does not exist,
regardless of what timeout was set. Note that wait_for_a_tform_b can also be used to
wait for a transform to become available.
Returns:
The timestamp from the latest recorded transform frame_a_t_frame_b
Raises:
All the possible TransformExceptions.
"""
transform = self._internal_lookup_a_tform_b(frame_a, frame_b, None, timeout, wait_for_frames)
transform = self.lookup_a_tform_b(frame_a, frame_b, timeout_sec, wait_for_frames)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saw self._tf_buffer.get_latest_common_time(frame_a, frame_b) above. Is there a difference in the result from doing that verses this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This version can wait for frames to come into existence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a bug on line 119 right there 👀 Fixed it and added a test (we didn't have any for this method) in 49ee392.

return Time.from_msg(transform.header.stamp)
2 changes: 1 addition & 1 deletion bdai_ros2_wrappers/test/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,5 @@ def deferred() -> bool:
with pytest.raises(RuntimeError):
executor.spin_until_future_complete(future)

assert wait_for_future(future, timeout_sec=10.0)
assert wait_for_future(future, timeout_sec=10.0, context=ros_context)
assert future.result()
Loading