-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Michel Hidalgo <[email protected]>
- Loading branch information
1 parent
872a19c
commit cf0cb23
Showing
2 changed files
with
50 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
# Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved. | ||
import threading | ||
from typing import Optional | ||
|
||
from rclpy.context import Context | ||
from rclpy.utilities import get_default_context | ||
|
||
|
||
def wait_for_shutdown(*, timeout_sec: Optional[float] = None, context: Optional[Context] = None) -> bool: | ||
""" | ||
Wait for context shutdown. | ||
Args: | ||
timeout_sec: optional timeout for wait, wait indefinitely by default. | ||
context: context to wait on, use default context by default. | ||
Returns: | ||
True if shutdown, False on timeout. | ||
""" | ||
if context is None: | ||
context = get_default_context() | ||
event = threading.Event() | ||
context.on_shutdown(event.set) | ||
return event.wait(timeout_sec) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# Copyright (c) 2023 Boston Dynamics AI Institute Inc. All rights reserved. | ||
|
||
from typing import Iterable | ||
|
||
import pytest | ||
import rclpy | ||
|
||
from bdai_ros2_wrappers.context import wait_for_shutdown | ||
from bdai_ros2_wrappers.process import ROSAwareScope | ||
|
||
|
||
@pytest.fixture | ||
def ros() -> Iterable[ROSAwareScope]: | ||
rclpy.init() | ||
try: | ||
with ROSAwareScope("fixture") as scope: | ||
yield scope | ||
finally: | ||
rclpy.try_shutdown() | ||
|
||
|
||
def test_wait_for_shutdown(ros: ROSAwareScope) -> None: | ||
"""Asserts that wait for shutdown works as expected.""" | ||
assert not wait_for_shutdown(timeout_sec=1.0) | ||
ros.executor.create_task(lambda: rclpy.shutdown()) | ||
assert wait_for_shutdown(timeout_sec=10.0) |