Skip to content

Commit

Permalink
feat: add monitoring if sim and robotic stack processes are working
Browse files Browse the repository at this point in the history
  • Loading branch information
MagdalenaKotynia committed Feb 28, 2025
1 parent e7527b5 commit 65fca0f
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 143 deletions.
289 changes: 146 additions & 143 deletions src/rai_bench/rai_bench/examples/o3de_test_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,157 +38,160 @@
from rai_sim.simulation_bridge import Rotation, Translation

if __name__ == "__main__":
rclpy.init()
connector = ROS2ARIConnector()
node = connector.node
node.declare_parameter("conversion_ratio", 1.0)
try:
rclpy.init()
connector = ROS2ARIConnector()
node = connector.node
node.declare_parameter("conversion_ratio", 1.0)

# define model
llm = get_llm_model(model_type="complex_model", streaming=True)
# define model
llm = get_llm_model(model_type="complex_model", streaming=True)

system_prompt = """
You are a robotic arm with interfaces to detect and manipulate objects.
Here are the coordinates information:
x - front to back (positive is forward)
y - left to right (positive is right)
z - up to down (positive is up)
Before starting the task, make sure to grab the camera image to understand the environment.
"""
# define tools
tools: List[BaseTool] = [
GetObjectPositionsTool(
connector=connector,
target_frame="panda_link0",
source_frame="RGBDCamera5",
camera_topic="/color_image5",
depth_topic="/depth_image5",
camera_info_topic="/color_camera_info5",
get_grabbing_point_tool=GetGrabbingPointTool(connector=connector),
),
MoveToPointTool(connector=connector, manipulator_frame="panda_link0"),
GetROS2ImageTool(connector=connector),
GetROS2TopicsNamesAndTypesTool(connector=connector),
]
# define loggers
now = datetime.now()
experiment_dir = (
f"src/rai_bench/rai_bench/experiments/{now.strftime('%Y-%m-%d_%H-%M-%S')}"
)
Path(experiment_dir).mkdir(parents=True, exist_ok=True)
log_file = f"{experiment_dir}/benchmark.log"
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.DEBUG)

formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
file_handler.setFormatter(formatter)
system_prompt = """
You are a robotic arm with interfaces to detect and manipulate objects.
Here are the coordinates information:
x - front to back (positive is forward)
y - left to right (positive is right)
z - up to down (positive is up)
Before starting the task, make sure to grab the camera image to understand the environment.
"""
# define tools
tools: List[BaseTool] = [
GetObjectPositionsTool(
connector=connector,
target_frame="panda_link0",
source_frame="RGBDCamera5",
camera_topic="/color_image5",
depth_topic="/depth_image5",
camera_info_topic="/color_camera_info5",
get_grabbing_point_tool=GetGrabbingPointTool(connector=connector),
),
MoveToPointTool(connector=connector, manipulator_frame="panda_link0"),
GetROS2ImageTool(connector=connector),
GetROS2TopicsNamesAndTypesTool(connector=connector),
]
# define loggers
now = datetime.now()
experiment_dir = (
f"src/rai_bench/rai_bench/experiments/{now.strftime('%Y-%m-%d_%H-%M-%S')}"
)
Path(experiment_dir).mkdir(parents=True, exist_ok=True)
log_file = f"{experiment_dir}/benchmark.log"
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.DEBUG)

bench_logger = logging.getLogger("Benchmark logger")
bench_logger.setLevel(logging.INFO)
bench_logger.addHandler(file_handler)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
file_handler.setFormatter(formatter)

agent_logger = logging.getLogger("Agent logger")
agent_logger.setLevel(logging.INFO)
agent_logger.addHandler(file_handler)
bench_logger = logging.getLogger("Benchmark logger")
bench_logger.setLevel(logging.INFO)
bench_logger.addHandler(file_handler)

