-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathrerun_loader_urdf.py
executable file
·340 lines (281 loc) · 13.3 KB
/
rerun_loader_urdf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
#!/usr/bin/env python3
"""Example of an executable data-loader plugin for the Rerun Viewer for URDF files."""
from __future__ import annotations
import argparse
import os
import pathlib
from typing import Optional
import numpy as np
import rerun as rr # pip install rerun-sdk
import scipy.spatial.transform as st
import trimesh
import trimesh.visual
from PIL import Image
from urdf_parser_py import urdf as urdf_parser
class URDFLogger:
    """Log the joints and link visuals of a URDF file to Rerun.

    A joint's transform is logged at the entity path of the joint's child
    link, which is the same path the child link's visuals are logged under.
    """

    def __init__(self, filepath: str, entity_path_prefix: Optional[str]) -> None:
        """Parse the URDF file and index its robot-level materials by name.

        Args:
            filepath: Path of the URDF file to parse.
            entity_path_prefix: Optional prefix prepended to every entity path.
        """
        self.urdf = urdf_parser.URDF.from_xml_file(filepath)
        self.entity_path_prefix = entity_path_prefix
        # Used to resolve visuals whose material only carries a name.
        self.mat_name_to_mat = {mat.name: mat for mat in self.urdf.materials}

    def link_entity_path(self, link: urdf_parser.Link) -> str:
        """Return the entity path for the URDF link."""
        root_name = self.urdf.get_root()
        # Keep every other chain entry: the link names (skip the joints).
        link_names = self.urdf.get_chain(root_name, link.name)[0::2]
        return self.add_entity_path_prefix("/".join(link_names))

    def joint_entity_path(self, joint: urdf_parser.Joint) -> str:
        """Return the entity path for the URDF joint.

        This is the entity path of the joint's child link: the chain is taken
        from the root to ``joint.child`` and only the link names are kept.
        """
        root_name = self.urdf.get_root()
        # Same slice as in `link_entity_path`: keeps the link names (skip the joints).
        link_names = self.urdf.get_chain(root_name, joint.child)[0::2]
        return self.add_entity_path_prefix("/".join(link_names))

    def add_entity_path_prefix(self, entity_path: str) -> str:
        """Add prefix (if passed) to entity path."""
        if self.entity_path_prefix is None:
            return entity_path
        return f"{self.entity_path_prefix}/{entity_path}"

    def log(self) -> None:
        """Log a URDF file to Rerun: first all joints, then all links."""
        for joint in self.urdf.joints:
            self.log_joint(self.joint_entity_path(joint), joint)

        for link in self.urdf.links:
            self.log_link(self.link_entity_path(link), link)

    def log_link(self, entity_path: str, link: urdf_parser.Link) -> None:
        """Log every visual of a URDF link under ``<entity_path>/visual_<i>``."""
        for i, visual in enumerate(link.visuals):
            self.log_visual(f"{entity_path}/visual_{i}", visual)

    def log_joint(self, entity_path: str, joint: urdf_parser.Joint) -> None:
        """Log the origin of a URDF joint to Rerun as a ``rr.Transform3D``.

        Translation and/or rotation are passed as ``None`` when the joint has
        no origin or the respective component is missing.
        """
        translation = rotation = None

        origin = joint.origin
        if origin is not None:
            if origin.xyz is not None:
                translation = origin.xyz
            if origin.rpy is not None:
                rotation = st.Rotation.from_euler("xyz", origin.rpy).as_matrix()

        rr.log(entity_path, rr.Transform3D(translation=translation, mat3x3=rotation))

    def log_visual(self, entity_path: str, visual: urdf_parser.Visual) -> None:
        """Log a URDF visual to Rerun.

        The visual's geometry is turned into a trimesh mesh (or scene), moved
        by the visual's origin (and mesh scale, if any), given the visual's
        material where the mesh has no texture of its own, and logged.
        A scene is logged as one mesh per entry under ``<entity_path>/<i>``.
        """
        material = self._resolve_material(visual)

        # Homogeneous transform built from the visual's origin.
        transform = np.eye(4)
        origin = visual.origin
        if origin is not None and origin.xyz is not None:
            transform[:3, 3] = origin.xyz
        if origin is not None and origin.rpy is not None:
            transform[:3, :3] = st.Rotation.from_euler("xyz", origin.rpy).as_matrix()

        geometry = visual.geometry
        if isinstance(geometry, urdf_parser.Mesh):
            resolved_path = resolve_ros_path(geometry.filename)
            mesh_scale = geometry.scale
            mesh_or_scene = trimesh.load_mesh(resolved_path)
            if mesh_scale is not None:
                # Fold the mesh scale into the linear part of the transform.
                transform[:3, :3] *= mesh_scale
        elif isinstance(geometry, urdf_parser.Box):
            mesh_or_scene = trimesh.creation.box(extents=geometry.size)
        elif isinstance(geometry, urdf_parser.Cylinder):
            mesh_or_scene = trimesh.creation.cylinder(
                radius=geometry.radius,
                height=geometry.length,
            )
        elif isinstance(geometry, urdf_parser.Sphere):
            mesh_or_scene = trimesh.creation.icosphere(
                radius=geometry.radius,
            )
        else:
            rr.log(
                "",
                rr.TextLog("Unsupported geometry type: " + str(type(geometry))),
            )
            # Fall back to an empty mesh so the remaining steps still run.
            mesh_or_scene = trimesh.Trimesh()

        mesh_or_scene.apply_transform(transform)

        if isinstance(mesh_or_scene, trimesh.Scene):
            # use dump to apply scene graph transforms and get a list of transformed meshes
            for i, mesh in enumerate(scene_to_trimeshes(mesh_or_scene)):
                self._apply_material(mesh, material)
                log_trimesh(f"{entity_path}/{i}", mesh)
        else:
            self._apply_material(mesh_or_scene, material)
            log_trimesh(entity_path, mesh_or_scene)

    def _resolve_material(self, visual: urdf_parser.Visual) -> Optional[urdf_parser.Material]:
        """Return the material to use for a visual, or ``None`` if there is none."""
        material = visual.material
        if material is None:
            return None
        if material.color is not None or material.texture is not None:
            return material

        # Only a name is given: use the globally defined material of that name.
        resolved = self.mat_name_to_mat.get(material.name)
        if resolved is None:
            # Report the dangling reference instead of aborting the whole load.
            rr.log("", rr.TextLog("Undefined material: " + str(material.name)))
        return resolved

    @staticmethod
    def _apply_material(mesh: trimesh.Trimesh, material: Optional[urdf_parser.Material]) -> None:
        """Give ``mesh`` the URDF material's color or texture, unless it is already textured."""
        if material is None or isinstance(mesh.visual, trimesh.visual.texture.TextureVisuals):
            return

        if material.color is not None:
            mesh.visual = trimesh.visual.ColorVisuals()
            mesh.visual.vertex_colors = material.color.rgba
        elif material.texture is not None:
            texture_path = resolve_ros_path(material.texture.filename)
            mesh.visual = trimesh.visual.texture.TextureVisuals(image=Image.open(texture_path))
