Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

436 task guidance node for autopilot #502

Open
wants to merge 54 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
917d5bf
Modified guidance algorithim
Oct 9, 2024
cd4d076
Automated autoyapf fixes
invalid-email-address Oct 9, 2024
7845070
added guidance action server
Oct 11, 2024
7157c5c
Resolved merge conflict in guidance.py
Oct 11, 2024
d7dd424
Automated autoyapf fixes
invalid-email-address Oct 11, 2024
cb2e138
Remove unwanted folder
Oct 11, 2024
0e8585a
Merge remote-tracking branch 'origin/436-task-guidance-node-for-autop…
Oct 11, 2024
6ab5edf
Develop (#487)
kluge7 Oct 13, 2024
cd7f490
Fix errors of no executable
Oct 13, 2024
29c1ad3
trying to resolve merge
Oct 13, 2024
07261ee
Automated autoyapf fixes
invalid-email-address Oct 13, 2024
1fb8b90
Made changes to guidance_action_server and guidance_los files
Oct 21, 2024
d694e89
update guidance action server
Oct 21, 2024
a007e80
Automated autoyapf fixes
invalid-email-address Oct 21, 2024
05ff163
Delete unnecessary files.
Oct 21, 2024
f44e105
modification of the guidance
Oct 23, 2024
9ca5b31
Fixed auto-pilot
Oct 27, 2024
15fed69
Automated autoyapf fixes
invalid-email-address Oct 27, 2024
46455e6
intermediate commit
Nov 3, 2024
9e7d7ba
Merge remote-tracking branch 'origin/485-task-auv-gui' into 436-task-…
Nov 3, 2024
fc4780a
feat: create guidance action server
Nov 10, 2024
79a48b2
Merge remote-tracking branch 'origin/develop' into 436-task-guidance-…
Nov 10, 2024
acc38cb
Merge remote-tracking branch 'origin/develop' into 436-task-guidance-…
Nov 10, 2024
bc076c6
fix(gui): resolve merge conflict in auv_gui.py, retain local updates
kluge7 Nov 11, 2024
9b49170
Modified LOS guidance action server with proper comments and doc strings
Nov 16, 2024
2e9e1c8
Added comprehensive docstrings and comments to LOS guidance algorithm
Nov 16, 2024
d1987fa
change LOS guidance to use third order low-pass filter
Nov 17, 2024
ab21880
fix: vortex_msgs develop branch is target
Hallfred Nov 17, 2024
fa599fe
fix: added pyqt install script
Hallfred Nov 17, 2024
b695f1d
fix: correct ref
Hallfred Nov 17, 2024
03bb8d3
feat: removed tests
Hallfred Nov 17, 2024
992487d
refactor: applied formatting changes
Nov 17, 2024
a4700f8
add stuff
Nov 17, 2024
6ad96af
added stuff
Nov 17, 2024
a78dc87
added stuff
Nov 17, 2024
1bfc2ee
Fixed filter and remove tuple
Nov 17, 2024
e614471
fix CMaklists
Nov 17, 2024
d0061dc
Modified package.xml
Nov 17, 2024
aa27f9f
remove unnecessary comments and added a readme.md file and a yaml fil…
Dec 22, 2024
fd6db80
Apply pre-commit hook fixes
Dec 26, 2024
2f4cfa9
Modified guidance action server node.
Dec 28, 2024
2d4cadf
modify action server and launch file
Jan 2, 2025
8ba17ba
working guidance action server
Jan 7, 2025
3160f92
added goal callback
Jan 7, 2025
afb0a89
added cancel request
Jan 7, 2025
845e362
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 7, 2025
62b1fd8
refactor guidance action server
Jan 7, 2025
3b5c8d9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 7, 2025
f748524
feat: remove dependency on transforms3d
Andeshog Jan 8, 2025
87309ba
refactor: remove try-except block
Andeshog Jan 8, 2025
13d1e6f
refactor: correctly initialize new commands after new goal request
Andeshog Jan 8, 2025
e6e3e9f
modified guidance algorithm
Jan 8, 2025
927b03a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 8, 2025
3b80ce0
[pre-commit.ci] auto fixes from pre-commit.com hooks
Hallfred Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions guidance/guidance_los/config/los_guidance_params.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
los_guidance_node:
# Node configurations
ros__parameters:
los_guidance:
h_delta_min: 1.0
Expand All @@ -13,7 +12,6 @@ los_guidance_node:
omega_diag: [2.5, 2.5, 2.5]
zeta_diag: [0.7, 0.7, 0.7]

# Topic configurations
topics:
publishers:
los_commands: '/guidance/LOS_commands'
Expand All @@ -24,7 +22,6 @@ los_guidance_node:
subscribers:
odometry: '/nucleus/odom'

# QoS configurations
qos:
publisher_depth: 10
subscriber_depth: 10
46 changes: 26 additions & 20 deletions guidance/guidance_los/guidance_los/los_guidance_algorithm.py
AbubakarAliyuBadawi marked this conversation as resolved.
Show resolved Hide resolved
AbubakarAliyuBadawi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,28 +1,37 @@
#!/usr/bin/env python3

import numpy as np


class ThirdOrderLOSGuidance:
"""Line-of-Sight (LOS) guidance algorithm."""
"""This class implements the Line-of-Sight (LOS) guidance algorithm.

The LOS provide a control outputs for surge, pitch, and yaw for navigating towards a target in 3D space.
"""

def __init__(self, config_dict: dict):
# Initialize parameters
self.h_delta_min = config_dict['h_delta_min']
self.h_delta_max = config_dict['h_delta_max']
self.h_delta_factor = config_dict['h_delta_factor']
self.nominal_speed = config_dict['nominal_speed']
self.min_speed = config_dict['min_speed']
self.max_pitch_angle = config_dict['max_pitch_angle']
self.depth_gain = config_dict['depth_gain']

# Filter parameters
self.config_dict = config_dict
self.init_los_parameters()
self.init_filter_parameters()
self.setup_filter_matrices()

def init_los_parameters(self):
"""Initialize Line-of-Sight guidance parameters."""
self.h_delta_min = self.config_dict['h_delta_min']
self.h_delta_max = self.config_dict['h_delta_max']
self.h_delta_factor = self.config_dict['h_delta_factor']
self.nominal_speed = self.config_dict['nominal_speed']
self.min_speed = self.config_dict['min_speed']
self.max_pitch_angle = self.config_dict['max_pitch_angle']
self.depth_gain = self.config_dict['depth_gain']
AbubakarAliyuBadawi marked this conversation as resolved.
Show resolved Hide resolved

def init_filter_parameters(self):
"""Initialize filter state and parameters."""
self.x = np.zeros(9)
self.omega = np.diag(config_dict['filter']['omega_diag'])
self.zeta = np.diag(config_dict['filter']['zeta_diag'])
self.omega = np.diag(self.config_dict['filter']['omega_diag'])
self.zeta = np.diag(self.config_dict['filter']['zeta_diag'])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same goes for this, consider using a dataclass instead of dictionary. FilterParams would be a fitting name for the class.


# Initialize filter matrices
self.setup_filter_matrices()
@staticmethod
def ssa(angle: float) -> float:
return (angle + np.pi) % (2 * np.pi) - np.pi

def setup_filter_matrices(self):
self.Ad = np.zeros((9, 9))
Expand All @@ -44,9 +53,6 @@ def setup_filter_matrices(self):
# Input matrix
self.Bd[6:9, :] = omega_cubed

def ssa(self, angle: float) -> float:
return (angle + np.pi) % (2 * np.pi) - np.pi

def compute_raw_los_guidance(
self, current_pos: np.ndarray, target_pos: np.ndarray
) -> tuple[np.ndarray, float, float]:
Expand Down
2 changes: 1 addition & 1 deletion guidance/guidance_los/launch/guidance.launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def generate_launch_description():
config_path_arg = DeclareLaunchArgument(
'config_path',
default_value=config_file,
description='/home/badawi/Desktop/auto-pilot/src/vortex-auv/guidance/guidance_los/config/los_guidance_params.yaml',
description='Path to the LOS guidance config file',
)

# Create the node
Expand Down
202 changes: 82 additions & 120 deletions guidance/guidance_los/scripts/los_guidance_action_server.py
AbubakarAliyuBadawi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
#!/usr/bin/env python3

import os
from dataclasses import dataclass

import numpy as np
import rclpy
import yaml
from geometry_msgs.msg import PoseStamped, Vector3Stamped
from guidance_los.los_guidance_algorithm import ThirdOrderLOSGuidance
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember to also import the parameter classes.

from nav_msgs.msg import Odometry
Expand Down Expand Up @@ -35,42 +33,12 @@ def as_ndarray(self) -> np.ndarray:
return np.array([self.x, self.y, self.z, self.yaw, self.pitch])


class GuidanceActionServer(Node):
class LOSActionServer(Node):
"""ROS2 Action Server node for waypoint navigation using LOS guidance."""

def __init__(self):
AbubakarAliyuBadawi marked this conversation as resolved.
Show resolved Hide resolved
super().__init__('guidance_action_server')

# Get the configuration file path from parameters
config_path = self.declare_parameter('config_path', '').value

# Load configuration
if config_path and os.path.exists(config_path):
self.get_logger().info(f"Loading parameters from: {config_path}")
try:
with open(config_path) as file:
config_params = yaml.safe_load(file)
node_params = config_params.get('los_guidance_node', {}).get(
'ros__parameters', {}
)

# Extract different parameter groups
self.los_params = node_params.get('los_guidance', {})
self.topic_params = node_params.get('topics', {})
self.qos_params = node_params.get('qos', {})

self.get_logger().info("Configuration loaded successfully")
except Exception as e:
self.get_logger().error(f"Error loading config file: {e}")
self.los_params = {}
self.topic_params = {}
self.qos_params = {}
else:
self.get_logger().warn("No config file path provided. Using defaults.")
self.los_params = {}
self.topic_params = {}
self.qos_params = {}

# Initialize filter status flag
self.filter_initialized = False

Expand All @@ -86,6 +54,8 @@ def __init__(self):
self._declare_topic_parameters()
self._initialize_publishers()
self._initialize_subscribers()
self._initialize_state()
self._initialize_action_server()

# Initialize guidance calculator with third-order filtering
self.guidance_calculator = ThirdOrderLOSGuidance(self.los_params)
AbubakarAliyuBadawi marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -152,7 +122,16 @@ def _initialize_subscribers(self):
callback_group=self.timer_cb_group,
)