configs_dir = "src/rai_bench/rai_bench/o3de_test_bench/configs/"
connector_path = configs_dir + "o3de_config.yaml"
#### Create scenarios manually
# load different scenes
# one_carrot_simulation_config = O3DExROS2SimulationConfig.load_config(
# base_config_path=Path(configs_dir + "scene1.yaml"),
# connector_config_path=Path(connector_path),
# )
# multiple_carrot_simulation_config = O3DExROS2SimulationConfig.load_config(
# base_config_path=Path(configs_dir + "scene2.yaml"),
# connector_config_path=Path(connector_path),
# )
# red_cubes_simulation_config = O3DExROS2SimulationConfig.load_config(
# base_config_path=Path(configs_dir + "scene3.yaml"),
# connector_config_path=Path(connector_path),
# )
# multiple_cubes_simulation_config = O3DExROS2SimulationConfig.load_config(
# base_config_path=Path(configs_dir + "scene4.yaml"),
# connector_config_path=Path(connector_path),
# )
# # combine different scene configs with the tasks to create various scenarios
# scenarios = [
# Scenario(
# task=GrabCarrotTask(logger=bench_logger),
# simulation_config=one_carrot_simulation_config,
# simulation_config_path=configs_dir + "scene1.yaml",
# ),
# Scenario(
# task=GrabCarrotTask(logger=bench_logger),
# simulation_config=multiple_carrot_simulation_config,
# simulation_config_path=configs_dir + "scene2.yaml",
# ),
# Scenario(
# task=PlaceCubesTask(logger=bench_logger),
# simulation_config=red_cubes_simulation_config,
# simulation_config_path=configs_dir + "scene3.yaml",
# ),
# Scenario(
# task=PlaceCubesTask(logger=bench_logger),
# simulation_config=multiple_cubes_simulation_config,
# simulation_config_path=configs_dir + "scene4.yaml",
# ),
# ]
agent_logger = logging.getLogger("Agent logger")
agent_logger.setLevel(logging.INFO)
agent_logger.addHandler(file_handler)

### Create scenarios automatically
simulation_configs_paths = [
configs_dir + "scene1.yaml",
configs_dir + "scene2.yaml",
configs_dir + "scene3.yaml",
configs_dir + "scene4.yaml",
]
simulations_configs = [
O3DExROS2SimulationConfig.load_config(Path(path), Path(connector_path))
for path in simulation_configs_paths
]
tasks: List[Task] = [
GrabCarrotTask(logger=bench_logger),
PlaceCubesTask(logger=bench_logger),
]
scenarios = Benchmark.create_scenarios(
tasks=tasks,
simulation_configs=simulations_configs,
simulation_configs_paths=simulation_configs_paths,
)
configs_dir = "src/rai_bench/rai_bench/o3de_test_bench/configs/"
connector_path = configs_dir + "o3de_config.yaml"
#### Create scenarios manually
# load different scenes
# one_carrot_simulation_config = O3DExROS2SimulationConfig.load_config(
# base_config_path=Path(configs_dir + "scene1.yaml"),
# connector_config_path=Path(connector_path),
# )
# multiple_carrot_simulation_config = O3DExROS2SimulationConfig.load_config(
# base_config_path=Path(configs_dir + "scene2.yaml"),
# connector_config_path=Path(connector_path),
# )
# red_cubes_simulation_config = O3DExROS2SimulationConfig.load_config(
# base_config_path=Path(configs_dir + "scene3.yaml"),
# connector_config_path=Path(connector_path),
# )
# multiple_cubes_simulation_config = O3DExROS2SimulationConfig.load_config(
# base_config_path=Path(configs_dir + "scene4.yaml"),
# connector_config_path=Path(connector_path),
# )
# # combine different scene configs with the tasks to create various scenarios
# scenarios = [
# Scenario(
# task=GrabCarrotTask(logger=bench_logger),
# simulation_config=one_carrot_simulation_config,
# simulation_config_path=configs_dir + "scene1.yaml",
# ),
# Scenario(
# task=GrabCarrotTask(logger=bench_logger),
# simulation_config=multiple_carrot_simulation_config,
# simulation_config_path=configs_dir + "scene2.yaml",
# ),
# Scenario(
# task=PlaceCubesTask(logger=bench_logger),
# simulation_config=red_cubes_simulation_config,
# simulation_config_path=configs_dir + "scene3.yaml",
# ),
# Scenario(
# task=PlaceCubesTask(logger=bench_logger),
# simulation_config=multiple_cubes_simulation_config,
# simulation_config_path=configs_dir + "scene4.yaml",
# ),
# ]

# custom request to arm
base_arm_pose = Pose(
translation=Translation(x=0.1, y=0.5, z=0.4),
rotation=Rotation(x=1.0, y=0.0, z=0.0, w=0.0),
)
### Create scenarios automatically
simulation_configs_paths = [
configs_dir + "scene1.yaml",
configs_dir + "scene2.yaml",
configs_dir + "scene3.yaml",
configs_dir + "scene4.yaml",
]
simulations_configs = [
O3DExROS2SimulationConfig.load_config(Path(path), Path(connector_path))
for path in simulation_configs_paths
]
tasks: List[Task] = [
GrabCarrotTask(logger=bench_logger),
PlaceCubesTask(logger=bench_logger),
]
scenarios = Benchmark.create_scenarios(
tasks=tasks,
simulation_configs=simulations_configs,
simulation_configs_paths=simulation_configs_paths,
)

