From 540822cc63adb2dc464efdd58907e9d52cb6873a Mon Sep 17 00:00:00 2001 From: mhidalgo-bdai <144129882+mhidalgo-bdai@users.noreply.github.com> Date: Wed, 17 Jan 2024 11:49:50 -0300 Subject: [PATCH] Adjust to proto2ros conversion code to mypy requests (#55) Signed-off-by: Michel Hidalgo --- proto2ros/proto2ros/output/python.py | 12 +++++- .../output/templates/conversions.py.jinja | 41 +++++++++---------- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/proto2ros/proto2ros/output/python.py b/proto2ros/proto2ros/output/python.py index ed3310f..1834f04 100644 --- a/proto2ros/proto2ros/output/python.py +++ b/proto2ros/proto2ros/output/python.py @@ -25,12 +25,19 @@ def to_python_module_name(module_path: os.PathLike) -> str: def to_python_identifier(string: str) -> str: """Transforms an arbitrary string into a valid Python identifier.""" - return inflection.underscore(string.replace("/", "_").replace(".", "_")) + denormalized_identifier = string.replace("/", "_").replace(".", "_") + denormalized_identifier = denormalized_identifier.replace("UInt", "uint") + return inflection.underscore(denormalized_identifier) + + +def unqualify_python_identifier(identifier: str) -> str: + """Extracts a Python identifier basename.""" + return identifier.rpartition(".")[-1] def itemize_python_identifier(identifier: str, prefix: Optional[str] = None) -> str: """Derives a loop variable identifier for its iterable variable identifier.""" - _, _, basename = identifier.rpartition(".") + basename = unqualify_python_identifier(identifier) if prefix and not basename.startswith(prefix): basename = prefix + basename if not basename.endswith("item"): @@ -130,6 +137,7 @@ def dump_conversions_python_module( env.globals["to_ros_base_type"] = to_ros_base_type env.globals["itemize_python_identifier"] = itemize_python_identifier env.filters["as_python_identifier"] = to_python_identifier + env.filters["python_identifier_name"] = unqualify_python_identifier env.filters["as_ros_python_type"] = to_ros_python_type pb2_python_type_lut = build_pb2_python_type_lut(config.python_imports, config.inline_python_imports) diff --git a/proto2ros/proto2ros/output/templates/conversions.py.jinja b/proto2ros/proto2ros/output/templates/conversions.py.jinja index 50408cd..5c14038 100644 --- a/proto2ros/proto2ros/output/templates/conversions.py.jinja +++ b/proto2ros/proto2ros/output/templates/conversions.py.jinja @@ -82,23 +82,22 @@ for {{ input_item }} in {{ source }}: {#- ROS message must be deserialized for conversion. -#} if {{ source }}.type_name != "{{ spec.type | as_ros_base_type }}": raise ValueError("expected {{ spec.type | as_ros_base_type }} message for {{ spec.name }} member, got %s" % {{ source }}.type) -typed_field_message = rclpy.serialization.deserialize_message({{ source }}.value.tobytes(), {{ spec.type | as_ros_base_type | as_ros_python_type }}) -convert_{{ spec.type | as_ros_base_type | as_python_identifier }}_message_to_{{ spec.annotations["proto-type"] | as_python_identifier }}_proto(typed_field_message, {{ destination }}) +typed_{{ source | python_identifier_name }}_message = rclpy.serialization.deserialize_message({{ source }}.value.tobytes(), {{ spec.type | as_ros_base_type | as_ros_python_type }}) +convert_{{ spec.type | as_ros_base_type | as_python_identifier }}_message_to_{{ spec.annotations["proto-type"] | as_python_identifier }}_proto(typed_{{ source | python_identifier_name }}_message, {{ destination }}) {%- elif spec.annotations.get("type-casted") -%} {#- ROS message must be converted and packed for assignment. -#} -typed_proto_message = {{ spec.annotations["proto-type"] | as_pb2_python_type }}() -convert_{{ spec.type | as_ros_base_type | as_python_identifier }}_message_to_{{ spec.annotations["proto-type"] | as_python_identifier }}_proto({{ source }}, typed_proto_message) -{{ destination }}.Pack(typed_proto_message) +typed_{{ source | python_identifier_name }}_message = {{ spec.annotations["proto-type"] | as_pb2_python_type }}() +convert_{{ spec.type | as_ros_base_type | as_python_identifier }}_message_to_{{ spec.annotations["proto-type"] | as_python_identifier }}_proto({{ source }}, typed_{{ source | python_identifier_name }}_message) +{{ destination }}.Pack(typed_{{ source | python_identifier_name }}_message) {%- elif spec.annotations.get("type-casts") -%} {#- ROS message must be deserialized according to type, then converted, then packed for assignment. -#} match {{ source }}.type_name: {% for proto_type_name, ros_type_name in spec.annotations["type-casts"] -%} case "{{ ros_type_name }}": - typed_field_message = rclpy.serialization.deserialize_message( - {{ source }}.value.tobytes(), {{ ros_type_name | as_ros_python_type }}) - typed_proto_message = {{ proto_type_name | as_pb2_python_type }}() - convert_{{ ros_type_name | as_python_identifier }}_message_to_{{ proto_type_name | as_python_identifier }}_proto(typed_field_message, typed_proto_message) - {{ destination }}.Pack(typed_proto_message) + typed_{{ ros_type_name | as_python_identifier | python_identifier_name }}_message = rclpy.serialization.deserialize_message({{ source }}.value.tobytes(), {{ ros_type_name | as_ros_python_type }}) + typed_{{ proto_type_name | as_python_identifier | python_identifier_name }}_message = {{ proto_type_name | as_pb2_python_type }}() + convert_{{ ros_type_name | as_python_identifier }}_message_to_{{ proto_type_name | as_python_identifier }}_proto(typed_{{ ros_type_name | as_python_identifier | python_identifier_name }}_message, typed_{{ proto_type_name | as_python_identifier | python_identifier_name }}_message) + {{ destination }}.Pack(typed_{{ proto_type_name | as_python_identifier | python_identifier_name }}_message) {% endfor -%} case _: raise ValueError("unexpected %s in {{ spec.name }} member:" % ros_msg.{{ spec.name }}.type) @@ -186,15 +185,15 @@ for {{ input_item }} in {{ input_iterable }}: {%- set type_spec = known_message_specifications.get(to_ros_base_type(spec.type)) -%} {%- if spec.annotations.get("type-erased") -%} {#- ROS message must be serialized for assignment. -#} -typed_field_message = {{ spec.type | as_ros_base_type | as_ros_python_type }}() -convert_{{ spec.annotations["proto-type"] | as_python_identifier }}_proto_to_{{ spec.type | as_ros_base_type | as_python_identifier }}_message({{ source }}, typed_field_message) -{{ destination }}.value = rclpy.serialization.serialize_message(typed_field_message) +typed_{{ source | python_identifier_name }}_message = {{ spec.type | as_ros_base_type | as_ros_python_type }}() +convert_{{ spec.annotations["proto-type"] | as_python_identifier }}_proto_to_{{ spec.type | as_ros_base_type | as_python_identifier }}_message({{ source }}, typed_{{ source | python_identifier_name }}_message) +{{ destination }}.value = rclpy.serialization.serialize_message(typed_{{ source | python_identifier_name }}_message) {{ destination }}.type_name = "{{ spec.type | as_ros_base_type }}" {%- elif spec.annotations.get("type-casted") -%} {#- Protobuf message must be unpacked for conversion. -#} -typed_proto_message = {{ spec.annotations["proto-type"] | as_pb2_python_type }}() -{{ source }}.Unpack(typed_proto_message) -convert_{{ spec.annotations["proto-type"] | as_python_identifier }}_proto_to_{{ spec.type | as_ros_base_type | as_python_identifier }}_message(typed_proto_message, {{ destination }}) +typed_{{ source | python_identifier_name }}_message = {{ spec.annotations["proto-type"] | as_pb2_python_type }}() +{{ source }}.Unpack(typed_{{ source | python_identifier_name }}_message) +convert_{{ spec.annotations["proto-type"] | as_python_identifier }}_proto_to_{{ spec.type | as_ros_base_type | as_python_identifier }}_message(typed_{{ source | python_identifier_name }}_message, {{ destination }}) {%- elif spec.annotations.get("type-casts") -%} {#- Protobuf message must be unpacked according to type, then converted, then serialized for assignment. -#} {%- for proto_type_name, ros_type_name in spec.annotations["type-casts"] -%} @@ -203,12 +202,12 @@ if {{ source }}.Is({{ proto_type_name | as_pb2_python_type }}.DESCRIPTOR): {%- else %} elif {{ source }}.Is({{ proto_type_name | as_pb2_python_type }}.DESCRIPTOR): {%- endif %} - typed_proto_message = {{ proto_type_name | as_pb2_python_type }}() - ok = {{ source }}.Unpack(typed_proto_message) + typed_{{ proto_type_name | as_python_identifier | python_identifier_name }}_message = {{ proto_type_name | as_pb2_python_type }}() + ok = {{ source }}.Unpack(typed_{{ proto_type_name | as_python_identifier | python_identifier_name }}_message) assert ok, "Failed to unpack any protobuf, internal error" - typed_field_message = {{ ros_type_name | as_ros_python_type }}() - convert_{{ proto_type_name | as_python_identifier }}_proto_to_{{ ros_type_name | as_python_identifier }}_message(typed_proto_message, typed_field_message) - {{ destination }}.value = rclpy.serialization.serialize_message(typed_field_message) + typed_{{ ros_type_name | as_python_identifier | python_identifier_name }}_message = {{ ros_type_name | as_ros_python_type }}() + convert_{{ proto_type_name | as_python_identifier }}_proto_to_{{ ros_type_name | as_python_identifier }}_message(typed_{{ proto_type_name | as_python_identifier | python_identifier_name }}_message, typed_{{ ros_type_name | as_python_identifier | python_identifier_name }}_message) + {{ destination }}.value = rclpy.serialization.serialize_message(typed_{{ ros_type_name | as_python_identifier | python_identifier_name }}_message) {{ destination }}.type_name = "{{ ros_type_name }}" {%- endfor %} else: