Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SW-1104] Move some internal custom launch actions into ros_utilities #102

Merged
merged 6 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/launch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved.
88 changes: 88 additions & 0 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved.
from __future__ import annotations

from enum import Enum
from typing import Any, Final, List, Type

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.utilities.type_utils import coerce_to_type

_BOOLEAN_STR_CHOICES: Final[List[str]] = ["true", "True", "false", "False"]
_BOOLEAN_CHOICES: Final[List[str | bool]] = [*_BOOLEAN_STR_CHOICES, True, False]
_OPTIONAL_CHOICES: Final[List[str]] = [""]


def convert_to_bool(param_name: str, val: str) -> bool:
"""Converts a ros parameter to a bool"""
try:
return coerce_to_type(val.lower(), bool)
except ValueError:
print(f"Cannot convert `{param_name}` to bool (value is {val})")
raise


def update_sigterm_sigkill_timeout(
ld: LaunchDescription,
*,
sigterm_timeout_s: float = 60,
sigkill_timeout_s: float = 60,
) -> None:
"""Increases the timeout for launch to escalate to SIGTERM and SIGKILL after you CTRL+C"""
ld.add_action(DeclareLaunchArgument("sigterm_timeout", default_value=str(sigterm_timeout_s)))
ld.add_action(DeclareLaunchArgument("sigkill_timeout", default_value=str(sigkill_timeout_s)))


class DeclareBooleanLaunchArgument(DeclareLaunchArgument):
"""Thin wrapper on `DeclareLaunchArgument` to restrict the choices to boolean"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
if "choices" in kwargs:
raise KeyError("Cannot set `choices` for `DeclareBooleanLaunchArgument`")
if "default_value" in kwargs:
default_value = kwargs["default_value"]
if default_value not in _BOOLEAN_CHOICES:
raise ValueError(f"`default_value` must be from {_BOOLEAN_CHOICES}")
if isinstance(default_value, bool):
kwargs["default_value"] = "true" if default_value else "false"

super().__init__(*args, choices=_BOOLEAN_STR_CHOICES, **kwargs)


class DeclareEnumLaunchArgument(DeclareLaunchArgument):
"""Thin wrapper on `DeclareLaunchArgument` to restrict the choices to the values of an enum"""

def __init__(self, enum_type: Type[Enum], *args: Any, optional: bool = False, **kwargs: Any) -> None:
choices = [str(e.value) for e in enum_type] + (
[] if not optional else _OPTIONAL_CHOICES
) # typing: ignore[attr-defined]
if "choices" in kwargs:
raise KeyError("Cannot set `choices` for `DeclareEnumLaunchArgument`")
if "default_value" in kwargs:
default_value = kwargs["default_value"]
if isinstance(default_value, str):
if default_value not in choices:
raise ValueError(
(
f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be"
f" from {choices} or {list(enum_type)}"
),
)
elif isinstance(default_value, enum_type):
if default_value not in enum_type:
raise ValueError(
(
f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be"
f" from {choices} or {list(enum_type)}"
),
)
kwargs["default_value"] = str(default_value.value)
else:
raise TypeError(
(
f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be of type"
f" `str` or `{enum_type.__name__}`"
),
)

super().__init__(*args, choices=choices, **kwargs)
45 changes: 45 additions & 0 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/launch/arguments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved.
from __future__ import annotations

from typing import Literal

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration

from bdai_ros2_wrappers.launch.actions import DeclareBooleanLaunchArgument

_ROBOT_NAME: Literal["robot_name"] = "robot_name"
_VERBOSE: Literal["verbose"] = "verbose"


def add_robot_name_argument(ld: LaunchDescription) -> LaunchConfiguration:
"""Adds a launch argument for `robot_name` to the `LaunchDescription`

Args:
ld: The launch description instance

Returns:
A launch configuration that can be used to parse the command line argument for `robot_name`
"""
ld.add_action(
DeclareLaunchArgument(
_ROBOT_NAME,
description="Name of the robot.",
default_value="",
),
)
return LaunchConfiguration(_ROBOT_NAME)


def add_verbose_argument(ld: LaunchDescription) -> LaunchConfiguration:
"""Adds a launch argument for `verbose` to the `LaunchDescription`

Args:
ld: The launch description instance