# Action server for handling navigation requests
def _initialize_state(self):
"""Initialize state and navigation variables."""
self.state: State = State()
self.waypoints: list[np.ndarray] = []
self.current_waypoint_index: int = 0
self.goal_handle: ServerGoalHandle[NavigateWaypoints] | None = None
self.update_period = 0.1

def _initialize_action_server(self):
"""Initialize the navigation action server."""
self._action_server = ActionServer(
self,
NavigateWaypoints,
Expand All @@ -161,15 +140,6 @@ def _initialize_subscribers(self):
callback_group=self.action_cb_group,
)

# State variables initialization
self.state: State = State()
self.waypoints: list[np.ndarray] = []
self.current_waypoint_index: int = 0
self.goal_handle: ServerGoalHandle[NavigateWaypoints] | None = None
self.update_period = 0.1

self.get_logger().info('Guidance Action Server initialized')

def publish_log(self, message: str):
"""Publish debug log message to ROS topic and node logger (debug mode only)."""
if not self.debug_mode:
Expand All @@ -181,89 +151,81 @@ def publish_log(self, message: str):

def odom_callback(self, msg: Odometry):
"""Process odometry updates and trigger guidance calculations."""
try:
# Extract orientation quaternion to Euler angles
orientation_q = msg.pose.pose.orientation
roll, pitch, yaw = quat2euler(
[orientation_q.w, orientation_q.x, orientation_q.y, orientation_q.z]
)