def scene_to_trimeshes(scene: trimesh.Scene) -> list[trimesh.Trimesh]:
    """
    Flatten a trimesh.Scene into a list of trimesh.Trimesh.

    The scene is dumped; nested scenes found in the dump are flattened
    recursively. Anything that is neither a trimesh.Trimesh nor a
    trimesh.Scene is skipped.
    """
    dumped = scene.dump()
    # The dump may be a single object rather than a list; normalize to a list.
    if not isinstance(dumped, list):
        dumped = [dumped]

    collected = []
    for item in dumped:
        if isinstance(item, trimesh.Trimesh):
            collected.append(item)
            continue
        if isinstance(item, trimesh.Scene):
            collected.extend(scene_to_trimeshes(item))
    return collected
def log_trimesh(entity_path: str, mesh: trimesh.Trimesh) -> None:
    """Log a trimesh mesh to Rerun as a timeless ``rr.Mesh3D``.

    Vertex colors, an albedo texture and texture coordinates are derived from
    the mesh's visual where available; any of them may stay ``None``.

    Args:
        entity_path: Entity path to log the mesh at.
        mesh: Mesh to log. It is not modified.
    """
    vertex_colors = albedo_texture = vertex_texcoords = None

    visual = mesh.visual
    if isinstance(visual, trimesh.visual.color.ColorVisuals):
        vertex_colors = visual.vertex_colors
    elif isinstance(visual, trimesh.visual.texture.TextureVisuals):
        trimesh_material = visual.material

        if visual.uv is not None:
            # Copy first: flipping in place would alter the mesh's own UVs,
            # so logging the same mesh twice would undo the flip.
            vertex_texcoords = np.array(visual.uv)
            # Trimesh uses the OpenGL convention for UV coordinates, so we need to flip the V coordinate
            # since Rerun uses the Vulkan/Metal/DX12/WebGPU convention.
            vertex_texcoords[:, 1] = 1.0 - vertex_texcoords[:, 1]

        if isinstance(trimesh_material, trimesh.visual.material.PBRMaterial):
            if trimesh_material.baseColorTexture is not None:
                albedo_texture = pil_image_to_albedo_texture(trimesh_material.baseColorTexture)
            elif trimesh_material.baseColorFactor is not None:
                vertex_colors = trimesh_material.baseColorFactor
        elif isinstance(trimesh_material, trimesh.visual.material.SimpleMaterial):
            if trimesh_material.image is not None:
                albedo_texture = pil_image_to_albedo_texture(trimesh_material.image)
            else:
                # No image on the material: fall back to the visual converted to colors.
                vertex_colors = visual.to_color().vertex_colors

    rr.log(
        entity_path,
        rr.Mesh3D(
            vertex_positions=mesh.vertices,
            triangle_indices=mesh.faces,
            vertex_normals=mesh.vertex_normals,
            vertex_colors=vertex_colors,
            albedo_texture=albedo_texture,
            vertex_texcoords=vertex_texcoords,
        ),
        timeless=True,
    )