o3de = O3DEngineArmManipulationBridge(connector, logger=agent_logger)
# define benchamrk
results_filename = f"{experiment_dir}/results.csv"
benchmark = Benchmark(
simulation_bridge=o3de,
scenarios=scenarios,
logger=bench_logger,
results_filename=results_filename,
)
for i, s in enumerate(scenarios):
agent = create_conversational_agent(
llm, tools, system_prompt, logger=agent_logger
# custom request to arm
base_arm_pose = Pose(
translation=Translation(x=0.1, y=0.5, z=0.4),
rotation=Rotation(x=1.0, y=0.0, z=0.0, w=0.0),
)
benchmark.run_next(agent=agent)
o3de.move_arm(
pose=base_arm_pose,
initial_gripper_state=True,
final_gripper_state=False,
frame_id="panda_link0",
) # return to case position
time.sleep(0.2) # admire the end position for a second ;)

bench_logger.info("===============================================================")
bench_logger.info("ALL SCENARIOS DONE. BENCHMARK COMPLETED!")
bench_logger.info("===============================================================")
o3de = O3DEngineArmManipulationBridge(connector, logger=agent_logger)
# define benchamrk
results_filename = f"{experiment_dir}/results.csv"
benchmark = Benchmark(
simulation_bridge=o3de,
scenarios=scenarios,
logger=bench_logger,
results_filename=results_filename,
)
for i, s in enumerate(scenarios):
agent = create_conversational_agent(
llm, tools, system_prompt, logger=agent_logger
)
benchmark.run_next(agent=agent)
o3de.move_arm(
pose=base_arm_pose,
initial_gripper_state=True,
final_gripper_state=False,
frame_id="panda_link0",
) # return to case position
time.sleep(0.2) # admire the end position for a second ;)

connector.shutdown()
o3de.shutdown()
rclpy.shutdown()
bench_logger.info("===============================================================")
bench_logger.info("ALL SCENARIOS DONE. BENCHMARK COMPLETED!")
bench_logger.info("===============================================================")
except Exception as e:
raise e
finally:
connector.shutdown()
o3de.shutdown()
rclpy.shutdown()
51 changes: 51 additions & 0 deletions src/rai_sim/rai_sim/o3de/o3de_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import shlex
import signal
import subprocess
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
Expand All @@ -41,6 +42,10 @@
)


class ProcessMonitorError(Exception):
"""Exception raised when a monitored process terminates unexpectedly."""


class O3DExROS2SimulationConfig(SimulationConfig):
binary_path: Path
robotic_stack_command: str
Expand Down Expand Up @@ -68,6 +73,52 @@ def __init__(
self.current_robotic_stack_process = None
self.current_binary_path = None

self._monitor_exception = None
self._monitor_running = True

self.monitor_thread = threading.Thread(
target=self._process_monitor, daemon=True
)
self.monitor_thread.start()

def _process_monitor(self):
"""Background thread to check if the simulation and robotic stack processes are still running."""
try:
while self._monitor_running:
time.sleep(2) # Check every 2 seconds

if (
self.current_sim_process
and self.current_sim_process.poll() is not None
):
error_msg = "Simulation process has unexpectedly terminated!"
self.logger.error(error_msg)
raise ProcessMonitorError(error_msg)

if (
self.current_robotic_stack_process
and self.current_robotic_stack_process.poll() is not None
):
error_msg = "Robotic stack process has unexpectedly terminated!"
self.logger.error(error_msg)
raise ProcessMonitorError(error_msg)
except Exception as e:
self._monitor_exception = e
self.logger.error(f"Process monitor detected an error: {str(e)}")

def check_monitor_status(self):
"""
Checks if the process monitor has detected any errors.
This method should be called regularly by the main thread.
Raises:
Exception: If the monitor thread has detected an error
"""
if self._monitor_exception:
exception = self._monitor_exception
self._monitor_exception = None
raise exception

def shutdown(self):
self._shutdown_binary()
self._shutdown_robotic_stack()
Expand Down

0 comments on commit 65fca0f

Please sign in to comment.