From 4fdef9809ec73cd72695904d0fa50861e608f535 Mon Sep 17 00:00:00 2001 From: Tom Arne Pedersen Date: Thu, 25 Jan 2024 07:21:05 +0100 Subject: [PATCH] Still working on #17. Fixed tests. Updated several methods. --- src/trafficgen/check_land_crossing.py | 14 +- src/trafficgen/encounter.py | 109 +++++++-------- src/trafficgen/plot_traffic_situation.py | 124 +++++++----------- src/trafficgen/read_files.py | 120 ++++++++++++++++- .../settings/encounter_settings.json | 3 +- src/trafficgen/ship_traffic_generator.py | 5 +- src/trafficgen/types.py | 8 ++ src/trafficgen/utils.py | 62 ++++----- tests/data/test_07/test_07_3.json | 6 +- tests/test_read_files.py | 48 +++++-- tests/test_trafficgen.py | 18 +-- tests/test_write_files.py | 24 +++- 12 files changed, 337 insertions(+), 204 deletions(-) diff --git a/src/trafficgen/check_land_crossing.py b/src/trafficgen/check_land_crossing.py index 4a6df32..3dd0ec9 100644 --- a/src/trafficgen/check_land_crossing.py +++ b/src/trafficgen/check_land_crossing.py @@ -6,7 +6,7 @@ from trafficgen.marine_system_simulator import flat2llh from trafficgen.types import Position -from trafficgen.utils import calculate_position_at_certain_time, deg_2_rad, rad_2_deg +from trafficgen.utils import calculate_position_at_certain_time, rad_2_deg def path_crosses_land( @@ -14,17 +14,17 @@ def path_crosses_land( speed: float, course: float, lat_lon0: List[float], - time_interval: float = 50.0, + time_interval: float = 300.0, ) -> bool: """ Find if path is crossing land. Params: position_1: Ship position in (north, east) [m]. - speed: Ship speed [knots]. - course: Ship course [degree]. - lat_lon0: Reference point, latitudinal [degree] and longitudinal [degree]. - time_interval: The time interval the vessel should travel without crossing land [minutes] + speed: Ship speed [m/s]. + course: Ship course [rad]. + lat_lon0: Reference point, latitudinal [rad] and longitudinal [rad]. + time_interval: The time interval the vessel should travel without crossing land [sec] Returns ------- @@ -41,7 +41,7 @@ def path_crosses_land( position_2 = calculate_position_at_certain_time( Position(north=north_1, east=east_1), speed, course, i * time_interval / num_checks ) - lat, lon, _ = flat2llh(position_2.north, position_2.east, deg_2_rad(lat_0), deg_2_rad(lon_0)) + lat, lon, _ = flat2llh(position_2.north, position_2.east, lat_0, lon_0) lat = rad_2_deg(lat) lon = rad_2_deg(lon) if globe.is_land(lat, lon): # type: ignore (The global_land_mask package is unfortunately not typed.) diff --git a/src/trafficgen/encounter.py b/src/trafficgen/encounter.py index 56d77f7..5b75320 100644 --- a/src/trafficgen/encounter.py +++ b/src/trafficgen/encounter.py @@ -23,12 +23,8 @@ ) from trafficgen.utils import ( calculate_position_at_certain_time, - convert_angle_minus_180_to_180_to_0_to_360, - deg_2_rad, - knot_2_m_pr_min, - m_pr_min_2_knot, - nm_2_m, - rad_2_deg, + convert_angle_0_to_2_pi_to_minus_pi_to_pi, + convert_angle_minus_pi_to_pi_to_0_to_2_pi, ) @@ -104,7 +100,7 @@ def generate_encounter( inner_counter += 1 relative_sog = relative_sog_default if relative_sog is None: - min_target_ship_sog = m_pr_min_2_knot( + min_target_ship_sog = ( calculate_min_vector_length_target_ship( own_ship.initial.position, own_ship.initial.cog, @@ -125,7 +121,7 @@ def generate_encounter( target_ship.initial.sog = np.minimum(target_ship.initial.sog, target_ship.static.speed_max) - target_ship_vector_length = knot_2_m_pr_min(target_ship.initial.sog) * vector_time + target_ship_vector_length = target_ship.initial.sog * vector_time start_position_target_ship, position_found = find_start_position_target_ship( own_ship.initial.position, own_ship.initial.cog, @@ -295,7 +291,7 @@ def find_start_position_target_ship( n_2: float = target_ship_position_future.north e_2: float = target_ship_position_future.east v_r: float = target_ship_vector_length - psi: float = np.deg2rad(own_ship_cog + desired_beta) + psi: float = own_ship_cog + desired_beta n_4: float = n_1 + np.cos(psi) e_4: float = e_1 + np.sin(psi) @@ -323,10 +319,10 @@ def find_start_position_target_ship( s_1 = (-b + np.sqrt(b**2 - 4 * a * c)) / (2 * a) s_2 = (-b - np.sqrt(b**2 - 4 * a * c)) / (2 * a) - e_31 = e_1 + s_1 * (e_4 - e_1) - n_31 = n_1 + s_1 * (n_4 - n_1) - e_32 = e_1 + s_2 * (e_4 - e_1) - n_32 = n_1 + s_2 * (n_4 - n_1) + e_31 = round(e_1 + s_1 * (e_4 - e_1), 0) + n_31 = round(n_1 + s_1 * (n_4 - n_1), 0) + e_32 = round(e_1 + s_2 * (e_4 - e_1), 0) + n_32 = round(n_1 + s_2 * (n_4 - n_1), 0) target_ship_cog_1 = calculate_ship_cog( pos_0=Position(north=n_31, east=e_31), @@ -354,10 +350,17 @@ def find_start_position_target_ship( colreg_state2: EncounterType = determine_colreg( alpha2, beta2, theta13_criteria, theta14_criteria, theta15_criteria, theta15 ) - if desired_encounter_type is colreg_state1 and np.abs(beta1 - desired_beta % 360) < deg_2_rad(0.1): + + if ( + desired_encounter_type is colreg_state1 + and np.abs(convert_angle_0_to_2_pi_to_minus_pi_to_pi(np.abs(beta1 - desired_beta))) < 0.001 + ): start_position_target_ship = Position(north=n_31, east=e_31) start_position_found = True - elif desired_encounter_type is colreg_state2 and np.abs(beta1 - desired_beta % 360) < deg_2_rad(0.1): + elif ( + desired_encounter_type is colreg_state2 + and np.abs(convert_angle_0_to_2_pi_to_minus_pi_to_pi(np.abs(beta1 - desired_beta))) < 0.001 + ): start_position_target_ship = Position(north=n_32, east=e_32) start_position_found = True @@ -377,17 +380,17 @@ def assign_future_position_to_target_ship( * own_ship_position_future: Dict, own ship position at a given time in the future, {north, east} * max_meeting_distance: Maximum distance between own ship and target ship at - a given time in the future [nm] + a given time in the future [m] Returns ------- future_position_target_ship: Future position of target ship {north, east} [m] """ random_angle = random.uniform(0, 1) * 2 * np.pi - random_distance = random.uniform(0, 1) * nm_2_m(max_meeting_distance) + random_distance = random.uniform(0, 1) * max_meeting_distance - north: float = own_ship_position_future.north + random_distance * np.cos(deg_2_rad(random_angle)) - east: float = own_ship_position_future.east + random_distance * np.sin(deg_2_rad(random_angle)) + north: float = own_ship_position_future.north + random_distance * np.cos(random_angle) + east: float = own_ship_position_future.east + random_distance * np.sin(random_angle) return Position(north=north, east=east) @@ -419,23 +422,27 @@ def determine_colreg( * encounter classification """ # Mapping - alpha0360: float = alpha if alpha >= 0.0 else alpha + 360.0 - beta0180: float = beta if (beta >= 0.0) & (beta <= 180.0) else beta - 360.0 + alpha_2_pi: float = alpha if alpha >= 0.0 else alpha + 2 * np.pi + beta_pi: float = beta if (beta >= 0.0) & (beta <= np.pi) else beta - 2 * np.pi # Find appropriate rule set if (beta > theta15[0]) & (beta < theta15[1]) & (abs(alpha) - theta13_criteria <= 0.001): return EncounterType.OVERTAKING_STAND_ON - if (alpha0360 > theta15[0]) & (alpha0360 < theta15[1]) & (abs(beta0180) - theta13_criteria <= 0.001): + if ( + (alpha_2_pi > theta15[0]) + & (alpha_2_pi < theta15[1]) + & (abs(beta_pi) - theta13_criteria <= 0.001) + ): return EncounterType.OVERTAKING_GIVE_WAY - if (abs(beta0180) - theta14_criteria <= 0.001) & (abs(alpha) - theta14_criteria <= 0.001): + if (abs(beta_pi) - theta14_criteria <= 0.001) & (abs(alpha) - theta14_criteria <= 0.001): return EncounterType.HEAD_ON if (beta > 0) & (beta < theta15[0]) & (alpha > -theta15[0]) & (alpha - theta15_criteria <= 0.001): return EncounterType.CROSSING_GIVE_WAY if ( - (alpha0360 > 0) - & (alpha0360 < theta15[0]) - & (beta0180 > -theta15[0]) - & (beta0180 - theta15_criteria <= 0.001) + (alpha_2_pi > 0) + & (alpha_2_pi < theta15[0]) + & (beta_pi > -theta15[0]) + & (beta_pi - theta15_criteria <= 0.001) ): return EncounterType.CROSSING_STAND_ON return EncounterType.NO_RISK_COLLISION @@ -453,18 +460,15 @@ def calculate_relative_bearing( Params: * position_own_ship: Dict, own ship position {north, east} [m] - * heading_own_ship: Own ship cog [deg] + * heading_own_ship: Own ship cog [rad] * position_target_ship: Dict, own ship position {north, east} [m] - * heading_target_ship: Target ship cog [deg] + * heading_target_ship: Target ship cog [rad] Returns ------- - * beta: relative bearing between own ship and target ship seen from own ship [deg] - * alpha: relative bearing between target ship and own ship seen from target ship [deg] + * beta: relative bearing between own ship and target ship seen from own ship [rad] + * alpha: relative bearing between target ship and own ship seen from target ship [rad] """ - heading_own_ship = np.deg2rad(heading_own_ship) - heading_target_ship = np.deg2rad(heading_target_ship) - # POSE combination of relative bearing and contact angle n_own_ship: float = position_own_ship.north e_own_ship: float = position_own_ship.east @@ -515,9 +519,6 @@ def calculate_relative_bearing( while alpha >= np.pi: alpha -= 2 * np.pi - beta = np.rad2deg(beta) - alpha = np.rad2deg(alpha) - return beta, alpha @@ -531,12 +532,12 @@ def calculate_ship_cog(pos_0: Position, pos_1: Position) -> float: Returns ------- - cog: Ship cog [deg] + cog: Ship cog [rad] """ cog: float = np.arctan2(pos_1.east - pos_0.east, pos_1.north - pos_0.north) if cog < 0.0: cog += 2 * np.pi - return round(np.rad2deg(cog), 1) + return round(cog, 3) def assign_vector_time(vector_time_range: List[float]): @@ -567,13 +568,13 @@ def assign_sog_to_target_ship( Params: * encounter_type: Type of encounter - * own_ship_sog: Own ship sog [knot] - * min_target_ship_sog: Minimum target ship sog [knot] + * own_ship_sog: Own ship sog [m/s] + * min_target_ship_sog: Minimum target ship sog [m/s] * relative_sog_setting: Relative sog setting dependent on encounter [-] Returns ------- - target_ship_sog: Target ship sog [knot] + target_ship_sog: Target ship sog [m/s] """ if encounter_type is EncounterType.OVERTAKING_STAND_ON: relative_sog = relative_sog_setting.overtaking_stand_on @@ -613,7 +614,7 @@ def assign_beta(encounter_type: EncounterType, settings: EncounterSettings) -> f Returns ------- - Relative bearing between own ship and target ship seen from own ship [deg] + Relative bearing between own ship and target ship seen from own ship [rad] """ theta13_crit: float = settings.classification.theta13_criteria theta14_crit: float = settings.classification.theta14_criteria @@ -629,7 +630,7 @@ def assign_beta(encounter_type: EncounterType, settings: EncounterSettings) -> f if encounter_type is EncounterType.CROSSING_GIVE_WAY: return 0 + random.uniform(0, 1) * (theta15[0] - 0) if encounter_type is EncounterType.CROSSING_STAND_ON: - return convert_angle_minus_180_to_180_to_0_to_360( + return convert_angle_minus_pi_to_pi_to_0_to_2_pi( -theta15[1] + random.uniform(0, 1) * (theta15[1] + theta15_crit) ) return 0.0 @@ -645,7 +646,7 @@ def update_position_data_target_ship( Params: * target_ship: Target ship data - * lat_lon0: Reference point, latitudinal [degree] and longitudinal [degree] + * lat_lon0: Reference point, latitudinal [rad] and longitudinal [rad] Returns ------- @@ -659,11 +660,11 @@ def update_position_data_target_ship( lat, lon, _ = flat2llh( target_ship.initial.position.north, target_ship.initial.position.east, - deg_2_rad(lat_0), - deg_2_rad(lon_0), + lat_0, + lon_0, ) - target_ship.initial.position.latitude = round(rad_2_deg(lat), 6) - target_ship.initial.position.longitude = round(rad_2_deg(lon), 6) + target_ship.initial.position.latitude = round(lat, 6) + target_ship.initial.position.longitude = round(lon, 6) return target_ship @@ -676,7 +677,7 @@ def update_position_data_own_ship( Params: * ship: Own ship data - * delta_time: Delta time from now to the time new position is being calculated [minutes] + * delta_time: Delta time from now to the time new position is being calculated [sec] Returns ------- @@ -700,11 +701,11 @@ def update_position_data_own_ship( lat_future, lon_future, _ = flat2llh( ship_position_future.north, ship_position_future.east, - deg_2_rad(lat_0), - deg_2_rad(lon_0), + lat_0, + lon_0, ) - ship_position_future.latitude = round(rad_2_deg(lat_future), 6) - ship_position_future.longitude = round(rad_2_deg(lon_future), 6) + ship_position_future.latitude = round(lat_future, 6) + ship_position_future.longitude = round(lon_future, 6) ship.waypoints = [ Waypoint(position=ship.initial.position.model_copy()), diff --git a/src/trafficgen/plot_traffic_situation.py b/src/trafficgen/plot_traffic_situation.py index 7e306e5..5052064 100644 --- a/src/trafficgen/plot_traffic_situation.py +++ b/src/trafficgen/plot_traffic_situation.py @@ -11,7 +11,7 @@ from trafficgen.marine_system_simulator import flat2llh from trafficgen.types import Position, Ship, TargetShip, TrafficSituation -from trafficgen.utils import deg_2_rad, knot_2_m_pr_min, m2nm, rad_2_deg +from trafficgen.utils import m_2_nm, rad_2_deg def calculate_vector_arrow( @@ -25,13 +25,13 @@ def calculate_vector_arrow( Params: position: {north}, {east} position of the ship [m] - direction: direction the arrow is pointing [deg] - vector_length: length of vector - lat_lon0: Reference point, latitudinal [degree] and longitudinal [degree] + direction: direction the arrow is pointing [rad] + vector_length: length of vector [m] + lat_lon0: Reference point, latitudinal [rad] and longitudinal [rad] Returns ------- - arrow_points: Polygon points to draw the arrow + arrow_points: Polygon points to draw the arrow [deg] """ north_start = position.north east_start = position.east @@ -39,23 +39,21 @@ def calculate_vector_arrow( side_length = vector_length / 10 sides_angle = 25 - north_end = north_start + vector_length * np.cos(deg_2_rad(direction)) - east_end = east_start + vector_length * np.sin(deg_2_rad(direction)) + north_end = north_start + vector_length * np.cos(direction) + east_end = east_start + vector_length * np.sin(direction) - north_arrow_side_1 = north_end + side_length * np.cos(deg_2_rad(direction + 180 - sides_angle)) - east_arrow_side_1 = east_end + side_length * np.sin(deg_2_rad(direction + 180 - sides_angle)) - north_arrow_side_2 = north_end + side_length * np.cos(deg_2_rad(direction + 180 + sides_angle)) - east_arrow_side_2 = east_end + side_length * np.sin(deg_2_rad(direction + 180 + sides_angle)) + north_arrow_side_1 = north_end + side_length * np.cos(direction + np.pi - sides_angle) + east_arrow_side_1 = east_end + side_length * np.sin(direction + np.pi - sides_angle) + north_arrow_side_2 = north_end + side_length * np.cos(direction + np.pi + sides_angle) + east_arrow_side_2 = east_end + side_length * np.sin(direction + np.pi + sides_angle) - lat_start, lon_start, _ = flat2llh( - north_start, east_start, deg_2_rad(lat_lon0[0]), deg_2_rad(lat_lon0[1]) - ) - lat_end, lon_end, _ = flat2llh(north_end, east_end, deg_2_rad(lat_lon0[0]), deg_2_rad(lat_lon0[1])) + lat_start, lon_start, _ = flat2llh(north_start, east_start, lat_lon0[0], lat_lon0[1]) + lat_end, lon_end, _ = flat2llh(north_end, east_end, lat_lon0[0], lat_lon0[1]) lat_arrow_side_1, lon_arrow_side_1, _ = flat2llh( - north_arrow_side_1, east_arrow_side_1, deg_2_rad(lat_lon0[0]), deg_2_rad(lat_lon0[1]) + north_arrow_side_1, east_arrow_side_1, lat_lon0[0], lat_lon0[1] ) lat_arrow_side_2, lon_arrow_side_2, _ = flat2llh( - north_arrow_side_2, east_arrow_side_2, deg_2_rad(lat_lon0[0]), deg_2_rad(lat_lon0[1]) + north_arrow_side_2, east_arrow_side_2, lat_lon0[0], lat_lon0[1] ) point_1 = (rad_2_deg(lat_start), rad_2_deg(lon_start)) @@ -78,14 +76,14 @@ def calculate_ship_outline( Params: position: {north}, {east} position of the ship [m] - course: course of the ship [deg] - lat_lon0: Reference point, latitudinal [degree] and longitudinal [degree] + course: course of the ship [rad] + lat_lon0: Reference point, latitudinal [rad] and longitudinal [rad] ship_length: Ship length. If not given, ship length is set to 100 ship_width: Ship width. If not given, ship width is set to 15 Returns ------- - ship_outline_points: Polygon points to draw the ship + ship_outline_points: Polygon points to draw the ship [deg] """ north_start = position.north east_start = position.east @@ -94,67 +92,41 @@ def calculate_ship_outline( ship_length *= 10 ship_width *= 10 - north_pos1 = ( - north_start - + np.cos(deg_2_rad(course)) * (-ship_length / 2) - - np.sin(deg_2_rad(course)) * ship_width / 2 - ) - east_pos1 = ( - east_start - + np.sin(deg_2_rad(course)) * (-ship_length / 2) - + np.cos(deg_2_rad(course)) * ship_width / 2 - ) - lat_pos1, lon_pos1, _ = flat2llh( - north_pos1, east_pos1, deg_2_rad(lat_lon0[0]), deg_2_rad(lat_lon0[1]) - ) + north_pos1 = north_start + np.cos(course) * (-ship_length / 2) - np.sin(course) * ship_width / 2 + east_pos1 = east_start + np.sin(course) * (-ship_length / 2) + np.cos(course) * ship_width / 2 + lat_pos1, lon_pos1, _ = flat2llh(north_pos1, east_pos1, lat_lon0[0], lat_lon0[1]) north_pos2 = ( north_start - + np.cos(deg_2_rad(course)) * (ship_length / 2 - ship_length * 0.1) - - np.sin(deg_2_rad(course)) * ship_width / 2 + + np.cos(course) * (ship_length / 2 - ship_length * 0.1) + - np.sin(course) * ship_width / 2 ) east_pos2 = ( east_start - + np.sin(deg_2_rad(course)) * (ship_length / 2 - ship_length * 0.1) - + np.cos(deg_2_rad(course)) * ship_width / 2 - ) - lat_pos2, lon_pos2, _ = flat2llh( - north_pos2, east_pos2, deg_2_rad(lat_lon0[0]), deg_2_rad(lat_lon0[1]) + + np.sin(course) * (ship_length / 2 - ship_length * 0.1) + + np.cos(course) * ship_width / 2 ) + lat_pos2, lon_pos2, _ = flat2llh(north_pos2, east_pos2, lat_lon0[0], lat_lon0[1]) - north_pos3 = north_start + np.cos(deg_2_rad(course)) * (ship_length / 2) - east_pos3 = east_start + np.sin(deg_2_rad(course)) * (ship_length / 2) - lat_pos3, lon_pos3, _ = flat2llh( - north_pos3, east_pos3, deg_2_rad(lat_lon0[0]), deg_2_rad(lat_lon0[1]) - ) + north_pos3 = north_start + np.cos(course) * (ship_length / 2) + east_pos3 = east_start + np.sin(course) * (ship_length / 2) + lat_pos3, lon_pos3, _ = flat2llh(north_pos3, east_pos3, lat_lon0[0], lat_lon0[1]) north_pos4 = ( north_start - + np.cos(deg_2_rad(course)) * (ship_length / 2 - ship_length * 0.1) - - np.sin(deg_2_rad(course)) * (-ship_width / 2) + + np.cos(course) * (ship_length / 2 - ship_length * 0.1) + - np.sin(course) * (-ship_width / 2) ) east_pos4 = ( east_start - + np.sin(deg_2_rad(course)) * (ship_length / 2 - ship_length * 0.1) - + np.cos(deg_2_rad(course)) * (-ship_width / 2) - ) - lat_pos4, lon_pos4, _ = flat2llh( - north_pos4, east_pos4, deg_2_rad(lat_lon0[0]), deg_2_rad(lat_lon0[1]) + + np.sin(course) * (ship_length / 2 - ship_length * 0.1) + + np.cos(course) * (-ship_width / 2) ) + lat_pos4, lon_pos4, _ = flat2llh(north_pos4, east_pos4, lat_lon0[0], lat_lon0[1]) - north_pos5 = ( - north_start - + np.cos(deg_2_rad(course)) * (-ship_length / 2) - - np.sin(deg_2_rad(course)) * (-ship_width / 2) - ) - east_pos5 = ( - east_start - + np.sin(deg_2_rad(course)) * (-ship_length / 2) - + np.cos(deg_2_rad(course)) * (-ship_width / 2) - ) - lat_pos5, lon_pos5, _ = flat2llh( - north_pos5, east_pos5, deg_2_rad(lat_lon0[0]), deg_2_rad(lat_lon0[1]) - ) + north_pos5 = north_start + np.cos(course) * (-ship_length / 2) - np.sin(course) * (-ship_width / 2) + east_pos5 = east_start + np.sin(course) * (-ship_length / 2) + np.cos(course) * (-ship_width / 2) + lat_pos5, lon_pos5, _ = flat2llh(north_pos5, east_pos5, lat_lon0[0], lat_lon0[1]) point_1 = (rad_2_deg(lat_pos1), rad_2_deg(lon_pos1)) point_2 = (rad_2_deg(lat_pos2), rad_2_deg(lon_pos2)) @@ -226,8 +198,8 @@ def add_ship_to_map( Params: ship: Ship information - vector_time: Vector time [min] - lat_lon0=Reference point, latitudinal [degree] and longitudinal [degree] + vector_time: Vector time [sec] + lat_lon0=Reference point, latitudinal [rad] and longitudinal [rad] map_plot: Instance of Map. If not set, instance is set to None color: Color of the ship. If not set, color is 'black' @@ -236,10 +208,10 @@ def add_ship_to_map( m: Updated instance of Map. """ if map_plot is None: - map_plot = Map(location=(lat_lon0[0], lat_lon0[1]), zoom_start=10) + map_plot = Map(location=(rad_2_deg(lat_lon0[0]), rad_2_deg(lat_lon0[1])), zoom_start=10) assert ship.initial is not None - vector_length = vector_time * knot_2_m_pr_min(ship.initial.sog) + vector_length = vector_time * ship.initial.sog _ = map_plot.add_child( Polygon( calculate_vector_arrow(ship.initial.position, ship.initial.cog, vector_length, lat_lon0), @@ -353,8 +325,8 @@ def find_max_value_for_plot( max_value = np.max( [ max_value, - np.abs(m2nm(ship.initial.position.north)), - np.abs(m2nm(ship.initial.position.east)), + np.abs(m_2_nm(ship.initial.position.north)), + np.abs(m_2_nm(ship.initial.position.east)), ] ) return max_value @@ -380,18 +352,18 @@ def add_ship_to_plot( assert isinstance(axes, plt.Axes) assert ship.initial is not None - pos_0_north = m2nm(ship.initial.position.north) - pos_0_east = m2nm(ship.initial.position.east) + pos_0_north = m_2_nm(ship.initial.position.north) + pos_0_east = m_2_nm(ship.initial.position.east) course = ship.initial.cog speed = ship.initial.sog - vector_length = m2nm(vector_time * knot_2_m_pr_min(speed)) + vector_length = m_2_nm(vector_time * speed) _ = axes.arrow( pos_0_east, pos_0_north, - vector_length * np.sin(deg_2_rad(course)), - vector_length * np.cos(deg_2_rad(course)), + vector_length * np.sin(course), + vector_length * np.cos(course), edgecolor=color, facecolor=color, width=0.0001, diff --git a/src/trafficgen/read_files.py b/src/trafficgen/read_files.py index 697ab57..7c0c902 100644 --- a/src/trafficgen/read_files.py +++ b/src/trafficgen/read_files.py @@ -3,18 +3,27 @@ import json import os from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, List, Union from uuid import UUID, uuid4 -from trafficgen.types import AISNavStatus, EncounterSettings, OwnShip, TargetShip, TrafficSituation +from trafficgen.types import ( + AISNavStatus, + EncounterSettings, + OwnShip, + TargetShip, + TrafficSituation, + UnitType, +) +from trafficgen.utils import deg_2_rad, knot_2_m_pr_s, min_2_s, nm_2_m -def read_situation_files(situation_folder: Path) -> List[TrafficSituation]: +def read_situation_files(situation_folder: Path, input_units: UnitType) -> List[TrafficSituation]: """ Read traffic situation files. Params: situation_folder: Path to the folder where situation files are found + input_units: Specify if the inputs are given in si or maritime units Returns ------- @@ -28,11 +37,72 @@ def read_situation_files(situation_folder: Path) -> List[TrafficSituation]: data = convert_keys_to_snake_case(data) situation: TrafficSituation = TrafficSituation(**data) + if input_units.value == "maritime": + situation = convert_situation_data_from_maritime_to_si_units(situation) + + situation.input_file_name = file_name + situations.append(situation) + return situations + + +def read_generated_situation_files(situation_folder: Path) -> List[TrafficSituation]: + """ + Read the generated traffic situation files. Used for testing the trafficgen algorithm. + + Params: + situation_folder: Path to the folder where situation files are found + + Returns + ------- + situations: List of desired traffic situations + """ + situations: List[TrafficSituation] = [] + for file_name in sorted([file for file in os.listdir(situation_folder) if file.endswith(".json")]): + file_path = os.path.join(situation_folder, file_name) + with open(file_path, encoding="utf-8") as f: + data = json.load(f) + + data = convert_keys_to_snake_case(data) + situation: TrafficSituation = TrafficSituation(**data) + situation.input_file_name = file_name situations.append(situation) return situations +def convert_situation_data_from_maritime_to_si_units(situation: TrafficSituation) -> TrafficSituation: + """ + Convert situation data which is given in maritime units to SI units. + + Params: + own_ship_file: Path to the own_ship_file file + + Returns + ------- + own_ship information + """ + assert situation.own_ship is not None + assert situation.own_ship.initial is not None + situation.own_ship.initial.position.longitude = round( + deg_2_rad(situation.own_ship.initial.position.longitude), 6 + ) + situation.own_ship.initial.position.latitude = round( + deg_2_rad(situation.own_ship.initial.position.latitude), 6 + ) + situation.own_ship.initial.cog = round(deg_2_rad(situation.own_ship.initial.cog), 4) + situation.own_ship.initial.sog = round(knot_2_m_pr_s(situation.own_ship.initial.sog), 2) + + assert situation.encounter is not None + for i in range(len(situation.encounter)): + beta: Union[float, None] = situation.encounter[i].beta + vector_time: Union[float, None] = situation.encounter[i].vector_time + if beta is not None: + situation.encounter[i].beta = round(deg_2_rad(beta), 4) + if vector_time is not None: + situation.encounter[i].vector_time = min_2_s(vector_time) + return situation + + def read_own_ship_file(own_ship_file: Path) -> OwnShip: """ Read own ship file. @@ -99,10 +169,54 @@ def read_encounter_settings_file(settings_file: Path) -> EncounterSettings: """ with open(settings_file, encoding="utf-8") as f: data = json.load(f) + data = check_input_units(data) encounter_settings: EncounterSettings = EncounterSettings(**data) + + # assert encounter_settings.input_units is not None + if encounter_settings.input_units.value == "maritime": + encounter_settings = convert_settings_data_from_maritime_to_si_units(encounter_settings) + return encounter_settings +def convert_settings_data_from_maritime_to_si_units(settings: EncounterSettings) -> EncounterSettings: + """ + Convert situation data which is given in maritime units to SI units. + + Params: + own_ship_file: Path to the own_ship_file file + + Returns + ------- + own_ship information + """ + assert settings.classification is not None + + settings.classification.theta13_criteria = deg_2_rad(settings.classification.theta13_criteria) + settings.classification.theta14_criteria = deg_2_rad(settings.classification.theta14_criteria) + settings.classification.theta15_criteria = deg_2_rad(settings.classification.theta15_criteria) + settings.classification.theta15[0] = deg_2_rad(settings.classification.theta15[0]) + settings.classification.theta15[1] = deg_2_rad(settings.classification.theta15[1]) + + settings.vector_range[0] = min_2_s(settings.vector_range[0]) + settings.vector_range[1] = min_2_s(settings.vector_range[1]) + + settings.situation_length = min_2_s(settings.situation_length) + settings.max_meeting_distance = nm_2_m(settings.max_meeting_distance) + settings.evolve_time = min_2_s(settings.evolve_time) + + return settings + + +def check_input_units(data: Dict[str, Any]) -> Dict[str, Any]: + """Check if input unit is specified, if not specified it is set to SI.""" + + if "input_units" not in data: + data["input_units"] = "si" + + return data + + def camel_to_snake(string: str) -> str: """Convert a camel case string to snake case.""" return "".join([f"_{c.lower()}" if c.isupper() else c for c in string]).lstrip("_") diff --git a/src/trafficgen/settings/encounter_settings.json b/src/trafficgen/settings/encounter_settings.json index 302d71f..0058a40 100644 --- a/src/trafficgen/settings/encounter_settings.json +++ b/src/trafficgen/settings/encounter_settings.json @@ -36,5 +36,6 @@ ], "situation_length": 60.0, "max_meeting_distance": 0.0, - "evolve_time": 120.0 + "evolve_time": 120.0, + "input_units": "maritime" } \ No newline at end of file diff --git a/src/trafficgen/ship_traffic_generator.py b/src/trafficgen/ship_traffic_generator.py index 6227ea7..6021db9 100644 --- a/src/trafficgen/ship_traffic_generator.py +++ b/src/trafficgen/ship_traffic_generator.py @@ -29,6 +29,7 @@ def generate_traffic_situations( Params: * situation_folder: Path to situation folder, files describing the desired situations + * own_ship_file: Path to where own ships is found * target_ship_folder: Path to where different type of target ships is found * settings_file: Path to settings file @@ -38,10 +39,12 @@ def generate_traffic_situations( One situation may consist of one or more encounters. """ - desired_traffic_situations: List[TrafficSituation] = read_situation_files(situation_folder) own_ship: OwnShip = read_own_ship_file(own_ship_file) target_ships: List[TargetShip] = read_target_ship_files(target_ship_folder) encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) + desired_traffic_situations: List[TrafficSituation] = read_situation_files( + situation_folder, encounter_settings.input_units + ) traffic_situations: List[TrafficSituation] = [] for desired_traffic_situation in desired_traffic_situations: diff --git a/src/trafficgen/types.py b/src/trafficgen/types.py index 47807c3..a168c58 100644 --- a/src/trafficgen/types.py +++ b/src/trafficgen/types.py @@ -205,6 +205,13 @@ class Config: populate_by_name = True +class UnitType(Enum): + """Enumeration of encounter types.""" + + SI_UNITS = "si" + MARITIME_UNITS = "maritime" + + class EncounterSettings(BaseModel): """Data type for encounter settings.""" @@ -214,6 +221,7 @@ class EncounterSettings(BaseModel): situation_length: float max_meeting_distance: float evolve_time: float + input_units: UnitType class Config: """For converting parameters written to file from snake to camel case.""" diff --git a/src/trafficgen/utils.py b/src/trafficgen/utils.py index bc1e8c5..421d70c 100644 --- a/src/trafficgen/utils.py +++ b/src/trafficgen/utils.py @@ -5,39 +5,39 @@ from trafficgen.types import Position -def m_pr_min_2_knot(speed_in_m_pr_min: float) -> float: +def knot_2_m_pr_s(speed_in_knot: float) -> float: """ - Convert ship speed in meters pr minutes to knot. + Convert ship speed in knots to meters pr second. Params: - speed_in_m_pr_min: Ship speed in meters pr second + speed_in_knot: Ship speed given in knots Returns ------- - speed_in_knot: Ship speed given in knots + speed_in_m_pr_s: Ship speed in meters pr second """ knot_2_m_pr_sec: float = 0.5144 - return speed_in_m_pr_min / (knot_2_m_pr_sec * 60.0) + return speed_in_knot * knot_2_m_pr_sec -def knot_2_m_pr_min(speed_in_knot: float) -> float: +def min_2_s(time_in_min: float) -> float: """ - Convert ship speed in knot to meters pr minutes. + Convert time given in minutes to time given in seconds. Params: - speed_in_knot: Ship speed given in knots + time_in_min: Time given in minutes Returns ------- - speed_in_m_pr_min: Ship speed in meters pr minutes + time_in_s: Time in seconds """ - knot_2_m_pr_sec: float = 0.5144 - return speed_in_knot * knot_2_m_pr_sec * 60.0 + min_2_s_coeff: float = 60.0 + return time_in_min * min_2_s_coeff -def m2nm(length_in_m: float) -> float: +def m_2_nm(length_in_m: float) -> float: """ Convert length given in meters to length given in nautical miles. @@ -49,8 +49,8 @@ def m2nm(length_in_m: float) -> float: length_in_nm: Length given in nautical miles """ - m_2_nm: float = 1.0 / 1852.0 - return m_2_nm * length_in_m + m_2_nm_coeff: float = 1.0 / 1852.0 + return m_2_nm_coeff * length_in_m def nm_2_m(length_in_nm: float) -> float: @@ -100,38 +100,38 @@ def rad_2_deg(angle_in_radians: float) -> float: return angle_in_radians * 180.0 / np.pi -def convert_angle_minus_180_to_180_to_0_to_360(angle_180: float) -> float: +def convert_angle_minus_pi_to_pi_to_0_to_2_pi(angle_pi: float) -> float: """ - Convert an angle given in the region -180 to 180 degrees to an - angle given in the region 0 to 360 degrees. + Convert an angle given in the region -pi to pi degrees to an + angle given in the region 0 to 2pi radians. Params: - angle_180: Angle given in the region -180 to 180 degrees + angle_pi: Angle given in the region -pi to pi radians Returns ------- - angle_360: Angle given in the region 0 to 360 degrees + angle_2_pi: Angle given in the region 0 to 2pi radians """ - return angle_180 if angle_180 >= 0.0 else angle_180 + 360.0 + return angle_pi if angle_pi >= 0.0 else angle_pi + 2 * np.pi -def convert_angle_0_to_360_to_minus_180_to_180(angle_360: float) -> float: +def convert_angle_0_to_2_pi_to_minus_pi_to_pi(angle_2_pi: float) -> float: """ - Convert an angle given in the region 0 to 360 degrees to an - angle given in the region -180 to 180 degrees. + Convert an angle given in the region 0 to 2*pi degrees to an + angle given in the region -pi to pi degrees. Params: - angle_360: Angle given in the region 0 to 360 degrees + angle_2_pi: Angle given in the region 0 to 2pi radians Returns ------- - angle_180: Angle given in the region -180 to 180 degrees + angle_pi: Angle given in the region -pi to pi radians """ - return angle_360 if (angle_360 >= 0.0) & (angle_360 <= 180.0) else angle_360 - 360.0 + return angle_2_pi if (angle_2_pi >= 0.0) & (angle_2_pi <= np.pi) else angle_2_pi - 2 * np.pi def calculate_position_at_certain_time( @@ -142,12 +142,12 @@ def calculate_position_at_certain_time( ) -> Position: """ Calculate the position of the ship at a given time based on initial position - and delta time, and constand speed and course. + and delta time, and constant speed and course. Params: position: Initial ship position [m] - speed: Ship speed [knot] - course: Ship course [deg] + speed: Ship speed [m/s] + course: Ship course [rad] delta_time: Delta time from now to the time new position is being calculated [minutes] Returns @@ -156,8 +156,8 @@ def calculate_position_at_certain_time( """ - north = position.north + knot_2_m_pr_min(speed) * delta_time * np.cos(deg_2_rad(course)) - east = position.east + knot_2_m_pr_min(speed) * delta_time * np.sin(deg_2_rad(course)) + north = position.north + speed * delta_time * np.cos(course) + east = position.east + speed * delta_time * np.sin(course) position_future: Position = Position( north=north, east=east, diff --git a/tests/data/test_07/test_07_3.json b/tests/data/test_07/test_07_3.json index ec41134..6fa5245 100644 --- a/tests/data/test_07/test_07_3.json +++ b/tests/data/test_07/test_07_3.json @@ -2,13 +2,13 @@ "title": "OT-SO", "commonVector": 10.0, "ownShip": { - "startPose": { + "initial": { "position": { "latitude": 58.763449, "longitude": 10.490654 }, - "speed": 8.0, - "course": 110.0 + "sog": 8.0, + "cog": 110.0 } }, "encounter": [ diff --git a/tests/test_read_files.py b/tests/test_read_files.py index 88a38ad..04234c5 100644 --- a/tests/test_read_files.py +++ b/tests/test_read_files.py @@ -19,12 +19,15 @@ ) -def test_read_situations_1_ts_full_spec(situations_folder_test_01: Path): +def test_read_situations_1_ts_full_spec(situations_folder_test_01: Path, settings_file: Path): """ Test reading traffic situations with full specification, meaning all parameters are specified. """ - situations: List[TrafficSituation] = read_situation_files(situations_folder_test_01) + encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) + situations: List[TrafficSituation] = read_situation_files( + situations_folder_test_01, encounter_settings.input_units + ) assert len(situations) == 5 # sourcery skip: no-loop-in-tests @@ -39,12 +42,15 @@ def test_read_situations_1_ts_full_spec(situations_folder_test_01: Path): assert situation.encounter[0].vector_time is not None -def test_read_situations_1_ts_partly_spec(situations_folder_test_02: Path): +def test_read_situations_1_ts_partly_spec(situations_folder_test_02: Path, settings_file: Path): """ Test reading traffic situations using partly specification, meaning some of the parameters are specified. """ - situations: List[TrafficSituation] = read_situation_files(situations_folder_test_02) + encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) + situations: List[TrafficSituation] = read_situation_files( + situations_folder_test_02, encounter_settings.input_units + ) assert len(situations) == 2 # sourcery skip: no-loop-in-tests @@ -57,12 +63,15 @@ def test_read_situations_1_ts_partly_spec(situations_folder_test_02: Path): assert situation.encounter[0].beta is None -def test_read_situations_1_ts_minimum_spec(situations_folder_test_03: Path): +def test_read_situations_1_ts_minimum_spec(situations_folder_test_03: Path, settings_file: Path): """ Test reading traffic situations using using minimum specification, meaning only type of situation is specified. """ - situations: List[TrafficSituation] = read_situation_files(situations_folder_test_03) + encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) + situations: List[TrafficSituation] = read_situation_files( + situations_folder_test_03, encounter_settings.input_units + ) assert len(situations) == 2 # sourcery skip: no-loop-in-tests @@ -77,11 +86,16 @@ def test_read_situations_1_ts_minimum_spec(situations_folder_test_03: Path): assert situation.encounter[0].vector_time is None -def test_read_situations_2_ts_one_to_many_situations(situations_folder_test_04: Path): +def test_read_situations_2_ts_one_to_many_situations( + situations_folder_test_04: Path, settings_file: Path +): """ Test reading a traffic situation file num_situations=5 and 2 encounter specifications. """ - situations: List[TrafficSituation] = read_situation_files(situations_folder_test_04) + encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) + situations: List[TrafficSituation] = read_situation_files( + situations_folder_test_04, encounter_settings.input_units + ) assert len(situations) == 1 # sourcery skip: no-loop-in-tests @@ -98,11 +112,14 @@ def test_read_situations_2_ts_one_to_many_situations(situations_folder_test_04: assert encounter.vector_time is None -def test_read_situations_one_to_many_situations(situations_folder_test_05: Path): +def test_read_situations_one_to_many_situations(situations_folder_test_05: Path, settings_file: Path): """ Test reading three traffic situation files 1, 2 and 3 encounter specifications. """ - situations: List[TrafficSituation] = read_situation_files(situations_folder_test_05) + encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) + situations: List[TrafficSituation] = read_situation_files( + situations_folder_test_05, encounter_settings.input_units + ) assert len(situations) == 3 # sourcery skip: no-loop-in-tests @@ -122,11 +139,16 @@ def test_read_situations_one_to_many_situations(situations_folder_test_05: Path) assert num_situations_values_found == {6, 3, None} -def test_read_situations_with_different_encounter_types(situations_folder_test_07: Path): +def test_read_situations_with_different_encounter_types( + situations_folder_test_07: Path, settings_file: Path +): """ Test reading 5 traffic situation files with different encounter types. """ - situations: List[TrafficSituation] = read_situation_files(situations_folder_test_07) + encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) + situations: List[TrafficSituation] = read_situation_files( + situations_folder_test_07, encounter_settings.input_units + ) assert len(situations) == 5 # sourcery skip: no-loop-in-tests @@ -194,4 +216,4 @@ def test_read_encounter_settings_file(settings_file: Path): assert settings.relative_speed is not None assert settings.vector_range is not None assert settings.max_meeting_distance == 0.0 - assert settings.evolve_time == 120.0 + assert settings.evolve_time == 120.0 * 60 diff --git a/tests/test_trafficgen.py b/tests/test_trafficgen.py index 8ed73a8..f0bc682 100644 --- a/tests/test_trafficgen.py +++ b/tests/test_trafficgen.py @@ -7,7 +7,7 @@ from click.testing import CliRunner from trafficgen import cli -from trafficgen.read_files import read_situation_files +from trafficgen.read_files import read_generated_situation_files from trafficgen.ship_traffic_generator import generate_traffic_situations from trafficgen.types import TrafficSituation @@ -117,7 +117,7 @@ def test_gen_situations_1_ts_full_spec_cli( assert result.exit_code == 0 assert "Generating traffic situations" in result.output - situations: List[TrafficSituation] = read_situation_files(output_folder) + situations: List[TrafficSituation] = read_generated_situation_files(output_folder) assert len(situations) == 5 # sourcery skip: no-loop-in-tests @@ -161,7 +161,7 @@ def test_gen_situations_1_ts_partly_spec_cli( assert result.exit_code == 0 assert "Generating traffic situations" in result.output - situations: List[TrafficSituation] = read_situation_files(output_folder) + situations: List[TrafficSituation] = read_generated_situation_files(output_folder) assert len(situations) == 2 # sourcery skip: no-loop-in-tests @@ -178,7 +178,7 @@ def test_gen_situations_1_ts_partly_spec_cli( # the same assertions but leaves out the CLI part, does not show this # behaviour when run in GitHub. # Claas, 2023-11-25 - assert len(situation.target_ships) in {0, 1} + assert len(situation.target_ships) == 1 def test_gen_situations_1_ts_minimum_spec_cli( @@ -214,7 +214,7 @@ def test_gen_situations_1_ts_minimum_spec_cli( assert result.exit_code == 0 assert "Generating traffic situations" in result.output - situations: List[TrafficSituation] = read_situation_files(output_folder) + situations: List[TrafficSituation] = read_generated_situation_files(output_folder) assert len(situations) == 2 # sourcery skip: no-loop-in-tests @@ -255,7 +255,7 @@ def test_gen_situations_2_ts_one_to_many_situations_cli( assert result.exit_code == 0 assert "Generating traffic situations" in result.output - situations: List[TrafficSituation] = read_situation_files(output_folder) + situations: List[TrafficSituation] = read_generated_situation_files(output_folder) assert len(situations) == 5 # sourcery skip: no-loop-in-tests @@ -296,7 +296,7 @@ def test_gen_situations_one_to_many_situations_cli( assert result.exit_code == 0 assert "Generating traffic situations" in result.output - situations: List[TrafficSituation] = read_situation_files(output_folder) + situations: List[TrafficSituation] = read_generated_situation_files(output_folder) assert len(situations) == 10 # sourcery skip: no-loop-in-tests @@ -338,7 +338,7 @@ def test_gen_situations_ot_gw_target_ship_speed_too_high_cli( assert result.exit_code == 0 assert "Generating traffic situations" in result.output - situations: List[TrafficSituation] = read_situation_files(output_folder) + situations: List[TrafficSituation] = read_generated_situation_files(output_folder) assert len(situations) == 3 # sourcery skip: no-loop-in-tests @@ -380,7 +380,7 @@ def test_gen_situations_baseline_cli( assert result.exit_code == 0 assert "Generating traffic situations" in result.output - situations: List[TrafficSituation] = read_situation_files(output_folder) + situations: List[TrafficSituation] = read_generated_situation_files(output_folder) # assert len(situations) == 5 # sourcery skip: no-loop-in-tests diff --git a/tests/test_write_files.py b/tests/test_write_files.py index 5baea74..46fa7b7 100644 --- a/tests/test_write_files.py +++ b/tests/test_write_files.py @@ -3,31 +3,43 @@ from pathlib import Path from typing import List -from trafficgen.read_files import read_situation_files -from trafficgen.types import TrafficSituation +from trafficgen.read_files import ( + read_encounter_settings_file, + read_generated_situation_files, + read_situation_files, +) +from trafficgen.types import EncounterSettings, TrafficSituation from trafficgen.write_traffic_situation_to_file import write_traffic_situations_to_json_file def test_write_situations_multiple( situations_folder: Path, + settings_file: Path, output_folder: Path, ): """Test writing multiple traffic situations in one call.""" - situations: List[TrafficSituation] = read_situation_files(situations_folder) + encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) + situations: List[TrafficSituation] = read_situation_files( + situations_folder, encounter_settings.input_units + ) write_traffic_situations_to_json_file(situations, output_folder) - reread_situations: List[TrafficSituation] = read_situation_files(output_folder) + reread_situations: List[TrafficSituation] = read_generated_situation_files(output_folder) assert len(situations) == len(reread_situations) def test_write_situations_single( situations_folder: Path, + settings_file: Path, output_folder: Path, ): """Test writing multiple traffic situations, each in a separate single call.""" - situations: List[TrafficSituation] = read_situation_files(situations_folder) + encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) + situations: List[TrafficSituation] = read_situation_files( + situations_folder, encounter_settings.input_units + ) # sourcery skip: no-loop-in-tests # sourcery skip: no-conditionals-in-tests @@ -37,7 +49,7 @@ def test_write_situations_single( if file.is_file(): file.unlink() write_traffic_situations_to_json_file([situation], output_folder) - reread_situation: TrafficSituation = read_situation_files(output_folder)[0] + reread_situation: TrafficSituation = read_generated_situation_files(output_folder)[0] # single difference between the original and the reread situation should be the # input_file_name field reread_situation.input_file_name = situation.input_file_name