Skip to content

Commit

Permalink
Add unwrap_future function
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hidalgo <[email protected]>
  • Loading branch information
mhidalgo-bdai committed Jun 10, 2024
1 parent aa9e19c commit bdb93d9
Showing 1 changed file with 19 additions and 1 deletion.
20 changes: 19 additions & 1 deletion bdai_ros2_wrappers/bdai_ros2_wrappers/futures.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved.
from threading import Event
from typing import Awaitable, Callable, Optional, Protocol, TypeVar, Union, runtime_checkable
from typing import Any, Awaitable, Callable, Optional, Protocol, TypeVar, Union, runtime_checkable

from rclpy.context import Context
from rclpy.utilities import get_default_context
Expand Down Expand Up @@ -84,3 +84,21 @@ def wait_for_future(
event.set()
event.wait(timeout=timeout_sec)
return proper_future.done()


def unwrap_future(
future: AnyFuture,
timeout_sec: Optional[float] = None,
*,
context: Optional[Context] = None,
) -> Any:
"""Fetch future result when it is done.
Note this function may block and may raise if the future does or it times out
waiting for it. See wait_for_future() documentation for further reference on
arguments taken.
"""
proper_future = as_proper_future(future)
if not wait_for_future(proper_future, timeout_sec, context=context):
raise ValueError("cannot unwrap future that is not done")
return proper_future.result()

0 comments on commit bdb93d9

Please sign in to comment.