def resolve_ros_path(path_str: str) -> str:
    """Resolve a ROS path to a file system path.

    ``package://<name>/<relative path>`` is resolved against the directory of
    the ROS 1 or ROS 2 package ``<name>``, ``file://<path>`` has its scheme
    removed, and anything else is returned unchanged.

    Args:
        path_str: The path or URI as written in the URDF.

    Returns:
        The resolved path as a string.

    Raises:
        ValueError: If a ``package://`` URI names a package that cannot be found.
    """
    package_scheme = "package://"
    file_scheme = "file://"

    if path_str.startswith(package_scheme):
        # Parse the URI as a string rather than through pathlib, so the result
        # does not depend on the platform's path normalization rules.
        remainder = path_str[len(package_scheme) :].lstrip("/")
        package_name, _, relative_path = remainder.partition("/")
        # Leading slashes would make the relative part absolute when joined below.
        relative_path = relative_path.lstrip("/")

        package_path = resolve_ros1_package(package_name) or resolve_ros2_package(package_name)
        if package_path is None:
            raise ValueError(
                f"Could not resolve {path_str}. "
                f"Replace with relative / absolute path, source the correct ROS environment, or install {package_name}."
            )

        return str(pathlib.Path(package_path) / relative_path)

    if path_str.startswith(file_scheme):
        return path_str[len(file_scheme) :]

    return path_str
def resolve_ros2_package(package_name: str) -> Optional[str]:
    """Return the share directory of a ROS 2 package via ``ament_index_python``.

    Returns ``None`` when ``ament_index_python`` cannot be imported or the
    package is not found.
    """
    try:
        import ament_index_python

        share_directory = None
        try:
            share_directory = ament_index_python.get_package_share_directory(package_name)
        except ament_index_python.packages.PackageNotFoundError:
            # Unknown package: keep the `None` result.
            pass
        return share_directory
    except ImportError:
        return None
def resolve_ros1_package(package_name: str) -> Optional[str]:
    """Return the path of a ROS 1 package via ``rospkg``.

    Returns ``None`` when ``rospkg`` cannot be imported or the package is not
    found.
    """
    try:
        import rospkg

        package_path = None
        try:
            package_path = rospkg.RosPack().get_path(package_name)
        except rospkg.ResourceNotFound:
            # Unknown package: keep the `None` result.
            pass
        return package_path
    except ImportError:
        return None
def pil_image_to_albedo_texture(image: Image.Image) -> np.ndarray:
    """Return the image as an array usable as an albedo texture.

    A two-dimensional (grayscale) image is expanded to three identical
    channels along a new last axis; any other image is returned as converted
    by ``np.asarray``.
    """
    pixels = np.asarray(image)
    if pixels.ndim != 2:
        return pixels

    # If the texture is grayscale, we need to convert it to RGB since
    # Rerun expects a 3-channel texture.
    # See: https://github.com/rerun-io/rerun/issues/4878
    return np.stack((pixels, pixels, pixels), axis=-1)
