Rework tf listener wrapper #35

Merged

2 changes: 1 addition & 1 deletion bdai_ros2_wrappers/bdai_ros2_wrappers/action_client.py
@@ -96,7 +96,7 @@ def send_goal_async_handle(
Returns:
ActionHandle: An object to manage the asynchronous lifecycle of the action request
"""
handle = ActionHandle(action_name=action_name, logger=self._node.get_logger())
handle = ActionHandle(action_name=action_name, logger=self._node.get_logger(), context=self._node.context)
if result_callback is not None:
handle.set_result_callback(result_callback)

8 changes: 7 additions & 1 deletion bdai_ros2_wrappers/bdai_ros2_wrappers/action_handle.py
@@ -4,8 +4,10 @@

from action_msgs.msg import GoalStatus
from rclpy.action.client import ClientGoalHandle
from rclpy.context import Context
from rclpy.impl.rcutils_logger import RcutilsLogger
from rclpy.task import Future
from rclpy.utilities import get_default_context

from bdai_ros2_wrappers.type_hints import Action

@@ -15,20 +17,24 @@ class ActionHandle(object):
as holding the various callbacks for sending an ActionGoal (cancel, failure, feedback, result)
"""

def __init__(self, action_name: str, logger: Optional[RcutilsLogger] = None):
def __init__(self, action_name: str, logger: Optional[RcutilsLogger] = None, context: Optional[Context] = None):
"""Constructor

Args:
action_name (str): The name of the action (for logging purposes)
logger (Optional[RcutilsLogger]): An optional logger to use. If none is provided, one is created
"""
if context is None:
context = get_default_context()
self._action_name = action_name
self._send_goal_future: Optional[Future] = None
self._goal_handle: Optional[ClientGoalHandle] = None
self._wait_for_acceptance_event: Event = Event()
context.on_shutdown(self._wait_for_acceptance_event.set)
self._get_result_future: Optional[Future] = None
self._feedback_callback: Optional[Callable[[Action.Feedback], None]] = None
self._wait_for_result_event: Event = Event()
context.on_shutdown(self._wait_for_result_event.set)
self._result_callback: Optional[Callable[[Action.Result], None]] = None
self._on_failure_callback: Optional[Callable] = None
self._cancel_future: Optional[Future] = None
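As a usage note, here is a minimal sketch of the new constructor argument (the node name and action name below are illustrative, not part of this PR): passing the node's context registers the handle's internal wait events with on_shutdown, so blocking waits on the handle are released when the context shuts down instead of hanging forever.

```python
import rclpy
from rclpy.node import Node

from bdai_ros2_wrappers.action_handle import ActionHandle

rclpy.init()
node = Node("action_handle_demo")  # hypothetical node name, for illustration only

# With the node's context, on_shutdown() sets the handle's internal events,
# so any blocking wait on goal acceptance or result is released on shutdown.
handle = ActionHandle("navigate", logger=node.get_logger(), context=node.context)
```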
20 changes: 11 additions & 9 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/futures.py
@@ -1,25 +1,27 @@
# Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved.
from threading import Event
from typing import Any, Optional
from typing import Optional

from rclpy.context import Context
from rclpy.task import Future
from rclpy.utilities import get_default_context


def wait_for_future(future: Future, timeout_sec: Optional[float] = None) -> bool:
def wait_for_future(future: Future, timeout_sec: Optional[float] = None, *, context: Optional[Context] = None) -> bool:
"""Blocks while waiting for a future to become done

Args:
future (Future): The future to be waited on
timeout_sec (Optional[float]): An optional timeout for how long to wait
context (Optional[Context]): Current context (will use the default if none is given)

Returns:
bool: True if successful, False if the timeout was triggered
"""
if context is None:
context = get_default_context()
event = Event()

def done_callback(_: Any) -> None:
nonlocal event
event.set()

future.add_done_callback(done_callback)
return event.wait(timeout=timeout_sec)
context.on_shutdown(event.set)
Collaborator

What happens if the context has multiple events set and shuts down? Is there a possibility where two events can be waiting on each other and deadlock?

Contributor Author
@mhidalgo-bdai Oct 19, 2023

If the context has multiple events, all of them will be set. Events do not wait on each other. Callees wait on events. Callees could run into a deadlock if, for example, they have to complete each other's futures, or if you somehow manage to wait on a future within an on-context-shutdown handler (which I'm not even sure the Context API will allow).

So I would say deadlocks are a non-issue here, inasmuch as they won't happen by accident unless the code is flawed.

future.add_done_callback(lambda _: event.set())
event.wait(timeout=timeout_sec)
return future.done()
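A minimal sketch of the reworked wait (the node name and the hand-made future below are illustrative only): tying the wait to the node's context means a shutdown sets the internal event, so the call returns promptly and reports whether the future actually completed.

```python
import rclpy
from rclpy.node import Node
from rclpy.task import Future

from bdai_ros2_wrappers.futures import wait_for_future

rclpy.init()
node = Node("wait_for_future_demo")  # hypothetical node name, for illustration only

future = Future()  # a future nobody completes, purely to show the unblocking behavior
# Returns False here: either the 1 s timeout expires or the context shuts down,
# and in both cases future.done() is still False.
done = wait_for_future(future, timeout_sec=1.0, context=node.context)
assert not done
```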
7 changes: 6 additions & 1 deletion bdai_ros2_wrappers/bdai_ros2_wrappers/service_handle.py
@@ -2,8 +2,10 @@
import threading
from typing import Callable, Optional

from rclpy.context import Context
from rclpy.impl.rcutils_logger import RcutilsLogger
from rclpy.task import Future
from rclpy.utilities import get_default_context

from bdai_ros2_wrappers.type_hints import SrvTypeResponse

@@ -13,10 +15,13 @@ class ServiceHandle(object):
as holding the various callbacks for sending an ServiceRequest (result, failure)
"""

def __init__(self, service_name: str, logger: Optional[RcutilsLogger] = None):
def __init__(self, service_name: str, logger: Optional[RcutilsLogger] = None, context: Optional[Context] = None):
if context is None:
context = get_default_context()
self._service_name = service_name
self._send_service_future: Optional[Future] = None
self._future_ready_event = threading.Event()
context.on_shutdown(self._future_ready_event.set)
self._result_callback: Optional[Callable] = None
self._on_failure_callback: Optional[Callable] = None
self._result: Optional[SrvTypeResponse] = None
19 changes: 13 additions & 6 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/subscription.py
@@ -1,12 +1,12 @@
# Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved.
import threading
import typing

import rclpy.node
import rclpy.qos
import rclpy.task

import bdai_ros2_wrappers.scope as scope
from bdai_ros2_wrappers.futures import wait_for_future

MessageT = typing.TypeVar("MessageT")

@@ -49,7 +49,12 @@ def callback(msg: MessageT) -> None:


def wait_for_message(
msg_type: MessageT, topic_name: str, timeout_sec: typing.Optional[float] = None, **kwargs: typing.Any
msg_type: MessageT,
topic_name: str,
timeout_sec: typing.Optional[float] = None,
*,
node: typing.Optional[rclpy.node.Node] = None,
**kwargs: typing.Any,
) -> typing.Optional[MessageT]:
"""
Wait for message on a given topic synchronously.
@@ -65,9 +70,11 @@
Returns:
The message received, or None on timeout.
"""
event = threading.Event()
future = wait_for_message_async(msg_type, topic_name, **kwargs)
future.add_done_callback(lambda future: event.set())
if not event.wait(timeout_sec):
node = node or scope.node()
if node is None:
raise ValueError("no ROS 2 node available (did you use bdai_ros2_wrapper.process.main?)")
future = wait_for_message_async(msg_type, topic_name, node=node, **kwargs)
if not wait_for_future(future, timeout_sec, context=node.context):
future.cancel()
return None
return future.result()
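A minimal sketch of the new keyword-only node argument (topic name, message type, and node name are illustrative, not from this PR): outside of a bdai_ros2_wrappers process scope, a node must be supplied explicitly and spun so the underlying subscription can actually deliver a message.

```python
import threading

import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from std_msgs.msg import String  # illustrative message type

from bdai_ros2_wrappers.subscription import wait_for_message

rclpy.init()
node = Node("wait_for_message_demo")  # hypothetical node name, for illustration only
executor = SingleThreadedExecutor()
executor.add_node(node)
# Spin in the background so the subscription created under the hood can receive.
threading.Thread(target=executor.spin, daemon=True).start()

# "/chatter" is an illustrative topic; the wait is tied to node.context, so a
# shutdown releases it early and the function returns None.
msg = wait_for_message(String, "/chatter", timeout_sec=5.0, node=node)
print("timed out or shut down" if msg is None else msg.data)
```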