From a19f505eaef2dca1e6eef27e0a3c46a09b11da56 Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Fri, 28 Oct 2022 13:29:30 +0000 Subject: [PATCH 1/2] Set multiple parameters multiple param set standardize argument errors & address pr comments revert parameter set print code address pr comments add test_verb_set Signed-off-by: Brian Chen --- ros2param/ros2param/verb/set.py | 61 +++++---- ros2param/test/test_verb_set.py | 226 ++++++++++++++++++++++++++++++++ 2 files changed, 265 insertions(+), 22 deletions(-) create mode 100644 ros2param/test/test_verb_set.py diff --git a/ros2param/ros2param/verb/set.py b/ros2param/ros2param/verb/set.py index 86c29fa06..c92e4088d 100644 --- a/ros2param/ros2param/verb/set.py +++ b/ros2param/ros2param/verb/set.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import argparse import sys from rcl_interfaces.msg import Parameter @@ -28,6 +29,17 @@ from ros2param.verb import VerbExtension +class RequireParameterPairAction(argparse.Action): + """Argparse action to validate parameter argument pairs.""" + + def __call__(self, parser, args, values, option_string=None): + if len(values) == 0: + parser.error('No parameters specified') + if len(values) % 2: + parser.error('Must provide parameter name and value pairs') + setattr(args, self.dest, values) + + class SetVerb(VerbExtension): """Set parameter.""" @@ -41,38 +53,43 @@ def add_arguments(self, parser, cli_name): # noqa: D102 '--include-hidden-nodes', action='store_true', help='Consider hidden nodes as well') arg = parser.add_argument( - 'parameter_name', help='Name of the parameter') + 'parameters', nargs='*', + action=RequireParameterPairAction, + help='List of parameter name and value pairs i.e. "int_param 1 str_param hello_world"') arg.completer = ParameterNameCompleter() - parser.add_argument( - 'value', help='Value of the parameter') + + def build_parameters(self, params): + parameters = [] + for i in range(0, len(params), 2): + parameter = Parameter() + parameter.name = params[i] + parameter.value = get_parameter_value(string_value=params[i+1]) + parameters.append(parameter) + return parameters def main(self, *, args): # noqa: D102 with NodeStrategy(args) as node: node_names = get_node_names( node=node, include_hidden_nodes=args.include_hidden_nodes) - node_name = get_absolute_node_name(args.node_name) + if node_name not in {n.full_name for n in node_names}: return 'Node not found' with DirectNode(args) as node: - parameter = Parameter() - Parameter.name = args.parameter_name - parameter.value = get_parameter_value(string_value=args.value) - + parameters = self.build_parameters(args.parameters) response = call_set_parameters( - node=node, node_name=args.node_name, parameters=[parameter]) + node=node, node_name=args.node_name, parameters=parameters) + results = response.results - # output response - assert len(response.results) == 1 - result = response.results[0] - if result.successful: - msg = 'Set parameter successful' - if result.reason: - msg += ': ' + result.reason - print(msg) - else: - msg = 'Setting parameter failed' - if result.reason: - msg += ': ' + result.reason - print(msg, file=sys.stderr) + for i, result in enumerate(results): + if result.successful: + msg = f'Set parameter {parameters[i].name} successful' + if result.reason: + msg += ': ' + result.reason + print(msg) + else: + msg = 'Set parameter failed' + if result.reason: + msg += ': ' + result.reason + print(msg, file=sys.stderr) diff --git a/ros2param/test/test_verb_set.py b/ros2param/test/test_verb_set.py new file mode 100644 index 000000000..d08140d32 --- /dev/null +++ b/ros2param/test/test_verb_set.py @@ -0,0 +1,226 @@ +# Copyright 2022 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import os +import sys +import time +import unittest +import xmlrpc + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +from launch_ros.actions import Node +import launch_testing +import launch_testing.actions +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +import launch_testing_ros.tools + +import pytest + +import rclpy +from rclpy.utilities import get_available_rmw_implementations + +from ros2cli.node.strategy import NodeStrategy + + +TEST_NODE = 'test_node' +TEST_NAMESPACE = '/foo' +TEST_TIMEOUT = 20.0 + +# Skip cli tests on Windows while they exhibit pathological behavior +# https://github.com/ros2/build_farmer/issues/248 +if sys.platform.startswith('win'): + pytest.skip( + 'CLI tests can block for a pathological amount of time on Windows.', + allow_module_level=True) + + +@pytest.mark.rostest +@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) +def generate_test_description(rmw_implementation): + path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures') + additional_env = {'RMW_IMPLEMENTATION': rmw_implementation} + + # Parameter node test fixture + path_to_parameter_node_script = os.path.join(path_to_fixtures, 'parameter_node.py') + parameter_node = Node( + executable=sys.executable, + name=TEST_NODE, + namespace=TEST_NAMESPACE, + arguments=[path_to_parameter_node_script], + additional_env=additional_env + ) + + return LaunchDescription([ + # TODO(jacobperron): Provide a common RestartCliDaemon launch action in ros2cli + ExecuteProcess( + cmd=['ros2', 'daemon', 'stop'], + name='daemon-stop', + on_exit=[ + ExecuteProcess( + cmd=['ros2', 'daemon', 'start'], + name='daemon-start', + on_exit=[ + parameter_node, + launch_testing.actions.ReadyToTest(), + ], + additional_env=additional_env + ) + ] + ), + ]) + + +class TestVerbSet(unittest.TestCase): + + @classmethod + def setUpClass( + cls, + launch_service, + proc_info, + proc_output, + rmw_implementation + ): + rmw_implementation_filter = launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=rmw_implementation + ) + + @contextlib.contextmanager + def launch_param_set_command(self, arguments): + param_set_command_action = ExecuteProcess( + cmd=['ros2', 'param', 'set', *arguments], + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + }, + name='ros2param-set-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, param_set_command_action, proc_info, proc_output, + output_filter=rmw_implementation_filter + ) as param_set_command: + yield param_set_command + cls.launch_param_set_command = launch_param_set_command + + @contextlib.contextmanager + def launch_param_dump_command(self, arguments): + param_dump_command_action = ExecuteProcess( + cmd=['ros2', 'param', 'dump', *arguments], + additional_env={ + 'RMW_IMPLEMENTATION': rmw_implementation, + }, + name='ros2param-dump-cli', + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, param_dump_command_action, proc_info, proc_output, + output_filter=rmw_implementation_filter + ) as param_dump_command: + yield param_dump_command + cls.launch_param_dump_command = launch_param_dump_command + + def setUp(self): + # Ensure the daemon node is running and discovers the test node + start_time = time.time() + timed_out = True + with NodeStrategy(None) as node: + while (time.time() - start_time) < TEST_TIMEOUT: + # TODO(jacobperron): Create a generic 'CliNodeError' so we can treat errors + # from DirectNode and DaemonNode the same + try: + services = node.get_service_names_and_types_by_node(TEST_NODE, TEST_NAMESPACE) + except rclpy.node.NodeNameNonExistentError: + continue + except xmlrpc.client.Fault as e: + if 'NodeNameNonExistentError' in e.faultString: + continue + raise + + service_names = [name_type_tuple[0] for name_type_tuple in services] + if ( + len(service_names) > 0 + and f'{TEST_NAMESPACE}/{TEST_NODE}/get_parameters' in service_names + ): + timed_out = False + break + if timed_out: + self.fail(f'CLI daemon failed to find test node after {TEST_TIMEOUT} seconds') + + def test_verb_set_single(self): + with self.launch_param_set_command( + arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', 'int_param', '1'] + ) as param_load_command: + assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT) + assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=['Set parameter int_param successful'], + text=param_load_command.output, + strict=True) + + def test_verb_set_multiple(self): + with self.launch_param_set_command( + arguments=[ + f'{TEST_NAMESPACE}/{TEST_NODE}', + 'int_param', '1', + 'str_param', 'hello world', + 'bool_param', 'True', + ] + ) as param_load_command: + assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT) + assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'Set parameter int_param successful', + 'Set parameter str_param successful', + 'Set parameter bool_param successful'], + text=param_load_command.output, + strict=True) + + def test_verb_set_odd_params(self): + with self.launch_param_set_command( + arguments=[ + f'{TEST_NAMESPACE}/{TEST_NODE}', + 'int_param', '1', + 'str_param', 'hello world', + 'bool_param', + ] + ) as param_load_command: + assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT) + assert param_load_command.exit_code == 2 + assert launch_testing.tools.expect_output( + expected_lines=['ros2 param set: error: Must provide parameter name and value pairs'], + text=param_load_command.output, + strict=False + ) + + def test_verb_set_invalid_node(self): + with self.launch_param_set_command( + arguments=[ + f'{TEST_NAMESPACE}/{TEST_NODE}_invalid', + 'int_param', '123', + ] + ) as param_load_command: + assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT) + assert param_load_command.exit_code == 1 + assert launch_testing.tools.expect_output( + expected_lines=['Node not found'], + text=param_load_command.output, + strict=True) + + +if __name__ == '__main__': + unittest.main() From 21caf351da3ecc59e28cd658ca2a28a297a86bc2 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Fri, 28 Oct 2022 13:45:35 +0000 Subject: [PATCH 2/2] Small fixes. To support Python 3.6 (no fstrings), and also to slightly clean up the generation of output. Signed-off-by: Chris Lalancette --- ros2param/ros2param/verb/set.py | 5 +++-- ros2param/test/test_verb_set.py | 12 ++++-------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/ros2param/ros2param/verb/set.py b/ros2param/ros2param/verb/set.py index c92e4088d..d3f9bc066 100644 --- a/ros2param/ros2param/verb/set.py +++ b/ros2param/ros2param/verb/set.py @@ -83,13 +83,14 @@ def main(self, *, args): # noqa: D102 results = response.results for i, result in enumerate(results): + msg = "Set parameter '%s' " % (parameters[i].name) if result.successful: - msg = f'Set parameter {parameters[i].name} successful' + msg += 'successful' if result.reason: msg += ': ' + result.reason print(msg) else: - msg = 'Set parameter failed' + msg += 'failed' if result.reason: msg += ': ' + result.reason print(msg, file=sys.stderr) diff --git a/ros2param/test/test_verb_set.py b/ros2param/test/test_verb_set.py index d08140d32..4a7320752 100644 --- a/ros2param/test/test_verb_set.py +++ b/ros2param/test/test_verb_set.py @@ -167,7 +167,7 @@ def test_verb_set_single(self): assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT) assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( - expected_lines=['Set parameter int_param successful'], + expected_lines=["Set parameter 'int_param' successful"], text=param_load_command.output, strict=True) @@ -184,9 +184,9 @@ def test_verb_set_multiple(self): assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK assert launch_testing.tools.expect_output( expected_lines=[ - 'Set parameter int_param successful', - 'Set parameter str_param successful', - 'Set parameter bool_param successful'], + "Set parameter 'int_param' successful", + "Set parameter 'str_param' successful", + "Set parameter 'bool_param' successful"], text=param_load_command.output, strict=True) @@ -220,7 +220,3 @@ def test_verb_set_invalid_node(self): expected_lines=['Node not found'], text=param_load_command.output, strict=True) - - -if __name__ == '__main__': - unittest.main()