Returns:
A launch configuration that can be used to parse the command line argument for `verbose`
"""
ld.add_action(DeclareBooleanLaunchArgument(_VERBOSE, description="Run with debug logging on.", default_value=False))
return LaunchConfiguration(_VERBOSE)
27 changes: 27 additions & 0 deletions bdai_ros2_wrappers/bdai_ros2_wrappers/launch/substitutions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved.
from typing import List, Optional

from launch import Condition, SomeSubstitutionsType
from launch.conditions import UnlessCondition
from launch.substitutions import OrSubstitution


# TODO: when/if we move to rolling we should use the `AnySubstitution`
def not_any_substitution(conditions: List[SomeSubstitutionsType]) -> Optional[Condition]:
"""A substitution that is True if none of the conditions are substituted with True

Args:
conditions (list[SomeSubstitutionsType]): A list of substitutions
Returns:
Optional[Condition]: A substitution that is True if none of the conditions are substituted with True, if less
than 2 conditions are provided returns none
"""
if len(conditions) < 2:
print("Bad number of conditions in not_any_substitution.")
return None

substitution = OrSubstitution(conditions[0], conditions[1])
for cond in conditions[2:]:
substitution = OrSubstitution(substitution, cond)

return UnlessCondition(substitution)
88 changes: 88 additions & 0 deletions bdai_ros2_wrappers/test/launch/test_actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved.
from enum import Enum

import pytest

from bdai_ros2_wrappers.launch.actions import DeclareBooleanLaunchArgument, DeclareEnumLaunchArgument


def test_declare_boolean_launch_argument_default_value_false() -> None:
arg = DeclareBooleanLaunchArgument("arg", default_value="false")
assert arg.default_value is not None
assert len(arg.default_value) == 1
assert arg.default_value[0].text == "false"


def test_declare_boolean_launch_argument_default_value_False() -> None:
arg = DeclareBooleanLaunchArgument("arg", default_value="False")
assert arg.default_value is not None
assert len(arg.default_value) == 1
assert arg.default_value[0].text == "False"


def test_declare_boolean_launch_argument_default_value_bool_false() -> None:
arg = DeclareBooleanLaunchArgument("arg", default_value=False)
assert arg.default_value is not None
assert len(arg.default_value) == 1
assert arg.default_value[0].text == "false"


def test_declare_boolean_launch_argument_default_value_true() -> None:
arg = DeclareBooleanLaunchArgument("arg", default_value="true")
assert arg.default_value is not None
assert len(arg.default_value) == 1
assert arg.default_value[0].text == "true"


def test_declare_boolean_launch_argument_default_value_True() -> None:
arg = DeclareBooleanLaunchArgument("arg", default_value="True")
assert arg.default_value is not None
assert len(arg.default_value) == 1
assert arg.default_value[0].text == "True"


def test_declare_boolean_launch_argument_default_value_bool_true() -> None:
arg = DeclareBooleanLaunchArgument("arg", default_value=True)
assert arg.default_value is not None
assert len(arg.default_value) == 1
assert arg.default_value[0].text == "true"


def test_declare_boolean_launch_argument_set_choices() -> None:
with pytest.raises(KeyError):
_ = DeclareBooleanLaunchArgument("arg", choices=["some", "choices"])


def test_declare_boolean_launch_argument_default_value_invalid() -> None:
with pytest.raises(ValueError):
_ = DeclareBooleanLaunchArgument("arg", default_value="not true or false")


class MockStrEnum(str, Enum):
A = "A"
B = "B"
C = "C"


def test_declare_enum_launch_argument_str_enum_str_default_value() -> None:
arg = DeclareEnumLaunchArgument(MockStrEnum, "arg", default_value="A")
assert arg.default_value is not None
assert len(arg.default_value) == 1
assert arg.default_value[0].text == "A"


def test_declare_enum_launch_argument_str_enum_enum_default_value() -> None:
arg = DeclareEnumLaunchArgument(MockStrEnum, "arg", default_value=MockStrEnum.A)
assert arg.default_value is not None
assert len(arg.default_value) == 1
assert arg.default_value[0].text == "A"


def test_declare_enum_launch_argument_set_choices() -> None:
with pytest.raises(KeyError):
_ = DeclareEnumLaunchArgument(MockStrEnum, "arg", choices=["some", "choices"])


def test_declare_enum_launch_argument_invalid_default() -> None:
with pytest.raises(ValueError):
_ = DeclareEnumLaunchArgument(MockStrEnum, "arg", default_value="D")
Loading