# Update vehicle state
self.state.x = msg.pose.pose.position.x
self.state.y = msg.pose.pose.position.y
self.state.z = msg.pose.pose.position.z
self.state.yaw = yaw
self.state.pitch = pitch

# Initialize filter on first callback
if not self.filter_initialized:
initial_commands = np.array([0.0, 0.0, yaw])
self.guidance_calculator.reset_filter_state(initial_commands)
self.filter_initialized = True

# Log current position for debugging
self.publish_log(
f"Position: x={self.state.x:.3f}, "
f"y={self.state.y:.3f}, "
f"z={self.state.z:.3f}"
)
self.publish_log(
f"Orientation: roll={roll:.3f}, pitch={pitch:.3f}, yaw={yaw:.3f}"
)
# Extract orientation quaternion to Euler angles
orientation_q = msg.pose.pose.orientation
roll, pitch, yaw = quat2euler(
[orientation_q.w, orientation_q.x, orientation_q.y, orientation_q.z]
)

except Exception as e:
self.get_logger().error(f'Error in odom_callback: {str(e)}')
# Update vehicle state
self.state.x = msg.pose.pose.position.x
self.state.y = msg.pose.pose.position.y
self.state.z = msg.pose.pose.position.z
self.state.yaw = yaw
self.state.pitch = pitch