def main() -> None:
    """Entry point of the data-loader: parse the arguments and log the URDF to stdout.

    Exits with ``rr.EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE`` when the
    given path is not an existing file or does not contain ``.urdf``.
    """
    # The Rerun Viewer will always pass these two pieces of information:
    # 1. The path to be loaded, as a positional arg.
    # 2. A shared recording ID, via the `--recording-id` flag.
    #
    # It is up to you whether you make use of that shared recording ID or not.
    # If you use it, the data will end up in the same recording as all other plugins interested in
    # that file, otherwise you can just create a dedicated recording for it. Or both.
    parser = argparse.ArgumentParser(
        description="""
This is an example executable data-loader plugin for the Rerun Viewer.
Any executable on your `$PATH` with a name that starts with `rerun-loader-` will be
treated as an external data-loader.
This example will load URDF files, logs them to Rerun,
and returns a special exit code to indicate that it doesn't support anything else.
To try it out, copy it in your $PATH as `rerun-loader-python-example-urdf`,
then open a URDF file with Rerun (`rerun example.urdf`).
"""
    )
    parser.add_argument("filepath", type=str)
    parser.add_argument("--application-id", type=str, help="optional recommended ID for the application")
    parser.add_argument("--recording-id", type=str, help="optional recommended ID for the recording")
    parser.add_argument("--entity-path-prefix", type=str, help="optional prefix for all entity paths")
    parser.add_argument(
        "--timeless", action="store_true", default=False, help="optionally mark data to be logged as timeless"
    )
    parser.add_argument(
        "--time",
        type=str,
        action="append",
        help="optional timestamps to log at (e.g. `--time sim_time=1709203426`)",
    )
    parser.add_argument(
        "--sequence",
        type=str,
        action="append",
        help="optional sequences to log at (e.g. `--sequence sim_frame=42`)",
    )
    args = parser.parse_args()

    is_file = os.path.isfile(args.filepath)
    is_urdf_file = ".urdf" in args.filepath

    # Inform the Rerun Viewer that we do not support that kind of file.
    if not is_file or not is_urdf_file:
        # Raise SystemExit directly: the `exit()` builtin is installed by the
        # `site` module for interactive use and is not always available.
        raise SystemExit(rr.EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE)

    # Fall back to the file path when no application ID was recommended.
    app_id = args.application_id if args.application_id is not None else args.filepath
    rr.init(app_id, recording_id=args.recording_id)

    # The most important part of this: log to standard output so the Rerun Viewer can ingest it!
    rr.stdout()

    set_time_from_args(args)

    # Fall back to the file name when no entity path prefix was passed.
    if args.entity_path_prefix is not None:
        prefix = args.entity_path_prefix
    else:
        prefix = os.path.basename(args.filepath)

    urdf_logger = URDFLogger(args.filepath, prefix)
    urdf_logger.log()
def set_time_from_args(args) -> None:
    """Set the Rerun time from the ``--time`` / ``--sequence`` arguments.

    Nothing is set when ``args.timeless`` is true. Each entry is expected in
    the form ``<timeline>=<integer>``; entries that do not split into exactly
    two parts on ``=`` are skipped.

    Args:
        args: Parsed arguments with the attributes ``timeless``, ``time`` and
            ``sequence``. ``time`` and ``sequence`` are lists of strings, or
            ``None`` when the respective flag was not passed.

    Raises:
        ValueError: If the value of a well-formed entry is not an integer.
    """
    if args.timeless:
        return

    # argparse leaves an `append` option at None when the flag is absent, so
    # each list is guarded on its own; one flag must not depend on the other.
    for time_str in args.time or ():
        parts = time_str.split("=")
        if len(parts) != 2:
            continue
        timeline_name, value = parts
        rr.set_time_nanos(timeline_name, int(value))

    for sequence_str in args.sequence or ():
        parts = sequence_str.split("=")
        if len(parts) != 2:
            continue
        timeline_name, value = parts
        rr.set_time_sequence(timeline_name, int(value))
# Run the data-loader only when this file is executed as a script.
if __name__ == "__main__":
    main()