From 55c2e7f7a57b331266f43f2080cc8c8c20c39e9f Mon Sep 17 00:00:00 2001 From: Katie Hughes Date: Tue, 4 Jun 2024 17:14:50 -0400 Subject: [PATCH 1/6] Move launch actions to ros_utilities --- .../bdai_ros2_wrappers/launch/__init__.py | 0 .../bdai_ros2_wrappers/launch/actions.py | 79 +++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 bdai_ros2_wrappers/bdai_ros2_wrappers/launch/__init__.py create mode 100644 bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/__init__.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py new file mode 100644 index 0000000..2e3d911 --- /dev/null +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py @@ -0,0 +1,79 @@ +# Copyright (c) 2023-2024 Boston Dynamics AI Institute Inc. All rights reserved. +from __future__ import annotations + +from enum import Enum +from typing import Any, Final, Type + +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.utilities.type_utils import coerce_to_type + +_BOOLEAN_STR_CHOICES: Final[list[str]] = ["true", "True", "false", "False"] +_BOOLEAN_CHOICES: Final[list[str | bool]] = [*_BOOLEAN_STR_CHOICES, True, False] +_OPTIONAL_CHOICES: Final[list[str]] = [""] + + +def convert_to_bool(param_name: str, val: str) -> bool: + """Converts a ros argument to a bool""" + try: + return coerce_to_type(val.lower(), bool) + except ValueError: + print(f"Cannot convert `{param_name}` to bool (value is {val})") + raise + + +def update_sigterm_sigkill_timeout( + ld: LaunchDescription, *, sigterm_timeout_s: float = 60, sigkill_timeout_s: float = 60 +) -> None: + """Specify the timeout that launch takes to escalate to SIGTERM and SIGKILL after CTRL+C""" + ld.add_action(DeclareLaunchArgument("sigterm_timeout", default_value=str(sigterm_timeout_s))) + ld.add_action(DeclareLaunchArgument("sigkill_timeout", default_value=str(sigkill_timeout_s))) + + +class DeclareBooleanLaunchArgument(DeclareLaunchArgument): + """Thin wrapper on `DeclareLaunchArgument` to restrict the choices to boolean""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + if "choices" in kwargs: + raise KeyError("Cannot set `choices` for `DeclareBooleanLaunchArgument`") + if "default_value" in kwargs: + default_value = kwargs["default_value"] + if default_value not in _BOOLEAN_CHOICES: + raise ValueError(f"`default_value` must be from {_BOOLEAN_CHOICES}") + if isinstance(default_value, bool): + kwargs["default_value"] = "true" if default_value else "false" + + super().__init__(*args, choices=_BOOLEAN_STR_CHOICES, **kwargs) + + +class DeclareEnumLaunchArgument(DeclareLaunchArgument): + """Thin wrapper on `DeclareLaunchArgument` to restrict the choices to the values of an enum""" + + def __init__(self, enum_type: Type[Enum], *args: Any, optional: bool = False, **kwargs: Any) -> None: + choices = [str(e.value) for e in enum_type] + ( + [] if not optional else _OPTIONAL_CHOICES + ) # typing: ignore[attr-defined] + if "choices" in kwargs: + raise KeyError("Cannot set `choices` for `DeclareEnumLaunchArgument`") + if "default_value" in kwargs: + default_value = kwargs["default_value"] + if isinstance(default_value, str): + if default_value not in choices: + raise ValueError( + f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be from" + f" {choices} or {list(enum_type)}" + ) + elif isinstance(default_value, enum_type): + if default_value not in enum_type: + raise ValueError( + f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be from" + f" {choices} or {list(enum_type)}" + ) + kwargs["default_value"] = str(default_value.value) + else: + raise TypeError( + f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be of type" + f" `str` or `{enum_type.__name__}`" + ) + + super().__init__(*args, choices=choices, **kwargs) From c975e247bc66cba0d054928fd5d22e8bbae58aeb Mon Sep 17 00:00:00 2001 From: Katie Hughes Date: Fri, 7 Jun 2024 11:04:31 -0400 Subject: [PATCH 2/6] Copy all files from bdai_ros over --- .../bdai_ros2_wrappers/launch/__init__.py | 1 + .../bdai_ros2_wrappers/launch/actions.py | 6 +-- .../bdai_ros2_wrappers/launch/arguments.py | 45 +++++++++++++++++++ .../launch/substitutions.py | 27 +++++++++++ 4 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 bdai_ros2_wrappers/bdai_ros2_wrappers/launch/arguments.py create mode 100644 bdai_ros2_wrappers/bdai_ros2_wrappers/launch/substitutions.py diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/__init__.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/__init__.py index e69de29..022d069 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/__init__.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/__init__.py @@ -0,0 +1 @@ +# Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved. diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py index 2e3d911..9272112 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024 Boston Dynamics AI Institute Inc. All rights reserved. +# Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved. from __future__ import annotations from enum import Enum @@ -14,7 +14,7 @@ def convert_to_bool(param_name: str, val: str) -> bool: - """Converts a ros argument to a bool""" + """Converts a ros parameter to a bool""" try: return coerce_to_type(val.lower(), bool) except ValueError: @@ -25,7 +25,7 @@ def convert_to_bool(param_name: str, val: str) -> bool: def update_sigterm_sigkill_timeout( ld: LaunchDescription, *, sigterm_timeout_s: float = 60, sigkill_timeout_s: float = 60 ) -> None: - """Specify the timeout that launch takes to escalate to SIGTERM and SIGKILL after CTRL+C""" + """Increases the timeout for launch to escalate to SIGTERM and SIGKILL after you CTRL+C""" ld.add_action(DeclareLaunchArgument("sigterm_timeout", default_value=str(sigterm_timeout_s))) ld.add_action(DeclareLaunchArgument("sigkill_timeout", default_value=str(sigkill_timeout_s))) diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/arguments.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/arguments.py new file mode 100644 index 0000000..4df7986 --- /dev/null +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/arguments.py @@ -0,0 +1,45 @@ +# Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved. +from __future__ import annotations + +from typing import Literal + +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration + +from bdai_ros2_wrappers.launch.actions import DeclareBooleanLaunchArgument + +_ROBOT_NAME: Literal["robot_name"] = "robot_name" +_VERBOSE: Literal["verbose"] = "verbose" + + +def add_robot_name_argument(ld: LaunchDescription) -> LaunchConfiguration: + """Adds a launch argument for `robot_name` to the `LaunchDescription` + + Args: + ld: The launch description instance + + Returns: + A launch configuration that can be used to parse the command line argument for `robot_name` + """ + ld.add_action( + DeclareLaunchArgument( + _ROBOT_NAME, + description="Name of the robot.", + default_value="", + ) + ) + return LaunchConfiguration(_ROBOT_NAME) + + +def add_verbose_argument(ld: LaunchDescription) -> LaunchConfiguration: + """Adds a launch argument for `verbose` to the `LaunchDescription` + + Args: + ld: The launch description instance + + Returns: + A launch configuration that can be used to parse the command line argument for `verbose` + """ + ld.add_action(DeclareBooleanLaunchArgument(_VERBOSE, description="Run with debug logging on.", default_value=False)) + return LaunchConfiguration(_VERBOSE) diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/substitutions.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/substitutions.py new file mode 100644 index 0000000..8c2f89c --- /dev/null +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/substitutions.py @@ -0,0 +1,27 @@ +# Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved. +from typing import Optional + +from launch import Condition, SomeSubstitutionsType +from launch.conditions import UnlessCondition +from launch.substitutions import OrSubstitution + + +# TODO: when/if we move to rolling we should use the `AnySubstitution` +def not_any_substitution(conditions: list[SomeSubstitutionsType]) -> Optional[Condition]: + """A substitution that is True if none of the conditions are substituted with True + + Args: + conditions (list[SomeSubstitutionsType]): A list of substitutions + Returns: + Optional[Condition]: A substitution that is True if none of the conditions are substituted with True, if less + than 2 conditions are provided returns none + """ + if len(conditions) < 2: + print("Bad number of conditions in not_any_substitution.") + return None + + substitution = OrSubstitution(conditions[0], conditions[1]) + for cond in conditions[2:]: + substitution = OrSubstitution(substitution, cond) + + return UnlessCondition(substitution) From ef98da545f42ab71db40bff8586caed15ade462a Mon Sep 17 00:00:00 2001 From: Katie Hughes Date: Fri, 7 Jun 2024 11:07:15 -0400 Subject: [PATCH 3/6] Copy over tests --- .../test/launch/test_actions.py | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 bdai_ros2_wrappers/test/launch/test_actions.py diff --git a/bdai_ros2_wrappers/test/launch/test_actions.py b/bdai_ros2_wrappers/test/launch/test_actions.py new file mode 100644 index 0000000..4737c4d --- /dev/null +++ b/bdai_ros2_wrappers/test/launch/test_actions.py @@ -0,0 +1,87 @@ +# Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved. +from enum import Enum + +import pytest +from bdai_ros2_wrappers.launch.actions import DeclareBooleanLaunchArgument, DeclareEnumLaunchArgument + + +def test_declare_boolean_launch_argument_default_value_false() -> None: + arg = DeclareBooleanLaunchArgument("arg", default_value="false") + assert arg.default_value is not None + assert len(arg.default_value) == 1 + assert arg.default_value[0].text == "false" + + +def test_declare_boolean_launch_argument_default_value_False() -> None: + arg = DeclareBooleanLaunchArgument("arg", default_value="False") + assert arg.default_value is not None + assert len(arg.default_value) == 1 + assert arg.default_value[0].text == "False" + + +def test_declare_boolean_launch_argument_default_value_bool_false() -> None: + arg = DeclareBooleanLaunchArgument("arg", default_value=False) + assert arg.default_value is not None + assert len(arg.default_value) == 1 + assert arg.default_value[0].text == "false" + + +def test_declare_boolean_launch_argument_default_value_true() -> None: + arg = DeclareBooleanLaunchArgument("arg", default_value="true") + assert arg.default_value is not None + assert len(arg.default_value) == 1 + assert arg.default_value[0].text == "true" + + +def test_declare_boolean_launch_argument_default_value_True() -> None: + arg = DeclareBooleanLaunchArgument("arg", default_value="True") + assert arg.default_value is not None + assert len(arg.default_value) == 1 + assert arg.default_value[0].text == "True" + + +def test_declare_boolean_launch_argument_default_value_bool_true() -> None: + arg = DeclareBooleanLaunchArgument("arg", default_value=True) + assert arg.default_value is not None + assert len(arg.default_value) == 1 + assert arg.default_value[0].text == "true" + + +def test_declare_boolean_launch_argument_set_choices() -> None: + with pytest.raises(KeyError): + _ = DeclareBooleanLaunchArgument("arg", choices=["some", "choices"]) + + +def test_declare_boolean_launch_argument_default_value_invalid() -> None: + with pytest.raises(ValueError): + _ = DeclareBooleanLaunchArgument("arg", default_value="not true or false") + + +class MockStrEnum(str, Enum): + A = "A" + B = "B" + C = "C" + + +def test_declare_enum_launch_argument_str_enum_str_default_value() -> None: + arg = DeclareEnumLaunchArgument(MockStrEnum, "arg", default_value="A") + assert arg.default_value is not None + assert len(arg.default_value) == 1 + assert arg.default_value[0].text == "A" + + +def test_declare_enum_launch_argument_str_enum_enum_default_value() -> None: + arg = DeclareEnumLaunchArgument(MockStrEnum, "arg", default_value=MockStrEnum.A) + assert arg.default_value is not None + assert len(arg.default_value) == 1 + assert arg.default_value[0].text == "A" + + +def test_declare_enum_launch_argument_set_choices() -> None: + with pytest.raises(KeyError): + _ = DeclareEnumLaunchArgument(MockStrEnum, "arg", choices=["some", "choices"]) + + +def test_declare_enum_launch_argument_invalid_default() -> None: + with pytest.raises(ValueError): + _ = DeclareEnumLaunchArgument(MockStrEnum, "arg", default_value="D") From 237dc55beb46e5097ef22e45bf69b1494bf4bf20 Mon Sep 17 00:00:00 2001 From: Katie Hughes Date: Fri, 7 Jun 2024 11:11:34 -0400 Subject: [PATCH 4/6] Linting for mypy --- .../bdai_ros2_wrappers/launch/actions.py | 23 +++++++++++++------ .../bdai_ros2_wrappers/launch/arguments.py | 2 +- .../launch/substitutions.py | 4 ++-- .../test/launch/test_actions.py | 1 + 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py index 9272112..9c32b7b 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py @@ -23,7 +23,10 @@ def convert_to_bool(param_name: str, val: str) -> bool: def update_sigterm_sigkill_timeout( - ld: LaunchDescription, *, sigterm_timeout_s: float = 60, sigkill_timeout_s: float = 60 + ld: LaunchDescription, + *, + sigterm_timeout_s: float = 60, + sigkill_timeout_s: float = 60, ) -> None: """Increases the timeout for launch to escalate to SIGTERM and SIGKILL after you CTRL+C""" ld.add_action(DeclareLaunchArgument("sigterm_timeout", default_value=str(sigterm_timeout_s))) @@ -60,20 +63,26 @@ def __init__(self, enum_type: Type[Enum], *args: Any, optional: bool = False, ** if isinstance(default_value, str): if default_value not in choices: raise ValueError( - f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be from" - f" {choices} or {list(enum_type)}" + ( + f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be" + f" from {choices} or {list(enum_type)}" + ), ) elif isinstance(default_value, enum_type): if default_value not in enum_type: raise ValueError( - f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be from" - f" {choices} or {list(enum_type)}" + ( + f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be" + f" from {choices} or {list(enum_type)}" + ), ) kwargs["default_value"] = str(default_value.value) else: raise TypeError( - f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be of type" - f" `str` or `{enum_type.__name__}`" + ( + f"For an Enum Launch Argument of type {enum_type.__name__}, the `default_value` must be of type" + f" `str` or `{enum_type.__name__}`" + ), ) super().__init__(*args, choices=choices, **kwargs) diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/arguments.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/arguments.py index 4df7986..e028c0a 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/arguments.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/arguments.py @@ -27,7 +27,7 @@ def add_robot_name_argument(ld: LaunchDescription) -> LaunchConfiguration: _ROBOT_NAME, description="Name of the robot.", default_value="", - ) + ), ) return LaunchConfiguration(_ROBOT_NAME) diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/substitutions.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/substitutions.py index 8c2f89c..1e046fc 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/substitutions.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/substitutions.py @@ -1,5 +1,5 @@ # Copyright (c) 2024 Boston Dynamics AI Institute LLC. All rights reserved. -from typing import Optional +from typing import List, Optional from launch import Condition, SomeSubstitutionsType from launch.conditions import UnlessCondition @@ -7,7 +7,7 @@ # TODO: when/if we move to rolling we should use the `AnySubstitution` -def not_any_substitution(conditions: list[SomeSubstitutionsType]) -> Optional[Condition]: +def not_any_substitution(conditions: List[SomeSubstitutionsType]) -> Optional[Condition]: """A substitution that is True if none of the conditions are substituted with True Args: diff --git a/bdai_ros2_wrappers/test/launch/test_actions.py b/bdai_ros2_wrappers/test/launch/test_actions.py index 4737c4d..7235e6c 100644 --- a/bdai_ros2_wrappers/test/launch/test_actions.py +++ b/bdai_ros2_wrappers/test/launch/test_actions.py @@ -2,6 +2,7 @@ from enum import Enum import pytest + from bdai_ros2_wrappers.launch.actions import DeclareBooleanLaunchArgument, DeclareEnumLaunchArgument From e630e41dafe6f9048b21c090fa1eab8ffc006274 Mon Sep 17 00:00:00 2001 From: Katie Hughes <157421702+khughes-bdai@users.noreply.github.com> Date: Fri, 7 Jun 2024 12:54:49 -0400 Subject: [PATCH 5/6] Update bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py Co-authored-by: Andrew Messing <129519955+amessing-bdai@users.noreply.github.com> --- bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py index 9c32b7b..fecd2a4 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py @@ -8,9 +8,9 @@ from launch.actions import DeclareLaunchArgument from launch.utilities.type_utils import coerce_to_type -_BOOLEAN_STR_CHOICES: Final[list[str]] = ["true", "True", "false", "False"] -_BOOLEAN_CHOICES: Final[list[str | bool]] = [*_BOOLEAN_STR_CHOICES, True, False] -_OPTIONAL_CHOICES: Final[list[str]] = [""] +_BOOLEAN_STR_CHOICES: Final[List[str]] = ["true", "True", "false", "False"] +_BOOLEAN_CHOICES: Final[List[str | bool]] = [*_BOOLEAN_STR_CHOICES, True, False] +_OPTIONAL_CHOICES: Final[List[str]] = [""] def convert_to_bool(param_name: str, val: str) -> bool: From d8dafacc15a2dd9b5536a7cce66d071c4dc587f4 Mon Sep 17 00:00:00 2001 From: Katie Hughes Date: Fri, 7 Jun 2024 12:57:18 -0400 Subject: [PATCH 6/6] Lint --- bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py index fecd2a4..7502fde 100644 --- a/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py +++ b/bdai_ros2_wrappers/bdai_ros2_wrappers/launch/actions.py @@ -2,7 +2,7 @@ from __future__ import annotations from enum import Enum -from typing import Any, Final, Type +from typing import Any, Final, List, Type from launch import LaunchDescription from launch.actions import DeclareLaunchArgument