Skip to content

Commit

Permalink
Rework tf listener wrapper (#35)
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hidalgo <[email protected]>
  • Loading branch information
mhidalgo-bdai authored Oct 19, 2023
1 parent a1273c7 commit d907d62
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 122 deletions.
2 changes: 1 addition & 1 deletion bdai_ros2_wrappers/bdai_ros2_wrappers/action_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def send_goal_async_handle(
Returns:
ActionHandle: An object to manage the asynchronous lifecycle of the action request
"""
handle = ActionHandle(action_name=action_name, logger=self._node.get_logger())
handle = ActionHandle(action_name=action_name, logger=self._node.get_logger(), context=self._node.context)
if result_callback is not None:
handle.set_result_callback(result_callback)

Expand Down
8 changes: 7 additions & 1 deletion bdai_ros2_wrappers/bdai_ros2_wrappers/action_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

from action_msgs.msg import GoalStatus
from rclpy.action.client import ClientGoalHandle
from rclpy.context import Context
from rclpy.impl.rcutils_logger import RcutilsLogger
from rclpy.task import Future
from rclpy.utilities import get_default_context

from bdai_ros2_wrappers.type_hints import Action

Expand All @@ -15,20 +17,24 @@ class ActionHandle(object):
as holding the various callbacks for sending an ActionGoal (cancel, failure, feedback, result)
"""

def __init__(self, action_name: str, logger: Optional[RcutilsLogger] = None):
def __init__(self, action_name: str, logger: Optional[RcutilsLogger] = None, context: Optional[Context] = None):
"""Constructor
Args:
action_name (str): The name of the action (for logging purposes)
logger (Optional[RcutilsLogger]): An optional logger to use. If none is provided, one is created
"""
if context is None:
context = get_default_context()
self._action_name = action_name
self._send_goal_future: Optional[Future] = None
self._goal_handle: Optional[ClientGoalHandle] = None
self._wait_for_acceptance_event: Event = Event()
context.on_shutdown(self._wait_for_acceptance_event.set)
self._get_result_future: Optional[Future] = None
self._feedback_callback: Optional[Callable[[Action.Feedback], None]] = None
self._wait_for_result_event: Event = Event()
context.on_shutdown(self._wait_for_result_event.set)
self._result_callback: Optional[Callable[[Action.Result], None]] = None
self._on_failure_callback: Optional[Callable] = None
self._cancel_future: Optional[Future] = None
Expand Down
20 changes: 11 additions & 9 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/futures.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
# Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved.
from threading import Event
from typing import Any, Optional
from typing import Optional

from rclpy.context import Context
from rclpy.task import Future
from rclpy.utilities import get_default_context


def wait_for_future(future: Future, timeout_sec: Optional[float] = None) -> bool:
def wait_for_future(future: Future, timeout_sec: Optional[float] = None, *, context: Optional[Context] = None) -> bool:
"""Blocks while waiting for a future to become done
Args:
future (Future): The future to be waited on
timeout_sec (Optional[float]): An optional timeout for how long to wait
context (Optional[Context]): Current context (will use the default if none is given)
Returns:
bool: True if successful, False if the timeout was triggered
"""
if context is None:
context = get_default_context()
event = Event()

def done_callback(_: Any) -> None:
nonlocal event
event.set()

future.add_done_callback(done_callback)
return event.wait(timeout=timeout_sec)
context.on_shutdown(event.set)
future.add_done_callback(lambda _: event.set())
event.wait(timeout=timeout_sec)
return future.done()
7 changes: 6 additions & 1 deletion bdai_ros2_wrappers/bdai_ros2_wrappers/service_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import threading
from typing import Callable, Optional

from rclpy.context import Context
from rclpy.impl.rcutils_logger import RcutilsLogger
from rclpy.task import Future
from rclpy.utilities import get_default_context

from bdai_ros2_wrappers.type_hints import SrvTypeResponse

Expand All @@ -13,10 +15,13 @@ class ServiceHandle(object):
as holding the various callbacks for sending an ServiceRequest (result, failure)
"""

def __init__(self, service_name: str, logger: Optional[RcutilsLogger] = None):
def __init__(self, service_name: str, logger: Optional[RcutilsLogger] = None, context: Optional[Context] = None):
if context is None:
context = get_default_context()
self._service_name = service_name
self._send_service_future: Optional[Future] = None
self._future_ready_event = threading.Event()
context.on_shutdown(self._future_ready_event.set)
self._result_callback: Optional[Callable] = None
self._on_failure_callback: Optional[Callable] = None
self._result: Optional[SrvTypeResponse] = None
Expand Down
19 changes: 13 additions & 6 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/subscription.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved.
import threading
import typing

import rclpy.node
import rclpy.qos
import rclpy.task

import bdai_ros2_wrappers.scope as scope
from bdai_ros2_wrappers.futures import wait_for_future

MessageT = typing.TypeVar("MessageT")

Expand Down Expand Up @@ -49,7 +49,12 @@ def callback(msg: MessageT) -> None:


def wait_for_message(
msg_type: MessageT, topic_name: str, timeout_sec: typing.Optional[float] = None, **kwargs: typing.Any
msg_type: MessageT,
topic_name: str,
timeout_sec: typing.Optional[float] = None,
*,
node: typing.Optional[rclpy.node.Node] = None,
**kwargs: typing.Any,
) -> typing.Optional[MessageT]:
"""
Wait for message on a given topic synchronously.
Expand All @@ -65,9 +70,11 @@ def wait_for_message(
Returns:
The message received, or None on timeout.
"""
event = threading.Event()
future = wait_for_message_async(msg_type, topic_name, **kwargs)
future.add_done_callback(lambda future: event.set())
if not event.wait(timeout_sec):
node = node or scope.node()
if node is None:
raise ValueError("no ROS 2 node available (did you use bdai_ros2_wrapper.process.main?)")
future = wait_for_message_async(msg_type, topic_name, node=node, **kwargs)
if not wait_for_future(future, timeout_sec, context=node.context):
future.cancel()
return None
return future.result()
Loading

0 comments on commit d907d62

Please sign in to comment.