# Initialize filter on first callback
if not self.filter_initialized:
initial_commands = np.array([0.0, 0.0, yaw])
self.guidance_calculator.reset_filter_state(initial_commands)
self.filter_initialized = True

# Log current position for debugging
self.publish_log(
f"Position: x={self.state.x:.3f}, "
f"y={self.state.y:.3f}, "
f"z={self.state.z:.3f}"
)
self.publish_log(
f"Orientation: roll={roll:.3f}, pitch={pitch:.3f}, yaw={yaw:.3f}"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a

if not self.debug_mode:
    return

self.publish_log...

before the log calls would remove the need for going into all the debug methods when it is not necessary. This means the same statement can be removed from inside the log methods.


def process_guidance(self):
"""Process guidance calculations and publish control commands."""
try:
if self.current_waypoint_index >= len(self.waypoints):
return

target_point = self.waypoints[self.current_waypoint_index]
if self.current_waypoint_index >= len(self.waypoints):
return

# Compute guidance commands using current state
commands, distance, depth_error = self.guidance_calculator.compute_guidance(
self.state.as_ndarray(), target_point, self.update_period
)
target_point = self.waypoints[self.current_waypoint_index]

# Publish commands and monitoring data
self.publish_guidance(commands)
self.publish_reference(target_point)
self.publish_errors(target_point, depth_error)
# Compute guidance commands using current state
commands, distance, depth_error = self.guidance_calculator.compute_guidance(
self.state.as_ndarray(), target_point, self.update_period
)

# Log guidance status
self.publish_log(
f"Guidance Status: surge={commands[0]:.2f}, "
f"pitch={commands[1]:.2f}, yaw={commands[2]:.2f}"
)
self.publish_log(f"Distance to target: {distance:.2f}")
# Publish commands and monitoring data
self.publish_guidance(commands)
self.publish_reference(target_point)
self.publish_errors(target_point, depth_error)

# Check if waypoint is reached (0.5m threshold)
if distance < 0.5:
self.publish_log(f'Reached waypoint {self.current_waypoint_index}')
# Log guidance status
self.publish_log(
f"Guidance Status: surge={commands[0]:.2f}, "
f"pitch={commands[1]:.2f}, yaw={commands[2]:.2f}"
)
self.publish_log(f"Distance to target: {distance:.2f}")

# Reset filter state for next waypoint
initial_commands = np.array([0.0, 0.0, self.state.yaw])
self.guidance_calculator.reset_filter_state(initial_commands)
# Check if waypoint is reached (0.5m threshold)
if distance < 0.5:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The treshold should be a parameter. Add it to the config file.

self.publish_log(f'Reached waypoint {self.current_waypoint_index}')

self.current_waypoint_index += 1
# Reset filter state for next waypoint
initial_commands = np.array([0.0, 0.0, self.state.yaw])
self.guidance_calculator.reset_filter_state(initial_commands)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the initial state not just the same as the actual state?


# Check if all waypoints are reached
if self.current_waypoint_index >= len(self.waypoints):
if self.goal_handle and self.goal_handle.is_active:
self.goal_handle.succeed()
self.goal_handle = None
return
self.current_waypoint_index += 1

# Publish progress feedback
if self.goal_handle and self.goal_handle.is_active:
feedback_msg = NavigateWaypoints.Feedback()
feedback_msg.current_waypoint_index = float(self.current_waypoint_index)
self.goal_handle.publish_feedback(feedback_msg)
# Check if all waypoints are reached
if self.current_waypoint_index >= len(self.waypoints):
if self.goal_handle and self.goal_handle.is_active:
self.goal_handle.succeed()
self.goal_handle = None
return

except Exception as e:
self.get_logger().error(f'Error in process_guidance: {str(e)}')
# Publish progress feedback
if self.goal_handle and self.goal_handle.is_active:
feedback_msg = NavigateWaypoints.Feedback()
feedback_msg.current_waypoint_index = float(self.current_waypoint_index)
self.goal_handle.publish_feedback(feedback_msg)

def execute_callback(self, goal_handle: ServerGoalHandle[NavigateWaypoints]):
"""Execute waypoint navigation action."""
Expand Down Expand Up @@ -311,9 +273,9 @@ def execute_callback(self, goal_handle: ServerGoalHandle[NavigateWaypoints]):
def publish_guidance(self, commands):
"""Publish guidance commands for vehicle control."""
msg = LOSGuidance()
msg.surge = commands[0]
msg.pitch = commands[1]
msg.yaw = commands[2]
msg.surge = commands[0] # surge: forward velocity command
msg.pitch = commands[1] # pitch: pitch angle command
msg.yaw = commands[2] # yaw: yaw angle command
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont feel like the comments add much here. What I meant was to use typehint for the commands input, a more fitting structure (like the state dataclass), and perhaps a quick description in the docstring.

So something like this

def publish_guidance(self, commands: State):
    """Publish the commanded surge velocity, pitch angle, and yaw angle."""
    msg = LOSGuidance()
    msg.surge = commands.surge_vel
    msg.pitch = commands.pitch
    msg.yaw = commands.yaw

Note that in this example I added surge velocity to the State dataclass.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will require me to modify how I created the commands in the process_guidance method as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like this could work

def compute_raw_los_guidance(
        self, current_pos: State, target_pos: State
    ) -> State:
        
        dx = target_pos.x - current_pos.x
        dy = target_pos.y - current_pos.y

        horizontal_distance = np.sqrt(dx**2 + dy**2)
        desired_yaw = np.arctan2(dy, dx)
        yaw_error = self.ssa(desired_yaw - current_pos.yaw)

        depth_error = target_pos.z - current_pos.z
        desired_pitch = self.compute_pitch_command(depth_error, horizontal_distance)

        desired_surge = self.compute_desired_speed(yaw_error, horizontal_distance)

        commands = State(surge_vel=desired_surge, pitch=desired_pitch, yaw=desired_yaw)

        return commands
def apply_reference_filter(self, commands: State) -> State:
        x_dot = self.Ad @ self.x + self.Bd @ commands.as_los_array()
        self.x = self.x + x_dot * self.dt
        return State(surge_vel=self.x[0], pitch=self.x[1], yaw=self.x[2])

def compute_guidance(
        self, current_pos: State, target_pos: State
    ) -> State:
        raw_commands = self.compute_raw_los_guidance(current_pos, target_pos)

        filtered_commands = self.apply_reference_filter(raw_commands)
        filtered_commands.pitch = self.ssa(filtered_commands.pitch)
        filtered_commands.yaw = self.ssa(filtered_commands.yaw)
        return filtered_commands

where the State class is modified to

@dataclass
class State:
    x: float = 0.0
    y: float = 0.0
    z: float = 0.0
    pitch: float = 0.0
    yaw: float = 0.0
    surge_vel: float = 0.0

    def __add__(self, other: "State") -> "State":
        return State(
            x=self.x + other.x,
            y=self.y + other.y,
            z=self.z + other.z,
            pitch=self.pitch + other.pitch,
            yaw=self.yaw + other.yaw,
        )

    def __sub__(self, other: "State") -> "State":
        return State(
            x=self.x - other.x,
            y=self.y - other.y,
            z=self.z - other.z,
            pitch=self.pitch - other.pitch,
            yaw=self.yaw - other.yaw,
        )
    
    def as_los_array(self):
        return np.array([self.surge_vel, self.pitch, self.yaw])

self.guidance_cmd_pub.publish(msg)

def publish_reference(self, target):
Expand All @@ -324,9 +286,9 @@ def publish_reference(self, target):
msg = PoseStamped()
msg.header.frame_id = "odom"
msg.header.stamp = self.get_clock().now().to_msg()
msg.pose.position.x = target[0]
msg.pose.position.y = target[1]
msg.pose.position.z = target[2]
msg.pose.position.x = target[0] # x: x-coordinate of the target point
msg.pose.position.y = target[1] # y: y-coordinate of the target point
msg.pose.position.z = target[2] # z: z-coordinate of the target point
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you utilize Point for target (thus, generally for waypoints), this could be simplified to

def publish_reference(self, target: Point):
    ...
    msg.pose.position = target

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I change this I might need to modify how waypoints are stored and processed, right now they are stored as a numpy array rather than points.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and it is probably better to just do it straight away since waypoints will be sent and received through a topic as Points in the full system.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh I see.

self.guidance_ref_pub.publish(msg)

def publish_errors(self, target, depth_error):
Expand All @@ -348,7 +310,7 @@ def main(args=None):
rclpy.init(args=args)

# Create and initialize node
action_server = GuidanceActionServer()
action_server = LOSActionServer()

# Create and use multithreaded executor for concurrent processing
executor = MultiThreadedExecutor()
Expand Down