Skip to content

Commit

Permalink
refactor: refactor of is_ros2_stack_ready method, handling ros2 actio…
Browse files Browse the repository at this point in the history
…ns checking, improved logging of waiting for not yet available services, actions and topics
  • Loading branch information
MagdalenaKotynia committed Feb 27, 2025
1 parent 3e821da commit 38402c6
Showing 1 changed file with 76 additions and 39 deletions.
115 changes: 76 additions & 39 deletions src/rai_sim/rai_sim/o3de/o3de_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import subprocess
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Set

import yaml
from geometry_msgs.msg import Point, PoseStamped, Quaternion
Expand All @@ -44,9 +44,8 @@
class O3DExROS2SimulationConfig(SimulationConfig):
binary_path: Path
robotic_stack_command: str
required_services: List[str]
required_topics: List[str]
required_actions: List[str]
required_binary_ros2_stack: dict[str, List[str]]
required_robotic_ros2_stack: dict[str, List[str]]

@classmethod
def load_config(
Expand Down Expand Up @@ -198,72 +197,110 @@ def get_scene_state(self) -> SceneState:
)
return SceneState(entities=entities)

def _is_robotic_stack_ready(
self, simulation_config: O3DExROS2SimulationConfig, retries: int = 30
def _is_ros2_stack_ready(
self, required_ros2_stack: dict[str, List[str]], retries: int = 30
) -> bool:
i = 0
while i < retries:
topics = self.connector.get_topics_names_and_types()
services = self.connector.node.get_service_names_and_types()
topics_names = [tp[0] for tp in topics]
service_names = [srv[0] for srv in services]
self.logger.info(
f"required services: {simulation_config.required_services}"
)
self.logger.info(f"required topics: {simulation_config.required_topics}")
self.logger.info(f"required actions: {simulation_config.required_actions}")
# NOTE actions will be listed in services and topics
if (
all(srv in service_names for srv in simulation_config.required_services)
and all(tp in topics_names for tp in simulation_config.required_topics)
and all(
ac in service_names for ac in simulation_config.required_actions
)
):
self.logger.info("All required services are available.")
for i in range(retries):
available_topics = self.connector.get_topics_names_and_types()
available_services = self.connector.node.get_service_names_and_types()
available_topics_names = [tp[0] for tp in available_topics]
available_services_names = [srv[0] for srv in available_services]

# Extract action names
available_actions_names: Set[str] = set()
for service in available_services_names:
if "/_action" in service:
action_name = service.split("/_action")[0]
print("action_name: ", action_name)
available_actions_names.add(action_name)

required_services = required_ros2_stack["services"]
required_topics = required_ros2_stack["topics"]
required_actions = required_ros2_stack["actions"]
self.logger.info(f"required services: {required_services}")
self.logger.info(f"required topics: {required_topics}")
self.logger.info(f"required actions: {required_actions}")
self.logger.info(f"available actions: {available_actions_names}")

missing_services = [
service
for service in required_services
if service not in available_services_names
]
missing_topics = [
topic
for topic in required_topics
if topic not in available_topics_names
]
missing_actions = [
action
for action in required_actions
if action not in available_actions_names
]

if missing_services:
for service in missing_services:
self.logger.warning(f"Waiting for service: {service}")

if missing_topics:
for topic in missing_topics:
self.logger.warning(f"Waiting for topic: {topic}")

if missing_actions:
for action in missing_actions:
self.logger.warning(f"Waiting for action: {action}")

if not (missing_services or missing_topics or missing_actions):
self.logger.info("All required ROS2 stack components are available.")
return True

time.sleep(5)
retries += 1
time.sleep(3)

self.logger.error(
"Maximum number of retries reached. Required ROS2 stack components are not fully available."
)
return False

def setup_scene(self, simulation_config: O3DExROS2SimulationConfig):
if self.current_binary_path != simulation_config.binary_path:
if self.current_sim_process:
self.shutdown()
self._launch_binary(simulation_config.binary_path)
self._launch_robotic_stack(simulation_config.robotic_stack_command)
self._launch_binary(simulation_config)
self._launch_robotic_stack(simulation_config)
self.current_binary_path = simulation_config.binary_path

else:
while self.spawned_entities:
self._despawn_entity(self.spawned_entities[0])

if not self._is_robotic_stack_ready(simulation_config=simulation_config):
raise RuntimeError(
"Not all required services, topics and actions are available"
)

for entity in simulation_config.entities:
self._spawn_entity(entity)

def _launch_binary(self, binary_path: Path):
command = [binary_path.as_posix()]
def _launch_binary(self, simulation_config: O3DExROS2SimulationConfig):
command = [simulation_config.binary_path.as_posix()]
self.logger.info(f"Running command: {command}")
self.current_sim_process = subprocess.Popen(
command,
)
if not self._has_process_started(process=self.current_sim_process):
raise RuntimeError("Process did not start in time.")
if not self._is_ros2_stack_ready(
required_ros2_stack=simulation_config.required_binary_ros2_stack
):
raise RuntimeError("ROS2 stack is not ready in time.")

def _launch_robotic_stack(self, robotic_stack_command: str):
command = shlex.split(robotic_stack_command)
def _launch_robotic_stack(self, simulation_config: O3DExROS2SimulationConfig):
command = shlex.split(simulation_config.robotic_stack_command)
self.logger.info(f"Running command: {command}")
self.current_robotic_stack_process = subprocess.Popen(
command,
)
if not self._has_process_started(self.current_robotic_stack_process):
raise RuntimeError("Process did not start in time.")
if not self._is_ros2_stack_ready(
required_ros2_stack=simulation_config.required_robotic_ros2_stack
):
raise RuntimeError("ROS2 stack is not ready in time.")

def _has_process_started(self, process: subprocess.Popen[Any], timeout: int = 15):
start_time = time.time()
Expand Down

0 comments on commit 38402c6

Please sign in to comment.