-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: added code for the calov controller
- Loading branch information
Showing
5 changed files
with
262 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
#!/usr/bin/python3 | ||
|
||
from geometry_msgs.msg import Point, Vector3 | ||
from nav_msgs.msg import Odometry | ||
import math | ||
from enum import Enum | ||
import numpy as np | ||
|
||
|
||
class Zones(Enum): | ||
NOCOL = 1 | ||
COLIMM = 2 | ||
STOPNOW = 3 | ||
|
||
|
||
class Approaches(Enum): | ||
FRONT = 1 | ||
RIGHT = 2 | ||
LEFT = 3 | ||
BEHIND = 4 | ||
|
||
|
||
class Obstacle: | ||
|
||
def __init__(self) -> None: | ||
self.vx = 0 | ||
self.vy = 0 | ||
self.r = 0 | ||
self.x = 0 | ||
self.y = 0 | ||
self.heading = 0 | ||
self.speed = 0 | ||
|
||
|
||
class VelocityObstacle: | ||
""" | ||
The Velocity Obstacle class | ||
A computational class used by the collision avoidance system | ||
to determine if a collison can occur, and how the UAV should respond | ||
obstacle: An odometry of the object to avoid | ||
radius_o: The radius of obstacle | ||
vessel: An odometry of the UAV vessel | ||
""" | ||
|
||
def __init__(self, vessel: Obstacle, obstacle: Obstacle) -> None: | ||
|
||
self.vessel = vessel | ||
self.obstacle = obstacle | ||
|
||
self.left_angle = 0.0 | ||
self.right_angle = 0.0 | ||
|
||
self.truncated_time = 5 #placeholder | ||
|
||
def set_cone_angles(self) -> None: | ||
""" | ||
Calculates the largest and smallest heading-angle where a collision can occur | ||
""" | ||
theta_ro = math.atan2(self.obstacle.y - self.vessel.y, | ||
self.obstacle.x - self.vessel.x) | ||
print("ob", self.vessel.r, self.obstacle.r) | ||
theta_ray = math.asin( | ||
(self.vessel.r + self.obstacle.r) / | ||
(math.sqrt((self.obstacle.x - self.vessel.x)**2 + | ||
(self.obstacle.y - self.vessel.y)**2))) | ||
self.right_angle = theta_ro - theta_ray | ||
self.left_angle = theta_ro + theta_ray | ||
|
||
def check_if_collision(self) -> bool: | ||
""" | ||
Returns true if the current velocity results in a collision. | ||
Uses a truncated VO collision cone | ||
""" | ||
|
||
bouffer = 0 | ||
dvx = self.obstacle.vx - self.vessel.vx | ||
dvy = self.obstacle.vy - self.vessel.vy | ||
angle = math.atan2(-dvy, -dvx) | ||
print("vels", dvx, dvy) | ||
|
||
return angle > self.right_angle - bouffer and angle < self.left_angle + bouffer |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import rclpy | ||
from rclpy.node import Node | ||
from nav_msgs.msg import Odometry | ||
from vortex_msgs.msg import GuidanceData, OdometryArray | ||
import math | ||
from scripts.VO_math import VelocityObstacle, Obstacle, Zones, Approaches | ||
from tf2_ros import Buffer, TransformListener | ||
from transformations import euler_from_quaternion | ||
from geometry_msgs.msg import Quaternion | ||
|
||
class ColavController(Node): | ||
def __init__(self): | ||
super().__init__("colav_controller") | ||
|
||
|
||
self.declare_parameters(namespace='', parameters=[ | ||
('guidance_interface_colav_data_topic', '/guidance/colav_data'), | ||
('stop_zone_radius', 0.0), | ||
('colimm_max_radius', float('inf')), | ||
]) | ||
|
||
self.obstacle_sub = self.create_subscription(OdometryArray, "/tracking/mul_tracked_cv_objects", self.obst_callback, 10) | ||
self.vessel_sub = self.create_subscription(Odometry, "/pose_gt", self.vessel_callback, 10) | ||
self.colav_pub = self.create_publisher(GuidanceData, self.get_parameter("/guidance_interface/colav_data").get_parameter_value().string_value, 10) | ||
|
||
self.obstacles = [] | ||
self.vessel_odom = Odometry() | ||
self.vessel = Obstacle() | ||
self.stop_zone_radius = self.get_parameter('stop_zone_radius').value | ||
self.colimm_max_radius = self.get_parameter('colimm_max_radius').value | ||
|
||
def vessel_callback(self, msg): | ||
self.vessel_odom = msg | ||
self.vessel = self.odometry_to_obstacle(msg) | ||
self.t = msg.header.stamp.sec + msg.header.stamp.nanosec / 1e9 | ||
|
||
def obst_callback(self, msg): | ||
self.obstacles = [self.odometry_to_obstacle(odom) for odom in msg.odometry_array] | ||
if not self.obstacles: | ||
self.get_logger().info('No obstacles detected!') | ||
return | ||
colav_data = self.gen_colav_data() | ||
if colav_data: | ||
self.colav_pub.publish(colav_data) | ||
|
||
def quaternion_to_euler(quaternion: Quaternion): | ||
"""Convert a ROS Quaternion message to Euler angles (roll, pitch, yaw).""" | ||
q = (quaternion.x, quaternion.y, quaternion.z, quaternion.w) | ||
return euler_from_quaternion(q) | ||
|
||
def odometry_to_obstacle(self, odometry: Odometry): | ||
# Convert Odometry message to an Obstacle object | ||
# Assuming Obstacle class exists and is properly defined elsewhere | ||
quaternion = ( | ||
odometry.pose.pose.orientation.x, | ||
odometry.pose.pose.orientation.y, | ||
odometry.pose.pose.orientation.z, | ||
odometry.pose.pose.orientation.w) | ||
_, _, yaw = euler_from_quaternion(quaternion) | ||
|
||
return Obstacle( | ||
x=odometry.pose.pose.position.x, | ||
y=odometry.pose.pose.position.y, | ||
vx=odometry.twist.twist.linear.x, | ||
vy=odometry.twist.twist.linear.y, | ||
heading=yaw, | ||
speed=math.hypot(odometry.twist.twist.linear.x, odometry.twist.twist.linear.y), | ||
radius=2) # Assuming a fixed radius for simplicity | ||
|
||
def get_distance(self, obstacle1: Obstacle, obstacle2: Obstacle): | ||
return math.hypot(obstacle1.x - obstacle2.x, obstacle1.y - obstacle2.y) | ||
|
||
def get_closest_obst(self, obstacles: list[Obstacle], vessel: Obstacle): | ||
return min(obstacles, key=lambda obs: self.get_distance(obs, vessel), default=None) | ||
|
||
def gen_colav_data(self): | ||
closest_obst = self.get_closest_obst(self.obstacles) | ||
if closest_obst is None: | ||
self.get_logger().info('No obstacles detected.') | ||
return None | ||
|
||
VO = VelocityObstacle(self.vessel, closest_obst) | ||
VO.set_cone_angles() | ||
|
||
self.get_logger().info(f'Closest obstacle at {closest_obst.x}, {closest_obst.y}') | ||
self.get_logger().info(f'Vessel at {self.vessel.x}, {self.vessel.y}') | ||
self.get_logger().info(f'VO Angles: Left {VO.left_angle}, Right {VO.right_angle}') | ||
|
||
zone = self.get_zone(closest_obst, self.vessel) | ||
self.get_logger().info(f'Zone: {zone}') | ||
|
||
if zone == Zones.NOCOL: | ||
return None | ||
elif zone == Zones.STOPNOW: | ||
return self.create_guidance_data(0, 0, self.vessel.heading, self.vessel_odom) | ||
elif zone == Zones.COLIMM and not VO.check_if_collision(): | ||
return None | ||
|
||
approach = self.gen_approach(closest_obst, self.vessel) | ||
|
||
if approach in [Approaches.FRONT, Approaches.RIGHT]: | ||
buffer = math.pi / 6 # 30 degrees | ||
new_heading = VO.right_angle - buffer | ||
return self.create_guidance_data(self.vessel.speed, new_heading, self.vessel.heading, self.vessel_odom) | ||
elif approach in [Approaches.BEHIND, Approaches.LEFT]: | ||
return None | ||
return None | ||
|
||
def create_guidance_data(self, speed, psi_d, vessel_heading, vessel_odom): | ||
data = GuidanceData() | ||
data.u_d = speed | ||
data.psi_d = psi_d | ||
data.u = speed # Assuming this is the desired speed | ||
data.t = self.get_clock().now().to_msg() | ||
orientation_q = Quaternion( | ||
x=vessel_odom.pose.pose.orientation.x, | ||
y=vessel_odom.pose.pose.orientation.y, | ||
z=vessel_odom.pose.pose.orientation.z, | ||
w=vessel_odom.pose.pose.orientation.w) | ||
_, _, yaw = self.quaternion_to_euler(orientation_q) | ||
data.psi = yaw | ||
return data | ||
|
||
def gen_approach(self, obstacle: Obstacle, vessel: Obstacle): | ||
dx = obstacle.x - vessel.x | ||
dy = obstacle.y - vessel.y | ||
buffer = 10 * math.pi / 180 # 10 degrees in radians | ||
phi = math.atan2(dy, dx) | ||
|
||
if vessel.heading + buffer > phi > vessel.heading - buffer: | ||
return Approaches.FRONT | ||
elif phi < vessel.heading - buffer: | ||
return Approaches.RIGHT | ||
elif phi > vessel.heading + buffer: | ||
return Approaches.LEFT | ||
return Approaches.BEHIND | ||
|
||
def get_zone(self, obstacle: Obstacle, vessel: Obstacle): | ||
distance = self.get_distance(obstacle, vessel) | ||
if distance > self.colimm_max_radius: | ||
return Zones.NOCOL | ||
elif self.stop_zone_radius < distance <= self.colimm_max_radius: | ||
return Zones.COLIMM | ||
return Zones.STOPNOW | ||
|
||
|
||
|
||
if __name__ == '__main__': | ||
rclpy.init() | ||
colav_controller = ColavController() | ||
rclpy.spin(colav_controller) | ||
rclpy.shutdown() |