From 38925ec6769d3010b1973f8111dfe723014029f8 Mon Sep 17 00:00:00 2001 From: Tom Arne Pedersen Date: Mon, 15 Apr 2024 07:49:41 +0200 Subject: [PATCH] #27 Now possible to have several WP for own ship. The encounter is set at the start of the situation. --- src/trafficgen/encounter.py | 79 +++++++----- src/trafficgen/read_files.py | 24 ++-- .../settings/encounter_settings.json | 8 +- src/trafficgen/ship_traffic_generator.py | 4 +- src/trafficgen/types.py | 15 +-- src/trafficgen/utils.py | 119 +++++++++++++++++- .../write_traffic_situation_to_file.py | 9 +- tests/test_read_files.py | 46 ++----- 8 files changed, 208 insertions(+), 96 deletions(-) diff --git a/src/trafficgen/encounter.py b/src/trafficgen/encounter.py index 74e075c..deed14e 100644 --- a/src/trafficgen/encounter.py +++ b/src/trafficgen/encounter.py @@ -28,6 +28,8 @@ SituationInput, ) from trafficgen.utils import ( + calculate_bearing_between_waypoints, + calculate_position_along_track_using_waypoints, calculate_position_at_certain_time, convert_angle_0_to_2_pi_to_minus_pi_to_pi, convert_angle_minus_pi_to_pi_to_0_to_2_pi, @@ -98,11 +100,14 @@ def generate_encounter( # Own ship assert own_ship.initial is not None - own_ship_position_future = calculate_position_at_certain_time( - own_ship.initial.position, - lat_lon0, + assert own_ship.waypoints is not None + # Assuming ship is pointing in the direction of wp1 + own_ship_cog = calculate_bearing_between_waypoints( + own_ship.waypoints[0].position, own_ship.waypoints[1].position + ) + own_ship_position_future = calculate_position_along_track_using_waypoints( + own_ship.waypoints, own_ship.initial.sog, - own_ship.initial.cog, vector_time, ) @@ -118,7 +123,7 @@ def generate_encounter( min_target_ship_sog = ( calculate_min_vector_length_target_ship( own_ship.initial.position, - own_ship.initial.cog, + own_ship_cog, target_ship_position_future, beta, lat_lon0, @@ -142,7 +147,7 @@ def generate_encounter( start_position_target_ship, position_found = find_start_position_target_ship( own_ship.initial.position, lat_lon0, - own_ship.initial.cog, + own_ship_cog, target_ship_position_future, target_ship_vector_length, beta, @@ -157,7 +162,8 @@ def generate_encounter( ) encounter_ok: bool = check_encounter_evolvement( own_ship, - own_ship_position_future, + own_ship_cog, + own_ship.initial.position, lat_lon0, target_ship_sog, target_ship_cog, @@ -166,15 +172,17 @@ def generate_encounter( settings, ) - # Check if trajectory passes land - trajectory_on_land = path_crosses_land( - target_ship_initial_position, - target_ship_sog, - target_ship_cog, - lat_lon0, - ) - - encounter_found = encounter_ok and not trajectory_on_land + if settings.disable_land_check is False: + # Check if trajectory passes land + trajectory_on_land = path_crosses_land( + target_ship_initial_position, + target_ship_sog, + target_ship_cog, + lat_lon0, + ) + encounter_found = encounter_ok and not trajectory_on_land + else: + encounter_found = encounter_ok if encounter_found: target_ship_static.id = uuid4() @@ -213,6 +221,7 @@ def generate_encounter( def check_encounter_evolvement( own_ship: OwnShip, + own_ship_cog: float, own_ship_position_future: Position, lat_lon0: Position, target_ship_sog: float, @@ -243,7 +252,6 @@ def check_encounter_evolvement( assert own_ship.initial is not None own_ship_sog: float = own_ship.initial.sog - own_ship_cog: float = own_ship.initial.cog evolve_time: float = settings.evolve_time # Calculating position back in time to ensure that the encounter do not change from one type @@ -299,22 +307,35 @@ def define_own_ship( * own_ship: Own ship """ own_ship_initial: Initial = desired_traffic_situation.own_ship.initial - own_ship_waypoint0 = Waypoint( - position=own_ship_initial.position.model_copy(deep=True), turn_radius=None, data=None - ) - ship_position_future = calculate_position_at_certain_time( - own_ship_initial.position, - lat_lon0, - own_ship_initial.sog, - own_ship_initial.cog, - encounter_settings.situation_length, - ) - own_ship_waypoint1 = Waypoint(position=ship_position_future, turn_radius=None, data=None) + if desired_traffic_situation.own_ship.waypoints is None: + # If waypoints are not given, let initial position be the first waypoint, + # then calculate second waypoint some time in the future + own_ship_waypoint0 = Waypoint( + position=own_ship_initial.position.model_copy(deep=True), turn_radius=None, data=None + ) + ship_position_future = calculate_position_at_certain_time( + own_ship_initial.position, + lat_lon0, + own_ship_initial.sog, + own_ship_initial.cog, + encounter_settings.situation_length, + ) + own_ship_waypoint1 = Waypoint(position=ship_position_future, turn_radius=None, data=None) + own_ship_waypoints: List[Waypoint] = [own_ship_waypoint0, own_ship_waypoint1] + elif len(desired_traffic_situation.own_ship.waypoints) == 1: + # If one waypoint is given, use initial position as first waypoint + own_ship_waypoint0 = Waypoint( + position=own_ship_initial.position.model_copy(deep=True), turn_radius=None, data=None + ) + own_ship_waypoint1 = desired_traffic_situation.own_ship.waypoints[0] + own_ship_waypoints: List[Waypoint] = [own_ship_waypoint0, own_ship_waypoint1] + else: + own_ship_waypoints: List[Waypoint] = desired_traffic_situation.own_ship.waypoints own_ship = OwnShip( static=own_ship_static, initial=own_ship_initial, - waypoints=[own_ship_waypoint0, own_ship_waypoint1], + waypoints=own_ship_waypoints, ) return own_ship diff --git a/src/trafficgen/read_files.py b/src/trafficgen/read_files.py index 805712a..73c653c 100644 --- a/src/trafficgen/read_files.py +++ b/src/trafficgen/read_files.py @@ -11,15 +11,11 @@ TrafficSituation, ) -from trafficgen.types import ( - EncounterSettings, - SituationInput, - UnitType, -) +from trafficgen.types import EncounterSettings, SituationInput from trafficgen.utils import deg_2_rad, knot_2_m_pr_s, min_2_s, nm_2_m -def read_situation_files(situation_folder: Path, input_units: UnitType) -> List[SituationInput]: +def read_situation_files(situation_folder: Path) -> List[SituationInput]: """ Read traffic situation files. @@ -43,8 +39,7 @@ def read_situation_files(situation_folder: Path, input_units: UnitType) -> List[ data["num_situations"] = 1 situation: SituationInput = SituationInput(**data) - if input_units.value == "maritime": - situation = convert_situation_data_from_maritime_to_si_units(situation) + situation = convert_situation_data_from_maritime_to_si_units(situation) situation.input_file_name = file_name situations.append(situation) @@ -98,6 +93,15 @@ def convert_situation_data_from_maritime_to_si_units(situation: SituationInput) situation.own_ship.initial.heading = deg_2_rad(situation.own_ship.initial.heading) situation.own_ship.initial.sog = knot_2_m_pr_s(situation.own_ship.initial.sog) + if situation.own_ship.waypoints is not None: + for waypoint in situation.own_ship.waypoints: + waypoint.position.latitude = deg_2_rad(waypoint.position.latitude) + waypoint.position.longitude = deg_2_rad(waypoint.position.longitude) + if waypoint.data is not None: + assert waypoint.data.model_extra + if waypoint.data.model_extra.get("sog") is not None: + waypoint.data.model_extra["sog"]["value"] = knot_2_m_pr_s(waypoint.data.model_extra["sog"]["value"]) # type: ignore + assert situation.encounters is not None for encounter in situation.encounters: beta: Union[float, None] = encounter.beta @@ -176,9 +180,7 @@ def read_encounter_settings_file(settings_file: Path) -> EncounterSettings: data = check_input_units(data) encounter_settings: EncounterSettings = EncounterSettings(**data) - # assert encounter_settings.input_units is not None - if encounter_settings.input_units.value == "maritime": - encounter_settings = convert_settings_data_from_maritime_to_si_units(encounter_settings) + encounter_settings = convert_settings_data_from_maritime_to_si_units(encounter_settings) return encounter_settings diff --git a/src/trafficgen/settings/encounter_settings.json b/src/trafficgen/settings/encounter_settings.json index 92d9e16..1798cea 100644 --- a/src/trafficgen/settings/encounter_settings.json +++ b/src/trafficgen/settings/encounter_settings.json @@ -34,9 +34,9 @@ 10.0, 30.0 ], - "situation_length": 60.0, + "situation_length": 10.0, "max_meeting_distance": 0.0, - "common_vector": 10.0, - "evolve_time": 120.0, - "input_units": "maritime" + "common_vector": 5.0, + "evolve_time": 5.0, + "disable_land_check": true } \ No newline at end of file diff --git a/src/trafficgen/ship_traffic_generator.py b/src/trafficgen/ship_traffic_generator.py index 02d41e5..fb7b22b 100644 --- a/src/trafficgen/ship_traffic_generator.py +++ b/src/trafficgen/ship_traffic_generator.py @@ -50,9 +50,7 @@ def generate_traffic_situations( own_ship_static: ShipStatic = read_own_ship_static_file(own_ship_file) target_ships_static: List[ShipStatic] = read_target_ship_static_files(target_ship_folder) encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) - desired_traffic_situations: List[SituationInput] = read_situation_files( - situation_folder, encounter_settings.input_units - ) + desired_traffic_situations: List[SituationInput] = read_situation_files(situation_folder) traffic_situations: List[TrafficSituation] = [] for desired_traffic_situation in desired_traffic_situations: diff --git a/src/trafficgen/types.py b/src/trafficgen/types.py index 4c9d5f0..3a834e9 100644 --- a/src/trafficgen/types.py +++ b/src/trafficgen/types.py @@ -1,10 +1,11 @@ """Domain specific data types used in trafficgen.""" from enum import Enum -from typing import List, Union +from typing import List, Optional, Union -from maritime_schema.types.caga import Initial +from maritime_schema.types.caga import Initial, Waypoint from pydantic import BaseModel +from pydantic.fields import Field def to_camel(string: str) -> str: @@ -71,13 +72,6 @@ class Config: populate_by_name = True -class UnitType(Enum): - """Enumeration of encounter types.""" - - SI_UNITS = "si" - MARITIME_UNITS = "maritime" - - class EncounterSettings(BaseModel): """Data type for encounter settings.""" @@ -88,7 +82,7 @@ class EncounterSettings(BaseModel): situation_length: float max_meeting_distance: float evolve_time: float - input_units: UnitType + disable_land_check: bool class Config: """For converting parameters written to file from snake to camel case.""" @@ -101,6 +95,7 @@ class OwnShipInitial(BaseModel): """Data type for initial data for the own ship used for generating a situation.""" initial: Initial + waypoints: Optional[List[Waypoint]] = Field(None, description="An array of `Waypoint` objects.") class SituationInput(BaseModel): diff --git a/src/trafficgen/utils.py b/src/trafficgen/utils.py index c963230..82a1b30 100644 --- a/src/trafficgen/utils.py +++ b/src/trafficgen/utils.py @@ -1,7 +1,9 @@ """Utility functions that are used by several other functions.""" +from typing import List + import numpy as np -from maritime_schema.types.caga import Position +from maritime_schema.types.caga import Position, Waypoint from trafficgen.marine_system_simulator import flat2llh, llh2flat @@ -187,3 +189,118 @@ def calculate_position_at_certain_time( longitude=lon_future, ) return position_future + + +def calculate_distance(position_prev: Position, position_next: Position) -> float: + """ + Calculate the distance in meter between two waypoints. + + Params: + * position_prev{latitude, longitude}: Previous waypoint [rad] + * position_next{latitude, longitude}: Next waypoint [rad] + + Returns + ------- + * distance: Distance between waypoints [m] + """ + # Using position of previous waypoint as reference point + north_next, east_next, _ = llh2flat( + position_next.latitude, position_next.longitude, position_prev.latitude, position_prev.longitude + ) + + distance: float = np.sqrt(north_next**2 + east_next**2) + + return distance + + +def calculate_position_along_track_using_waypoints( + waypoints: List[Waypoint], + inital_speed: float, + vector_time: float, +) -> Position: + """ + Calculate the position of the ship at a given time based on initial position + and delta time, and constant speed and course. + + Params: + * position{latitude, longitude}: Initial ship position [rad] + * speed: Ship speed [m/s] + * course: Ship course [rad] + * delta_time: Delta time from now to the time new position is being calculated [sec] + + Returns + ------- + * position{latitude, longitude}: Estimated ship position in delta time minutes [rad] + """ + accumulate_distance: float = 0 + time_in_transit: float = 0 + + for i in range(1, len(waypoints)): + ship_speed: float = inital_speed + if waypoints[i].data is not None and waypoints[i].data.model_extra["sog"] is not None: # type: ignore + ship_speed = waypoints[i].data.model_extra["sog"]["value"] # type: ignore + + dist_between_waypoints = calculate_distance(waypoints[i - 1].position, waypoints[i].position) + + # find distance ship will travel + dist_travel = ship_speed * (vector_time - time_in_transit) + + if dist_travel > dist_between_waypoints: + time_in_transit = time_in_transit + dist_between_waypoints / ship_speed + else: + bearing = calculate_bearing_between_waypoints( + waypoints[i - 1].position, waypoints[i].position + ) + position_along_track = calculate_destination_along_track( + waypoints[i - 1].position, dist_travel, bearing + ) + return position_along_track + + # if ship reach last waypoint in less time than vector_time, last waypoint is used + return waypoints[-1].position + + +def calculate_bearing_between_waypoints(position_prev: Position, position_next: Position) -> float: + """ + Calculate the bearing in rad between two waypoints. + + Params: + * position_prev{latitude, longitude}: Previous waypoint [rad] + * position_next{latitude, longitude}: Next waypoint [rad] + + Returns + ------- + * bearing: Bearing between waypoints [m] + """ + # Using position of previous waypoint as reference point + north_next, east_next, _ = llh2flat( + position_next.latitude, position_next.longitude, position_prev.latitude, position_prev.longitude + ) + + bearing: float = convert_angle_minus_pi_to_pi_to_0_to_2_pi(np.arctan2(east_next, north_next)) + + return bearing + + +def calculate_destination_along_track( + position_prev: Position, distance: float, bearing: float +) -> Position: + """ + Calculate the destination along the track between two waypoints when distance along the track is given. + + Params: + * position_prev{latitude, longitude}: Previous waypoint [rad] + * distance: Distance to travel [m] + * bearing: Bearing from previous waypoint to next waypoint [rad] + + Returns + ------- + * destination{latitude, longitude}: Destination along the track [rad] + """ + north = distance * np.cos(bearing) + east = distance * np.sin(bearing) + + lat, lon, _ = flat2llh(north, east, position_prev.latitude, position_prev.longitude) + destination = Position(latitude=lat, longitude=lon) + + return destination diff --git a/src/trafficgen/write_traffic_situation_to_file.py b/src/trafficgen/write_traffic_situation_to_file.py index 49722e9..8201875 100644 --- a/src/trafficgen/write_traffic_situation_to_file.py +++ b/src/trafficgen/write_traffic_situation_to_file.py @@ -76,9 +76,10 @@ def convert_ship_data_from_si_units_to_maritime(ship: T_ship) -> T_ship: waypoint.position.longitude = round(rad_2_deg(waypoint.position.longitude), 8) if not waypoint.data: continue - if waypoint.data.sog is not None and waypoint.data.sog.value is not None: # type: ignore - waypoint.data.sog.value = round(m_pr_s_2_knot(waypoint.data.sog.value), 1) # type: ignore - if waypoint.data.heading is not None and waypoint.data.heading.value is not None: # type: ignore - waypoint.data.heading.value = round(m_pr_s_2_knot(waypoint.data.heading.value), 2) # type: ignore + assert waypoint.data.model_extra + if waypoint.data.model_extra.get("sog") is not None: + waypoint.data.model_extra["sog"]["value"] = round(m_pr_s_2_knot(waypoint.data.model_extra["sog"]["value"]), 1) # type: ignore + if waypoint.data.model_extra.get("heading") is not None: + waypoint.data.model_extra["heading"]["value"] = round(m_pr_s_2_knot(waypoint.data.model_extra["heading"]["value"]), 2) # type: ignore return ship diff --git a/tests/test_read_files.py b/tests/test_read_files.py index 40a2ba3..b4ddb93 100644 --- a/tests/test_read_files.py +++ b/tests/test_read_files.py @@ -17,15 +17,12 @@ from trafficgen.types import EncounterSettings, EncounterType, SituationInput -def test_read_situations_1_ts_full_spec(situations_folder_test_01: Path, settings_file: Path): +def test_read_situations_1_ts_full_spec(situations_folder_test_01: Path): """ Test reading traffic situations with full specification, meaning all parameters are specified. """ - encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) - desired_traffic_situations: List[SituationInput] = read_situation_files( - situations_folder_test_01, encounter_settings.input_units - ) + desired_traffic_situations: List[SituationInput] = read_situation_files(situations_folder_test_01) assert len(desired_traffic_situations) == 5 # sourcery skip: no-loop-in-tests @@ -42,15 +39,12 @@ def test_read_situations_1_ts_full_spec(situations_folder_test_01: Path, setting assert situation.encounters[0].vector_time is not None -def test_read_situations_1_ts_partly_spec(situations_folder_test_02: Path, settings_file: Path): +def test_read_situations_1_ts_partly_spec(situations_folder_test_02: Path): """ Test reading traffic situations using partly specification, meaning some of the parameters are specified. """ - encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) - desired_traffic_situations: List[SituationInput] = read_situation_files( - situations_folder_test_02, encounter_settings.input_units - ) + desired_traffic_situations: List[SituationInput] = read_situation_files(situations_folder_test_02) assert len(desired_traffic_situations) == 2 # sourcery skip: no-loop-in-tests @@ -62,15 +56,12 @@ def test_read_situations_1_ts_partly_spec(situations_folder_test_02: Path, setti assert situation.encounters[0].beta is None -def test_read_situations_1_ts_minimum_spec(situations_folder_test_03: Path, settings_file: Path): +def test_read_situations_1_ts_minimum_spec(situations_folder_test_03: Path): """ Test reading traffic situations using using minimum specification, meaning only type of situation is specified. """ - encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) - desired_traffic_situations: List[SituationInput] = read_situation_files( - situations_folder_test_03, encounter_settings.input_units - ) + desired_traffic_situations: List[SituationInput] = read_situation_files(situations_folder_test_03) assert len(desired_traffic_situations) == 2 # sourcery skip: no-loop-in-tests @@ -84,16 +75,11 @@ def test_read_situations_1_ts_minimum_spec(situations_folder_test_03: Path, sett assert situation.encounters[0].vector_time is None -def test_read_situations_2_ts_one_to_many_situations( - situations_folder_test_04: Path, settings_file: Path -): +def test_read_situations_2_ts_one_to_many_situations(situations_folder_test_04: Path): """ Test reading a traffic situation file num_situations=5 and 2 encounter specifications. """ - encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) - desired_traffic_situations: List[SituationInput] = read_situation_files( - situations_folder_test_04, encounter_settings.input_units - ) + desired_traffic_situations: List[SituationInput] = read_situation_files(situations_folder_test_04) assert len(desired_traffic_situations) == 1 # sourcery skip: no-loop-in-tests @@ -109,14 +95,11 @@ def test_read_situations_2_ts_one_to_many_situations( assert encounter.vector_time is None -def test_read_situations_one_to_many_situations(situations_folder_test_05: Path, settings_file: Path): +def test_read_situations_one_to_many_situations(situations_folder_test_05: Path): """ Test reading three traffic situation files 1, 2 and 3 encounter specifications. """ - encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) - desired_traffic_situations: List[SituationInput] = read_situation_files( - situations_folder_test_05, encounter_settings.input_units - ) + desired_traffic_situations: List[SituationInput] = read_situation_files(situations_folder_test_05) assert len(desired_traffic_situations) == 3 # sourcery skip: no-loop-in-tests @@ -135,16 +118,11 @@ def test_read_situations_one_to_many_situations(situations_folder_test_05: Path, assert num_situations_values_found == {6, 3, 1} -def test_read_situations_with_different_encounter_types( - situations_folder_test_07: Path, settings_file: Path -): +def test_read_situations_with_different_encounter_types(situations_folder_test_07: Path): """ Test reading 5 traffic situation files with different encounter types. """ - encounter_settings: EncounterSettings = read_encounter_settings_file(settings_file) - desired_traffic_situations: List[SituationInput] = read_situation_files( - situations_folder_test_07, encounter_settings.input_units - ) + desired_traffic_situations: List[SituationInput] = read_situation_files(situations_folder_test_07) assert len(desired_traffic_situations) == 5 # sourcery skip: no-loop-in-tests