diff --git a/scenariogeneration/xodr/__init__.py b/scenariogeneration/xodr/__init__.py index 3ed8dfe..430c419 100644 --- a/scenariogeneration/xodr/__init__.py +++ b/scenariogeneration/xodr/__init__.py @@ -12,3 +12,4 @@ from .elevation import * from .junction_creator import * from .utils import * +from .lane_def import * diff --git a/scenariogeneration/xodr/generators.py b/scenariogeneration/xodr/generators.py index 3ac60b4..e1719ef 100644 --- a/scenariogeneration/xodr/generators.py +++ b/scenariogeneration/xodr/generators.py @@ -20,7 +20,7 @@ """ import numpy as np import pyclothoids as pcloth -import copy + from .lane import Lane, RoadMark, LaneSection, Lanes, RoadLine from .enumerations import ( JunctionType, @@ -31,9 +31,21 @@ ObjectType, ) -from .geometry import Line, Arc, Spiral, PlanView +from .geometry import Line, Arc, Spiral, PlanView, AdjustablePlanview from .opendrive import Road from .links import Junction, Connection, _get_related_lanesection, LaneLinker +from .lane_def import ( + LaneDef, + create_lanes_merge_split, + std_roadmark_broken, + std_roadmark_broken_solid, + std_roadmark_broken_tight, + std_roadmark_broken_broken, + std_roadmark_broken_long_line, + std_roadmark_solid_broken, + std_roadmark_solid_solid, + std_roadmark_solid, +) from .exceptions import ( GeneralIssueInputArguments, NotSameAmountOfLanesError, @@ -43,56 +55,6 @@ from warnings import warn -def std_roadmark_solid(): - return RoadMark(RoadMarkType.solid, 0.2) - - -def std_roadmark_broken(): - roadmark = RoadMark(RoadMarkType.broken, 0.2) - roadmark.add_specific_road_line(RoadLine(0.15, 3, 9, 0, 0)) - return roadmark - - -def std_roadmark_broken_long_line(): - roadmark = RoadMark(RoadMarkType.broken, 0.2) - roadmark.add_specific_road_line(RoadLine(0.15, 9, 3, 0, 0)) - return roadmark - - -def std_roadmark_broken_tight(): - roadmark = RoadMark(RoadMarkType.broken, 0.2) - roadmark.add_specific_road_line(RoadLine(0.15, 3, 3, 0, 0)) - return roadmark - - -def std_roadmark_broken_broken(): - roadmark = RoadMark(RoadMarkType.broken_broken) - roadmark.add_specific_road_line(RoadLine(0.2, 3, 3, 0.2, 0)) - roadmark.add_specific_road_line(RoadLine(0.2, 3, 3, -0.2, 0)) - return roadmark - - -def std_roadmark_solid_solid(): - roadmark = RoadMark(RoadMarkType.solid_solid) - roadmark.add_specific_road_line(RoadLine(0.2, 0, 0, 0.2, 0)) - roadmark.add_specific_road_line(RoadLine(0.2, 0, 0, -0.2, 0)) - return roadmark - - -def std_roadmark_solid_broken(): - roadmark = RoadMark(RoadMarkType.solid_broken) - roadmark.add_specific_road_line(RoadLine(0.2, 0, 0, 0.2, 0)) - roadmark.add_specific_road_line(RoadLine(0.2, 3, 3, -0.2, 0)) - return roadmark - - -def std_roadmark_broken_solid(): - roadmark = RoadMark(RoadMarkType.broken_solid) - roadmark.add_specific_road_line(RoadLine(0.2, 0, 0, -0.2, 0)) - roadmark.add_specific_road_line(RoadLine(0.2, 3, 3, 0.2, 0)) - return roadmark - - STD_ROADMARK_SOLID = std_roadmark_broken() STD_ROADMARK_BROKEN = std_roadmark_broken() STD_ROADMARK_BROKEN_TIGHT = std_roadmark_broken_tight() @@ -125,249 +87,6 @@ def standard_lane(offset=3, rm=std_roadmark_broken()): return lc -def create_lanes_merge_split( - right_lane_def, - left_lane_def, - road_length, - center_road_mark, - lane_width, - lane_width_end, -): - """create_lanes_merge_split is a generator that will create the Lanes of a road road that can contain one or more lane merges/splits - This is a simple implementation and has some constraints: - - left and right merges has to be at the same place (or one per lane), TODO: will be fixed with the singleSide attribute later on. - - the change will be a 3 degree polynomial with the derivative 0 on both start and end. - - Please note that the merges/splits are defined in the road direction, NOT the driving direction. - - Parameters - ---------- - right_lane_def (list of LaneDef, or an int): a list of the splits/merges that are wanted on the right side of the road, if int constant number of lanes - - left_lane_def (list of LaneDef, or an int): a list of the splits/merges that are wanted on the left side of the road, if int constant number of lanes. - - road_length (float): the full length of the road - - center_road_mark (RoadMark): roadmark for the center line - - lane_width (float): the width of the lanes - - lane_width_end (float): the end width of the lanes - - Return - ------ - road (Lanes): the lanes of a road - """ - - lanesections = [] - # expand the lane list - - right_lane, left_lane = _create_lane_lists( - right_lane_def, left_lane_def, road_length, lane_width - ) - - # create the lanesections needed - for ls in range(len(left_lane)): - lc = Lane(a=0) - lc.add_roadmark(copy.deepcopy(center_road_mark)) - lsec = LaneSection(left_lane[ls].s_start, lc) - # do the right lanes - for i in range(max(right_lane[ls].n_lanes_start, right_lane[ls].n_lanes_end)): - # add broken roadmarks for all lanes, except for the outer lane where a solid line is added - if i == max(right_lane[ls].n_lanes_start, right_lane[ls].n_lanes_end) - 1: - rm = std_roadmark_solid() - else: - rm = std_roadmark_broken() - - # check if the number of lanes should change or not - if ( - right_lane[ls].n_lanes_start > right_lane[ls].n_lanes_end - and i == np.abs(right_lane[ls].sub_lane) - 1 - ): - # lane merge - coeff = get_coeffs_for_poly3( - right_lane[ls].s_end - right_lane[ls].s_start, - right_lane[ls].lane_start_widths[i], - False, - right_lane[ls].lane_end_widths[i], - ) - rightlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) - rightlane.add_roadmark(rm) - elif ( - right_lane[ls].n_lanes_start < right_lane[ls].n_lanes_end - and i == np.abs(right_lane[ls].sub_lane) - 1 - ): - # lane split - coeff = get_coeffs_for_poly3( - right_lane[ls].s_end - right_lane[ls].s_start, - right_lane[ls].lane_start_widths[i], - True, - right_lane[ls].lane_end_widths[i], - ) - rightlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) - rightlane.add_roadmark(rm) - elif (lane_width_end is not None) and (lane_width != lane_width_end): - coeff = get_coeffs_for_poly3( - right_lane[ls].s_end - right_lane[ls].s_start, - lane_width, - False, - lane_width_end=lane_width_end, - ) - rightlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) - rightlane.add_roadmark(rm) - elif right_lane[ls].lane_start_widths: - coeff = get_coeffs_for_poly3( - right_lane[ls].s_end - right_lane[ls].s_start, - right_lane[ls].lane_start_widths[i], - False, - lane_width_end=right_lane[ls].lane_end_widths[i], - ) - rightlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) - rightlane.add_roadmark(rm) - else: - rightlane = standard_lane(lane_width, rm) - - lsec.add_right_lane(rightlane) - - # do the left lanes - for i in range(max(left_lane[ls].n_lanes_start, left_lane[ls].n_lanes_end)): - # add broken roadmarks for all lanes, except for the outer lane where a solid line is added - if i == max(left_lane[ls].n_lanes_start, left_lane[ls].n_lanes_end) - 1: - rm = std_roadmark_solid() - else: - rm = std_roadmark_broken() - - # check if the number of lanes should change or not - if ( - left_lane[ls].n_lanes_start < left_lane[ls].n_lanes_end - and i == left_lane[ls].sub_lane - 1 - ): - # lane split - coeff = get_coeffs_for_poly3( - left_lane[ls].s_end - left_lane[ls].s_start, - left_lane[ls].lane_start_widths[i], - True, - left_lane[ls].lane_end_widths[i], - ) - leftlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) - leftlane.add_roadmark(rm) - elif ( - left_lane[ls].n_lanes_start > left_lane[ls].n_lanes_end - and i == left_lane[ls].sub_lane - 1 - ): - # lane merge - coeff = get_coeffs_for_poly3( - left_lane[ls].s_end - left_lane[ls].s_start, - left_lane[ls].lane_start_widths[i], - False, - left_lane[ls].lane_end_widths[i], - ) - leftlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) - leftlane.add_roadmark(rm) - elif (lane_width_end is not None) and (lane_width != lane_width_end): - coeff = get_coeffs_for_poly3( - left_lane[ls].s_end - left_lane[ls].s_start, - lane_width, - False, - lane_width_end=lane_width_end, - ) - leftlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) - leftlane.add_roadmark(rm) - elif left_lane[ls].lane_start_widths: - coeff = get_coeffs_for_poly3( - left_lane[ls].s_end - left_lane[ls].s_start, - left_lane[ls].lane_start_widths[i], - False, - lane_width_end=left_lane[ls].lane_end_widths[i], - ) - leftlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) - leftlane.add_roadmark(rm) - else: - leftlane = standard_lane(lane_width, rm) - - lsec.add_left_lane(leftlane) - - lanesections.append(lsec) - - # create the lane linker to link the lanes correctly - lanelinker = LaneLinker() - for i in range(1, len(right_lane)): - if right_lane[i].n_lanes_end > right_lane[i].n_lanes_start: - # lane split - for j in range(0, right_lane[i - 1].n_lanes_end + 1): - # adjust for the new lane - if right_lane[i].sub_lane < -(j + 1): - lanelinker.add_link( - lanesections[i - 1].rightlanes[j], lanesections[i].rightlanes[j] - ) - elif right_lane[i].sub_lane > -(j + 1): - lanelinker.add_link( - lanesections[i - 1].rightlanes[j - 1], - lanesections[i].rightlanes[j], - ) - elif right_lane[i - 1].n_lanes_end < right_lane[i - 1].n_lanes_start: - # lane merge - for j in range(0, right_lane[i - 1].n_lanes_end + 1): - # adjust for the lost lane - if right_lane[i - 1].sub_lane < -(j + 1): - lanelinker.add_link( - lanesections[i - 1].rightlanes[j], lanesections[i].rightlanes[j] - ) - elif right_lane[i - 1].sub_lane > -(j + 1): - lanelinker.add_link( - lanesections[i - 1].rightlanes[j], - lanesections[i].rightlanes[j - 1], - ) - - else: - # same number of lanes, just add the links - for j in range(right_lane[i - 1].n_lanes_end): - lanelinker.add_link( - lanesections[i - 1].rightlanes[j], lanesections[i].rightlanes[j] - ) - - for i in range(1, len(left_lane)): - if left_lane[i].n_lanes_end > left_lane[i].n_lanes_start: - # lane split - for j in range(0, left_lane[i - 1].n_lanes_end + 1): - # adjust for the new lane - if left_lane[i].sub_lane < (j + 1): - lanelinker.add_link( - lanesections[i - 1].leftlanes[j - 1], - lanesections[i].leftlanes[j], - ) - elif left_lane[i].sub_lane > (j + 1): - lanelinker.add_link( - lanesections[i - 1].leftlanes[j], lanesections[i].leftlanes[j] - ) - elif left_lane[i - 1].n_lanes_end < left_lane[i - 1].n_lanes_start: - # lane merge - for j in range(0, left_lane[i - 1].n_lanes_end + 1): - # adjust for the lost lane - if left_lane[i - 1].sub_lane < (j + 1): - lanelinker.add_link( - lanesections[i - 1].leftlanes[j], - lanesections[i].leftlanes[j - 1], - ) - elif left_lane[i - 1].sub_lane > (j + 1): - lanelinker.add_link( - lanesections[i - 1].leftlanes[j], lanesections[i].leftlanes[j] - ) - - else: - # same number of lanes, just add the links - for j in range(left_lane[i - 1].n_lanes_end): - lanelinker.add_link( - lanesections[i - 1].leftlanes[j], lanesections[i].leftlanes[j] - ) - - # Add the lanesections to the lanes struct together the lanelinker - lanes = Lanes() - for ls in lanesections: - lanes.add_lanesection(ls, lanelinker) - return lanes - - def create_road( geometry, id, @@ -384,7 +103,7 @@ def create_road( Parameters ---------- - geometry (Line, Spiral, ParamPoly3, or Arc, or list with these): geometries to build the road + geometry (Line, Spiral, ParamPoly3, Arc, a list of(Line, Spiral, ParamPoly3, Arc), or AdjustablePlanview ): geometries to build the road id (int): id of the new road @@ -406,36 +125,63 @@ def create_road( ------- road (Road): a straight road """ + + if isinstance(left_lanes, LaneDef): + left_lanes = [left_lanes] + if isinstance(right_lanes, LaneDef): + right_lanes = [right_lanes] + pv = PlanView() raw_length = 0 if isinstance(geometry, list): for g in geometry: pv.add_geometry(g) raw_length += g.length + elif isinstance(geometry, AdjustablePlanview): + pv = geometry + pv.left_lane_defs = left_lanes + pv.right_lane_defs = right_lanes + pv.center_road_mark = center_road_mark + pv.lane_width = lane_width + pv.lane_width_end = lane_width_end + # create a dummy length + raw_length = 0 + + if isinstance(left_lanes, list) and any( + isinstance(x, LaneDef) for x in left_lanes + ): + raw_length = max(raw_length, max([x.s_end for x in left_lanes])) + if isinstance(right_lanes, list) and any( + isinstance(x, LaneDef) for x in right_lanes + ): + raw_length = max(raw_length, max([x.s_end for x in right_lanes])) + if raw_length == 0: + raw_length = 100 # just a dummy value else: pv.add_geometry(geometry) raw_length += geometry.length - if isinstance(left_lanes, LaneDef): - left_lanes = [left_lanes] - if isinstance(right_lanes, LaneDef): - right_lanes = [right_lanes] - if (lane_width_end is not None) and ( (type(left_lanes) != int) or (type(right_lanes) != int) ): raise RuntimeError( "lane_width_end can only be used when left_lanes and right_lanes are int" ) - - lanes = create_lanes_merge_split( - right_lanes, - left_lanes, - raw_length, - center_road_mark, - lane_width, - lane_width_end=lane_width_end, - ) + if ( + right_lanes is not None + and left_lanes is not None + or center_road_mark is not None + ): + lanes = create_lanes_merge_split( + right_lanes, + left_lanes, + raw_length, + center_road_mark, + lane_width, + lane_width_end=lane_width_end, + ) + else: + lanes = None road = Road(id, pv, lanes, road_type=road_type) @@ -1221,322 +967,3 @@ def get_road_by_id(roads, id): for r in roads: if r.id == id: return r - - -class LaneDef: - """LaneDef is used to help create a lane merge or split. Can handle one lane merging or spliting. - - NOTE: This is not part of the OpenDRIVE standard, but a helper for the xodr module. - - Parameters - ---------- - s_start (float): s coordinate of the start of the change - - s_end (float): s coordinate of the end of the change - - n_lanes_start (int): number of lanes at s_start - - n_lanes_end (int): number of lanes at s_end - - sub_lane (int): the lane that should be created (split) or removed (merge) - - lane_start_widths (list of float): widths of lanes at start, must be [] or same length as n_lanes_start - Default: [] - - lane_end_widths (list of float): widths of lanes at end, must be [] or same length as n_lanes_end - Default: same as lane_start_widths - - Attributes - ---------- - s_start (float): s coordinate of the start of the change - - s_end (float): s coordinate of the end of the change - - n_lanes_start (int): number of lanes at s_start - - n_lanes_end (int): number of lanes at s_end - - sub_lane (int): the lane that should be created (split) or removed (merge) - - lane_start_widths (list of float): widths of lanes at start, must be [] or same length as n_lanes_start - - lane_end_widths (list of float): widths of lanes at end, must be [] or same length as n_lanes_end - """ - - def __init__( - self, - s_start, - s_end, - n_lanes_start, - n_lanes_end, - sub_lane=None, - lane_start_widths=[], - lane_end_widths=[], - ): - self.s_start = s_start - self.s_end = s_end - self.n_lanes_start = n_lanes_start - self.n_lanes_end = n_lanes_end - self.sub_lane = sub_lane - self.lane_start_widths = lane_start_widths - if lane_end_widths == []: - self.lane_end_widths = self.lane_start_widths.copy() - else: - self.lane_end_widths = lane_end_widths - - def _adjust_lane_widths(self): - if self.sub_lane: - if self.lane_end_widths and len(self.lane_end_widths) < self.n_lanes_start: - # mergeo - self.lane_end_widths.insert(abs(self.sub_lane) - 1, 0) - elif ( - self.lane_start_widths - and len(self.lane_start_widths) < self.n_lanes_end - ): - # split - self.lane_start_widths.insert(abs(self.sub_lane) - 1, 0) - # TODO: add some checks here? - - -def _create_lane_lists(right, left, tot_length, default_lane_width): - """_create_lane_lists is a function used by create_lanes_merge_split to expand the list of LaneDefs to be used to create stuffs - - Parameters - ---------- - right (list of LaneDef, or int): the list of LaneDef for the right lane - - left (list of LaneDef, or int): the list of LaneDef for the left lane - - tot_length (float): the total length of the road - - default_lane_width (float): lane_width to be used if not defined in LaneDef - """ - - # TODO: implement for left and right lanesection... - def _check_lane_widths(lane): - if lane.lane_start_widths == []: - lane.lane_start_widths = [ - default_lane_width for x in range(lane.n_lanes_start) - ] - if lane.lane_end_widths == []: - lane.lane_end_widths = [default_lane_width for x in range(lane.n_lanes_end)] - - const_right_lanes = None - const_left_lanes = None - - retlanes_right = [] - retlanes_left = [] - present_s = 0 - - r_it = 0 - l_it = 0 - # some primariy checks to handle int instead of LaneDef - - if not isinstance(right, list): - const_right_lanes = right - right = [] - - if not isinstance(left, list): - const_left_lanes = left - left = [] - - while present_s < tot_length: - if r_it < len(right): - # check if there is still a right LaneDef to be used, and is the next one to add - if right[r_it].s_start == present_s: - add_right = True - else: - next_right = right[r_it].s_start - add_right = False - n_r_lanes = right[r_it].n_lanes_start - else: - # no more LaneDefs, just add new right lanes with the const/or last number of lanes - add_right = False - next_right = tot_length - if const_right_lanes or const_right_lanes == 0: - n_r_lanes = const_right_lanes - else: - n_r_lanes = right[-1].n_lanes_end - - if l_it < len(left): - # check if there is still a left LaneDef to be used, and is the next one to add - if left[l_it].s_start == present_s: - add_left = True - else: - next_left = left[l_it].s_start - add_left = False - n_l_lanes = left[l_it].n_lanes_start - else: - # no more LaneDefs, just add new left lanes with the const/or last number of lanes - add_left = False - next_left = tot_length - if const_left_lanes or const_left_lanes == 0: - n_l_lanes = const_left_lanes - else: - n_l_lanes = left[-1].n_lanes_end - - # create and add the requiered LaneDefs - if not add_left and not add_right: - # no LaneDefs, just add same amout of lanes - s_end = min(next_left, next_right) - if const_right_lanes is not None: - retlanes_right.append( - LaneDef( - present_s, - s_end, - n_r_lanes, - n_r_lanes, - lane_start_widths=[ - default_lane_width for x in range(n_r_lanes) - ], - lane_end_widths=[default_lane_width for x in range(n_r_lanes)], - ) - ) - else: - lane_start_widths = [default_lane_width for x in range(n_r_lanes)] - lane_end_widths = [default_lane_width for x in range(n_r_lanes)] - if r_it == len(right): - if right[r_it - 1].lane_end_widths: - lane_start_widths = right[r_it - 1].lane_end_widths.copy() - lane_end_widths = right[r_it - 1].lane_end_widths.copy() - elif right[r_it].lane_start_widths: - lane_start_widths = right[r_it].lane_start_widths.copy() - lane_end_widths = right[r_it].lane_start_widths.copy() - retlanes_right.append( - LaneDef( - present_s, - s_end, - n_r_lanes, - n_r_lanes, - lane_start_widths=lane_start_widths, - lane_end_widths=lane_end_widths, - ) - ) - if const_left_lanes is not None: - retlanes_left.append( - LaneDef( - present_s, - s_end, - n_l_lanes, - n_l_lanes, - lane_start_widths=[ - default_lane_width for x in range(n_l_lanes) - ], - lane_end_widths=[default_lane_width for x in range(n_l_lanes)], - ) - ) - else: - lane_start_widths = [default_lane_width for x in range(n_l_lanes)] - lane_end_widths = [default_lane_width for x in range(n_l_lanes)] - if l_it == len(left): - if left[l_it - 1].lane_end_widths: - lane_start_widths = left[l_it - 1].lane_end_widths.copy() - lane_end_widths = left[l_it - 1].lane_end_widths.copy() - elif left[l_it].lane_start_widths: - lane_start_widths = left[l_it].lane_start_widths.copy() - lane_end_widths = left[l_it].lane_start_widths.copy() - - retlanes_left.append( - LaneDef( - present_s, - s_end, - n_l_lanes, - n_l_lanes, - lane_start_widths=lane_start_widths, - lane_end_widths=lane_end_widths, - ) - ) - - present_s = s_end - elif add_left and add_right: - # Both have changes in the amount of lanes, - _check_lane_widths(left[l_it]) - _check_lane_widths(right[r_it]) - retlanes_left.append(left[l_it]) - retlanes_right.append(right[r_it]) - present_s = left[l_it].s_end - r_it += 1 - l_it += 1 - elif add_right: - # only the right lane changes the amount of lanes, and add a LaneDef with the same amount of lanes to the left - _check_lane_widths(right[r_it]) - retlanes_right.append(right[r_it]) - retlanes_left.append( - LaneDef( - present_s, - right[r_it].s_end, - n_l_lanes, - n_l_lanes, - lane_start_widths=[default_lane_width for x in range(n_l_lanes)], - lane_end_widths=[default_lane_width for x in range(n_l_lanes)], - ) - ) - present_s = right[r_it].s_end - r_it += 1 - elif add_left: - # only the left lane changes the amount of lanes, and add a LaneDef with the same amount of lanes to the right - _check_lane_widths(left[l_it]) - retlanes_left.append(left[l_it]) - retlanes_right.append( - LaneDef( - present_s, - left[l_it].s_end, - n_r_lanes, - n_r_lanes, - lane_start_widths=[default_lane_width for x in range(n_r_lanes)], - lane_end_widths=[default_lane_width for x in range(n_r_lanes)], - ) - ) - present_s = left[l_it].s_end - l_it += 1 - [x._adjust_lane_widths() for x in retlanes_right] - [x._adjust_lane_widths() for x in retlanes_left] - return retlanes_right, retlanes_left - - -def get_coeffs_for_poly3(length, lane_offset, zero_start, lane_width_end=None): - """get_coeffs_for_poly3 creates the coefficients for a third degree polynomial, can be used for all kinds of descriptions in xodr. - - Assuming that the derivative is 0 at the start and end of the segment. - - Parameters - ---------- - length (float): length of the segment in the s direction - - lane_offset (float): the lane offset (width) of the lane - - zero_start (bool): True; start with zero and ends with lane_offset width, - False; start with lane_offset and ends with zero width - - lane_width_end (float): specify the ending lane width for lanes that may start - and end with different widths - - Return - ------ - coefficients (float,float,float,float): polynomial coefficients corresponding to "a, b, c, d" in the OpenDrive polynomials - """ - # might be expanded for other cases, not now if needed yet though - start_heading = 0 - end_heading = 0 - s0 = 0 - s1 = length - - # create the linear system - A = np.array( - [ - [0, 1, 2 * s0, 3 * s0**2], - [0, 1, 2 * s1, 3 * s1**2], - [1, s0, s0**2, s0**3], - [1, s1, s1**2, s1**3], - ] - ) - if zero_start: - B = [start_heading, end_heading, 0, lane_offset] - else: - B = [start_heading, end_heading, lane_offset, 0] - - if lane_width_end is not None: - B = [start_heading, end_heading, lane_offset, lane_width_end] - - # calculate and return the coefficients - return np.linalg.solve(A, B) diff --git a/scenariogeneration/xodr/geometry.py b/scenariogeneration/xodr/geometry.py index 610a898..ffd3579 100644 --- a/scenariogeneration/xodr/geometry.py +++ b/scenariogeneration/xodr/geometry.py @@ -27,6 +27,26 @@ def wrap_pi(angle): return angle % (2 * np.pi) +class AdjustablePlanview: + """AdjustablePlanview can be used to fit a geometry between two fixed roads.""" + + def __init__( + self, + left_lane_defs=None, + right_lane_defs=None, + center_road_mark=None, + lane_width=None, + lane_width_end=None, + ): + self.fixed = False + self.adjusted = False + self.left_lane_defs = left_lane_defs + self.right_lane_defs = right_lane_defs + self.center_road_mark = center_road_mark + self.lane_width = lane_width + self.lane_width_end = lane_width_end + + class PlanView(XodrBase): """the PlanView is the geometrical description of a road, diff --git a/scenariogeneration/xodr/junction_creator.py b/scenariogeneration/xodr/junction_creator.py index 5d71a9f..aae6a17 100644 --- a/scenariogeneration/xodr/junction_creator.py +++ b/scenariogeneration/xodr/junction_creator.py @@ -295,7 +295,7 @@ def _handle_connection_input(self, road, road_connection): road_connection (str): the connection type (predecessor or successor) """ - + # if isinstance(road.planview, AdjustablePlanview): if road_connection is not None: if road_connection == "successor": road.add_successor(ElementType.junction, self.id) @@ -342,9 +342,9 @@ def _get_list_index(self, id): index (int) """ - return [ - self.incoming_roads.index(x) for x in self.incoming_roads if x.id == id - ][0] + for i in range(len(self.incoming_roads)): + if self.incoming_roads[i].id == id: + return i def _set_offset_for_incoming_road(self, road_idx, connecting_road_id, offset): """_set_offset_for_incoming_road is a helper function to set the correct offsets for a incoming road diff --git a/scenariogeneration/xodr/lane_def.py b/scenariogeneration/xodr/lane_def.py new file mode 100644 index 0000000..44f244b --- /dev/null +++ b/scenariogeneration/xodr/lane_def.py @@ -0,0 +1,583 @@ +""" + scenariogeneration + https://github.com/pyoscx/scenariogeneration + + This Source Code Form is subject to the terms of the Mozilla Public + License, v. 2.0. If a copy of the MPL was not distributed with this + file, You can obtain one at https://mozilla.org/MPL/2.0/. + + Copyright (c) 2022 The scenariogeneration Authors. + + + This is a collection methods and classes not related to OpenDRIVE, but relates to automation of lane creations + +""" +import copy +import numpy as np + +from .links import LaneLinker +from .utils import get_coeffs_for_poly3 +from .lane import RoadMark, RoadMarkType, RoadLine, Lane, LaneSection, Lanes + + +def std_roadmark_solid(): + return RoadMark(RoadMarkType.solid, 0.2) + + +def std_roadmark_broken(): + roadmark = RoadMark(RoadMarkType.broken, 0.2) + roadmark.add_specific_road_line(RoadLine(0.15, 3, 9, 0, 0)) + return roadmark + + +def std_roadmark_broken_long_line(): + roadmark = RoadMark(RoadMarkType.broken, 0.2) + roadmark.add_specific_road_line(RoadLine(0.15, 9, 3, 0, 0)) + return roadmark + + +def std_roadmark_broken_tight(): + roadmark = RoadMark(RoadMarkType.broken, 0.2) + roadmark.add_specific_road_line(RoadLine(0.15, 3, 3, 0, 0)) + return roadmark + + +def std_roadmark_broken_broken(): + roadmark = RoadMark(RoadMarkType.broken_broken) + roadmark.add_specific_road_line(RoadLine(0.2, 3, 3, 0.2, 0)) + roadmark.add_specific_road_line(RoadLine(0.2, 3, 3, -0.2, 0)) + return roadmark + + +def std_roadmark_solid_solid(): + roadmark = RoadMark(RoadMarkType.solid_solid) + roadmark.add_specific_road_line(RoadLine(0.2, 0, 0, 0.2, 0)) + roadmark.add_specific_road_line(RoadLine(0.2, 0, 0, -0.2, 0)) + return roadmark + + +def std_roadmark_solid_broken(): + roadmark = RoadMark(RoadMarkType.solid_broken) + roadmark.add_specific_road_line(RoadLine(0.2, 0, 0, 0.2, 0)) + roadmark.add_specific_road_line(RoadLine(0.2, 3, 3, -0.2, 0)) + return roadmark + + +def std_roadmark_broken_solid(): + roadmark = RoadMark(RoadMarkType.broken_solid) + roadmark.add_specific_road_line(RoadLine(0.2, 0, 0, -0.2, 0)) + roadmark.add_specific_road_line(RoadLine(0.2, 3, 3, 0.2, 0)) + return roadmark + + +def create_lanes_merge_split( + right_lane_def, + left_lane_def, + road_length, + center_road_mark, + lane_width, + lane_width_end, +): + """create_lanes_merge_split is a generator that will create the Lanes of a road road that can contain one or more lane merges/splits + This is a simple implementation and has some constraints: + - left and right merges has to be at the same place (or one per lane), TODO: will be fixed with the singleSide attribute later on. + - the change will be a 3 degree polynomial with the derivative 0 on both start and end. + + Please note that the merges/splits are defined in the road direction, NOT the driving direction. + + Parameters + ---------- + right_lane_def (list of LaneDef, or an int): a list of the splits/merges that are wanted on the right side of the road, if int constant number of lanes + + left_lane_def (list of LaneDef, or an int): a list of the splits/merges that are wanted on the left side of the road, if int constant number of lanes. + + road_length (float): the full length of the road + + center_road_mark (RoadMark): roadmark for the center line + + lane_width (float): the width of the lanes + + lane_width_end (float): the end width of the lanes + + Return + ------ + road (Lanes): the lanes of a road + """ + + lanesections = [] + # expand the lane list + right_lane, left_lane = _create_lane_lists( + right_lane_def, left_lane_def, road_length, lane_width + ) + + # create the lanesections needed + for ls in range(len(left_lane)): + lc = Lane(a=0) + lc.add_roadmark(copy.deepcopy(center_road_mark)) + lsec = LaneSection(left_lane[ls].s_start, lc) + # do the right lanes + for i in range(max(right_lane[ls].n_lanes_start, right_lane[ls].n_lanes_end)): + # add broken roadmarks for all lanes, except for the outer lane where a solid line is added + if i == max(right_lane[ls].n_lanes_start, right_lane[ls].n_lanes_end) - 1: + rm = std_roadmark_solid() + else: + rm = std_roadmark_broken() + + # check if the number of lanes should change or not + if ( + right_lane[ls].n_lanes_start > right_lane[ls].n_lanes_end + and i == np.abs(right_lane[ls].sub_lane) - 1 + ): + # lane merge + coeff = get_coeffs_for_poly3( + right_lane[ls].s_end - right_lane[ls].s_start, + right_lane[ls].lane_start_widths[i], + False, + right_lane[ls].lane_end_widths[i], + ) + rightlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) + rightlane.add_roadmark(rm) + elif ( + right_lane[ls].n_lanes_start < right_lane[ls].n_lanes_end + and i == np.abs(right_lane[ls].sub_lane) - 1 + ): + # lane split + coeff = get_coeffs_for_poly3( + right_lane[ls].s_end - right_lane[ls].s_start, + right_lane[ls].lane_start_widths[i], + True, + right_lane[ls].lane_end_widths[i], + ) + rightlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) + rightlane.add_roadmark(rm) + elif (lane_width_end is not None) and (lane_width != lane_width_end): + coeff = get_coeffs_for_poly3( + right_lane[ls].s_end - right_lane[ls].s_start, + lane_width, + False, + lane_width_end=lane_width_end, + ) + rightlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) + rightlane.add_roadmark(rm) + elif right_lane[ls].lane_start_widths: + coeff = get_coeffs_for_poly3( + right_lane[ls].s_end - right_lane[ls].s_start, + right_lane[ls].lane_start_widths[i], + False, + lane_width_end=right_lane[ls].lane_end_widths[i], + ) + rightlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) + rightlane.add_roadmark(rm) + else: + rightlane = Lane(lane_width) + rightlane.add_roadmark(rm) + lsec.add_right_lane(rightlane) + + # do the left lanes + for i in range(max(left_lane[ls].n_lanes_start, left_lane[ls].n_lanes_end)): + # add broken roadmarks for all lanes, except for the outer lane where a solid line is added + if i == max(left_lane[ls].n_lanes_start, left_lane[ls].n_lanes_end) - 1: + rm = std_roadmark_solid() + else: + rm = std_roadmark_broken() + + # check if the number of lanes should change or not + if ( + left_lane[ls].n_lanes_start < left_lane[ls].n_lanes_end + and i == left_lane[ls].sub_lane - 1 + ): + # lane split + coeff = get_coeffs_for_poly3( + left_lane[ls].s_end - left_lane[ls].s_start, + left_lane[ls].lane_start_widths[i], + True, + left_lane[ls].lane_end_widths[i], + ) + leftlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) + leftlane.add_roadmark(rm) + elif ( + left_lane[ls].n_lanes_start > left_lane[ls].n_lanes_end + and i == left_lane[ls].sub_lane - 1 + ): + # lane merge + coeff = get_coeffs_for_poly3( + left_lane[ls].s_end - left_lane[ls].s_start, + left_lane[ls].lane_start_widths[i], + False, + left_lane[ls].lane_end_widths[i], + ) + leftlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) + leftlane.add_roadmark(rm) + elif (lane_width_end is not None) and (lane_width != lane_width_end): + coeff = get_coeffs_for_poly3( + left_lane[ls].s_end - left_lane[ls].s_start, + lane_width, + False, + lane_width_end=lane_width_end, + ) + leftlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) + leftlane.add_roadmark(rm) + elif left_lane[ls].lane_start_widths: + coeff = get_coeffs_for_poly3( + left_lane[ls].s_end - left_lane[ls].s_start, + left_lane[ls].lane_start_widths[i], + False, + lane_width_end=left_lane[ls].lane_end_widths[i], + ) + leftlane = Lane(a=coeff[0], b=coeff[1], c=coeff[2], d=coeff[3]) + leftlane.add_roadmark(rm) + else: + leftlane = Lane(lane_width) + leftlane.add_roadmark(rm) + lsec.add_left_lane(leftlane) + + lanesections.append(lsec) + + # create the lane linker to link the lanes correctly + lanelinker = LaneLinker() + for i in range(1, len(right_lane)): + if right_lane[i].n_lanes_end > right_lane[i].n_lanes_start: + # lane split + for j in range(0, right_lane[i - 1].n_lanes_end + 1): + # adjust for the new lane + if right_lane[i].sub_lane < -(j + 1): + lanelinker.add_link( + lanesections[i - 1].rightlanes[j], lanesections[i].rightlanes[j] + ) + elif right_lane[i].sub_lane > -(j + 1): + lanelinker.add_link( + lanesections[i - 1].rightlanes[j - 1], + lanesections[i].rightlanes[j], + ) + elif right_lane[i - 1].n_lanes_end < right_lane[i - 1].n_lanes_start: + # lane merge + for j in range(0, right_lane[i - 1].n_lanes_end + 1): + # adjust for the lost lane + if right_lane[i - 1].sub_lane < -(j + 1): + lanelinker.add_link( + lanesections[i - 1].rightlanes[j], lanesections[i].rightlanes[j] + ) + elif right_lane[i - 1].sub_lane > -(j + 1): + lanelinker.add_link( + lanesections[i - 1].rightlanes[j], + lanesections[i].rightlanes[j - 1], + ) + + else: + # same number of lanes, just add the links + for j in range(right_lane[i - 1].n_lanes_end): + lanelinker.add_link( + lanesections[i - 1].rightlanes[j], lanesections[i].rightlanes[j] + ) + + for i in range(1, len(left_lane)): + if left_lane[i].n_lanes_end > left_lane[i].n_lanes_start: + # lane split + for j in range(0, left_lane[i - 1].n_lanes_end + 1): + # adjust for the new lane + if left_lane[i].sub_lane < (j + 1): + lanelinker.add_link( + lanesections[i - 1].leftlanes[j - 1], + lanesections[i].leftlanes[j], + ) + elif left_lane[i].sub_lane > (j + 1): + lanelinker.add_link( + lanesections[i - 1].leftlanes[j], lanesections[i].leftlanes[j] + ) + elif left_lane[i - 1].n_lanes_end < left_lane[i - 1].n_lanes_start: + # lane merge + for j in range(0, left_lane[i - 1].n_lanes_end + 1): + # adjust for the lost lane + if left_lane[i - 1].sub_lane < (j + 1): + lanelinker.add_link( + lanesections[i - 1].leftlanes[j], + lanesections[i].leftlanes[j - 1], + ) + elif left_lane[i - 1].sub_lane > (j + 1): + lanelinker.add_link( + lanesections[i - 1].leftlanes[j], lanesections[i].leftlanes[j] + ) + + else: + # same number of lanes, just add the links + for j in range(left_lane[i - 1].n_lanes_end): + lanelinker.add_link( + lanesections[i - 1].leftlanes[j], lanesections[i].leftlanes[j] + ) + + # Add the lanesections to the lanes struct together the lanelinker + lanes = Lanes() + for ls in lanesections: + lanes.add_lanesection(ls, lanelinker) + return lanes + + +class LaneDef: + """LaneDef is used to help create a lane merge or split. Can handle one lane merging or spliting. + + NOTE: This is not part of the OpenDRIVE standard, but a helper for the xodr module. + + Parameters + ---------- + s_start (float): s coordinate of the start of the change + + s_end (float): s coordinate of the end of the change + + n_lanes_start (int): number of lanes at s_start + + n_lanes_end (int): number of lanes at s_end + + sub_lane (int): the lane that should be created (split) or removed (merge) + + lane_start_widths (list of float): widths of lanes at start, must be [] or same length as n_lanes_start + Default: [] + + lane_end_widths (list of float): widths of lanes at end, must be [] or same length as n_lanes_end + Default: same as lane_start_widths + + Attributes + ---------- + s_start (float): s coordinate of the start of the change + + s_end (float): s coordinate of the end of the change + + n_lanes_start (int): number of lanes at s_start + + n_lanes_end (int): number of lanes at s_end + + sub_lane (int): the lane that should be created (split) or removed (merge) + + lane_start_widths (list of float): widths of lanes at start, must be [] or same length as n_lanes_start + + lane_end_widths (list of float): widths of lanes at end, must be [] or same length as n_lanes_end + """ + + def __init__( + self, + s_start, + s_end, + n_lanes_start, + n_lanes_end, + sub_lane=None, + lane_start_widths=[], + lane_end_widths=[], + ): + self.s_start = s_start + self.s_end = s_end + self.n_lanes_start = n_lanes_start + self.n_lanes_end = n_lanes_end + self.sub_lane = sub_lane + self.lane_start_widths = lane_start_widths + if lane_end_widths == []: + self.lane_end_widths = self.lane_start_widths.copy() + else: + self.lane_end_widths = lane_end_widths + + def _adjust_lane_widths(self): + if self.sub_lane: + if self.lane_end_widths and len(self.lane_end_widths) < self.n_lanes_start: + # mergeo + self.lane_end_widths.insert(abs(self.sub_lane) - 1, 0) + elif ( + self.lane_start_widths + and len(self.lane_start_widths) < self.n_lanes_end + ): + # split + self.lane_start_widths.insert(abs(self.sub_lane) - 1, 0) + # TODO: add some checks here? + + +def _create_lane_lists(right, left, tot_length, default_lane_width): + """_create_lane_lists is a function used by create_lanes_merge_split to expand the list of LaneDefs to be used to create stuffs + + Parameters + ---------- + right (list of LaneDef, or int): the list of LaneDef for the right lane + + left (list of LaneDef, or int): the list of LaneDef for the left lane + + tot_length (float): the total length of the road + + default_lane_width (float): lane_width to be used if not defined in LaneDef + """ + + # TODO: implement for left and right lanesection... + def _check_lane_widths(lane): + if lane.lane_start_widths == []: + lane.lane_start_widths = [ + default_lane_width for x in range(lane.n_lanes_start) + ] + if lane.lane_end_widths == []: + lane.lane_end_widths = [default_lane_width for x in range(lane.n_lanes_end)] + + const_right_lanes = None + const_left_lanes = None + + retlanes_right = [] + retlanes_left = [] + present_s = 0 + + r_it = 0 + l_it = 0 + # some primariy checks to handle int instead of LaneDef + + if not isinstance(right, list): + const_right_lanes = right + right = [] + + if not isinstance(left, list): + const_left_lanes = left + left = [] + + while present_s < tot_length: + if r_it < len(right): + # check if there is still a right LaneDef to be used, and is the next one to add + if right[r_it].s_start == present_s: + add_right = True + else: + next_right = right[r_it].s_start + add_right = False + n_r_lanes = right[r_it].n_lanes_start + else: + # no more LaneDefs, just add new right lanes with the const/or last number of lanes + add_right = False + next_right = tot_length + if const_right_lanes or const_right_lanes == 0: + n_r_lanes = const_right_lanes + else: + n_r_lanes = right[-1].n_lanes_end + + if l_it < len(left): + # check if there is still a left LaneDef to be used, and is the next one to add + if left[l_it].s_start == present_s: + add_left = True + else: + next_left = left[l_it].s_start + add_left = False + n_l_lanes = left[l_it].n_lanes_start + else: + # no more LaneDefs, just add new left lanes with the const/or last number of lanes + add_left = False + next_left = tot_length + if const_left_lanes or const_left_lanes == 0: + n_l_lanes = const_left_lanes + else: + n_l_lanes = left[-1].n_lanes_end + + # create and add the requiered LaneDefs + if not add_left and not add_right: + # no LaneDefs, just add same amout of lanes + s_end = min(next_left, next_right) + if const_right_lanes is not None: + retlanes_right.append( + LaneDef( + present_s, + s_end, + n_r_lanes, + n_r_lanes, + lane_start_widths=[ + default_lane_width for x in range(n_r_lanes) + ], + lane_end_widths=[default_lane_width for x in range(n_r_lanes)], + ) + ) + else: + lane_start_widths = [default_lane_width for x in range(n_r_lanes)] + lane_end_widths = [default_lane_width for x in range(n_r_lanes)] + if r_it == len(right): + if right[r_it - 1].lane_end_widths: + lane_start_widths = right[r_it - 1].lane_end_widths.copy() + lane_end_widths = right[r_it - 1].lane_end_widths.copy() + elif right[r_it].lane_start_widths: + lane_start_widths = right[r_it].lane_start_widths.copy() + lane_end_widths = right[r_it].lane_start_widths.copy() + retlanes_right.append( + LaneDef( + present_s, + s_end, + n_r_lanes, + n_r_lanes, + lane_start_widths=lane_start_widths, + lane_end_widths=lane_end_widths, + ) + ) + if const_left_lanes is not None: + retlanes_left.append( + LaneDef( + present_s, + s_end, + n_l_lanes, + n_l_lanes, + lane_start_widths=[ + default_lane_width for x in range(n_l_lanes) + ], + lane_end_widths=[default_lane_width for x in range(n_l_lanes)], + ) + ) + else: + lane_start_widths = [default_lane_width for x in range(n_l_lanes)] + lane_end_widths = [default_lane_width for x in range(n_l_lanes)] + if l_it == len(left): + if left[l_it - 1].lane_end_widths: + lane_start_widths = left[l_it - 1].lane_end_widths.copy() + lane_end_widths = left[l_it - 1].lane_end_widths.copy() + elif left[l_it].lane_start_widths: + lane_start_widths = left[l_it].lane_start_widths.copy() + lane_end_widths = left[l_it].lane_start_widths.copy() + + retlanes_left.append( + LaneDef( + present_s, + s_end, + n_l_lanes, + n_l_lanes, + lane_start_widths=lane_start_widths, + lane_end_widths=lane_end_widths, + ) + ) + + present_s = s_end + elif add_left and add_right: + # Both have changes in the amount of lanes, + _check_lane_widths(left[l_it]) + _check_lane_widths(right[r_it]) + retlanes_left.append(left[l_it]) + retlanes_right.append(right[r_it]) + present_s = left[l_it].s_end + r_it += 1 + l_it += 1 + elif add_right: + # only the right lane changes the amount of lanes, and add a LaneDef with the same amount of lanes to the left + _check_lane_widths(right[r_it]) + retlanes_right.append(right[r_it]) + retlanes_left.append( + LaneDef( + present_s, + right[r_it].s_end, + n_l_lanes, + n_l_lanes, + lane_start_widths=[default_lane_width for x in range(n_l_lanes)], + lane_end_widths=[default_lane_width for x in range(n_l_lanes)], + ) + ) + present_s = right[r_it].s_end + r_it += 1 + elif add_left: + # only the left lane changes the amount of lanes, and add a LaneDef with the same amount of lanes to the right + _check_lane_widths(left[l_it]) + retlanes_left.append(left[l_it]) + retlanes_right.append( + LaneDef( + present_s, + left[l_it].s_end, + n_r_lanes, + n_r_lanes, + lane_start_widths=[default_lane_width for x in range(n_r_lanes)], + lane_end_widths=[default_lane_width for x in range(n_r_lanes)], + ) + ) + present_s = left[l_it].s_end + l_it += 1 + [x._adjust_lane_widths() for x in retlanes_right] + [x._adjust_lane_widths() for x in retlanes_left] + return retlanes_right, retlanes_left diff --git a/scenariogeneration/xodr/opendrive.py b/scenariogeneration/xodr/opendrive.py index 0735eb9..d68f81f 100644 --- a/scenariogeneration/xodr/opendrive.py +++ b/scenariogeneration/xodr/opendrive.py @@ -24,7 +24,9 @@ ) from .elevation import LateralProfile, ElevationProfile, _Poly3Profile from .utils import get_lane_sec_and_s_for_lane_calc - +from .geometry import AdjustablePlanview, Spiral, PlanView +from .lane_def import LaneDef, create_lanes_merge_split, std_roadmark_solid +import pyclothoids as pcloth import datetime as dt from itertools import combinations import numpy as np @@ -1078,6 +1080,240 @@ def _connection_sanity_check(self, road_id, connection_type): + " have a mismatch in connections, please check predecessors/sucessors and contact points." ) + def _create_adjustable_planview( + self, + road_id, + predecessor_id, + predecessor_contact_point, + successor_id, + successor_contact_point, + ): + def recalculate_xy( + lane_offset, road, lanesection, x, y, h, common_direct_signs=1 + ): + dist = 0 + start_offset = 0 + if lanesection == -1: + dist = road.planview.get_total_length() + if np.sign(lane_offset) == -1: + angle_addition = -common_direct_signs * np.pi / 2 + for lane_iter in range((np.sign(lane_offset) * lane_offset)): + start_offset += ( + road.lanes.lanesections[lanesection] + .rightlanes[lane_iter] + .get_width(dist) + ) + else: + angle_addition = common_direct_signs * np.pi / 2 + for lane_iter in range((np.sign(lane_offset) * lane_offset)): + start_offset += ( + road.lanes.lanesections[lanesection] + .leftlanes[lane_iter] + .get_width(dist) + ) + new_x = x + start_offset * np.cos(h + angle_addition) + new_y = y + start_offset * np.sin(h + angle_addition) + return new_x, new_y + + if predecessor_contact_point == ContactPoint.start: + start_x, start_y, start_h = self.roads[ + predecessor_id + ].planview.get_start_point() + start_lane_section = 0 + start_h = start_h - np.pi + flip_start = True + + elif predecessor_contact_point == ContactPoint.end: + start_x, start_y, start_h = self.roads[ + predecessor_id + ].planview.get_end_point() + start_lane_section = -1 + flip_start = False + + if ( + self.roads[road_id].pred_direct_junction + and int(predecessor_id) in self.roads[road_id].pred_direct_junction + ): + start_x, start_y = recalculate_xy( + self.roads[road_id].pred_direct_junction[int(predecessor_id)], + self.roads[predecessor_id], + start_lane_section, + start_x, + start_y, + start_h, + ) + + if ( + self.roads[road_id].lane_offset_pred + and predecessor_id in self.roads[road_id].lane_offset_pred + and self.roads[road_id].lane_offset_pred[predecessor_id] != 0 + ): + start_x, start_y = recalculate_xy( + self.roads[road_id].lane_offset_pred[predecessor_id], + self.roads[predecessor_id], + start_lane_section, + start_x, + start_y, + start_h, + -1, + ) + + if successor_contact_point == ContactPoint.start: + end_x, end_y, end_h = self.roads[successor_id].planview.get_start_point() + end_lane_section = 0 + flip_end = False + + elif successor_contact_point == ContactPoint.end: + end_x, end_y, end_h = self.roads[successor_id].planview.get_end_point() + end_lane_section = -1 + end_h = end_h - np.pi + flip_end = True + + if ( + self.roads[road_id].succ_direct_junction + and int(successor_id) in self.roads[road_id].succ_direct_junction + ): + end_x, end_y = recalculate_xy( + self.roads[road_id].succ_direct_junction[int(successor_id)], + self.roads[successor_id], + end_lane_section, + end_x, + end_y, + end_h, + ) + + clothoids = pcloth.SolveG2( + start_x, + start_y, + start_h, + 1 / 1000000000, + end_x, + end_y, + end_h, + 1 / 1000000000, + ) + pv = PlanView(start_x, start_y, start_h) + + [ + pv.add_geometry(Spiral(x.KappaStart, x.KappaEnd, length=x.length)) + for x in clothoids + ] + pv.adjust_geometries() + + s_start = 0 + s_end = 0 + if start_lane_section == -1: + s_start = self.roads[predecessor_id].planview.get_total_length() + if end_lane_section == -1: + s_end = self.roads[successor_id].planview.get_total_length() + + if ( + self.roads[road_id].planview.right_lane_defs is None + and self.roads[road_id].planview.left_lane_defs is None + ): + if flip_start: + right_lanes_start = [ + ll.get_width(s_start) + for ll in self.roads[predecessor_id] + .lanes.lanesections[start_lane_section] + .leftlanes + ] + left_lanes_start = [ + rl.get_width(s_start) + for rl in self.roads[predecessor_id] + .lanes.lanesections[start_lane_section] + .rightlanes + ] + else: + left_lanes_start = [ + ll.get_width(s_start) + for ll in self.roads[predecessor_id] + .lanes.lanesections[start_lane_section] + .leftlanes + ] + right_lanes_start = [ + rl.get_width(s_start) + for rl in self.roads[predecessor_id] + .lanes.lanesections[start_lane_section] + .rightlanes + ] + + if flip_end: + right_lanes_end = [ + ll.get_width(s_end) + for ll in self.roads[successor_id] + .lanes.lanesections[end_lane_section] + .leftlanes + ] + left_lanes_end = [ + rl.get_width(s_end) + for rl in self.roads[successor_id] + .lanes.lanesections[end_lane_section] + .rightlanes + ] + else: + left_lanes_end = [ + ll.get_width(s_end) + for ll in self.roads[successor_id] + .lanes.lanesections[end_lane_section] + .leftlanes + ] + right_lanes_end = [ + rl.get_width(s_end) + for rl in self.roads[successor_id] + .lanes.lanesections[end_lane_section] + .rightlanes + ] + if self.roads[road_id].planview.center_road_mark is None: + center_road_mark = ( + self.roads[predecessor_id] + .lanes.lanesections[start_lane_section] + .centerlane.roadmark[0] + ) + else: + center_road_mark = self.roads[road_id].planview.center_road_mark + + lanes = create_lanes_merge_split( + [ + LaneDef( + 0, + pv.get_total_length(), + len(right_lanes_start), + len(right_lanes_end), + None, + right_lanes_start, + right_lanes_end, + ) + ], + [ + LaneDef( + 0, + pv.get_total_length(), + len(left_lanes_start), + len(left_lanes_end), + None, + left_lanes_start, + left_lanes_end, + ) + ], + pv.get_total_length(), + center_road_mark, + None, + lane_width_end=None, + ) + + else: + lanes = create_lanes_merge_split( + self.roads[road_id].planview.right_lane_defs, + self.roads[road_id].planview.left_lane_defs, + pv.get_total_length(), + self.roads[road_id].planview.center_road_mark, + self.roads[road_id].planview.lane_width, + lane_width_end=self.roads[road_id].planview.lane_width_end, + ) + self.roads[road_id].planview = pv + self.roads[road_id].lanes = lanes + def adjust_startpoints(self): """Adjust starting position of all geoemtries of all roads @@ -1110,7 +1346,9 @@ def adjust_startpoints(self): if fixed_road is False: for key in self.roads.keys(): # make sure it is not a connecting road, patching algorithm can't handle that - if self.roads[key].road_type == -1: + if self.roads[key].road_type == -1 and not isinstance( + self.roads[key].planview, AdjustablePlanview + ): self.roads[key].planview.adjust_geometries() break count_total_adjusted_roads += 1 @@ -1120,8 +1358,115 @@ def adjust_startpoints(self): for k in self.roads: # Check all if self.roads[k].planview.adjusted is False: + # check if road is a adjustable planview + if isinstance(self.roads[k].planview, AdjustablePlanview): + predecessor = None + successor = None + + if ( + self.roads[k].predecessor is None + or self.roads[k].successor is None + ): + raise UndefinedRoadNetwork( + "An AdjustablePlanview needs both a predecessor and a successor." + ) + + if self.roads[k].successor.element_type == ElementType.junction: + if self.roads[k].succ_direct_junction: + for key, value in self.roads[ + k + ].succ_direct_junction.items(): + if self.roads[str(key)].planview.adjusted: + successor = str(key) + if ( + self.roads[str(key)].successor + and self.roads[ + str(key) + ].successor.element_type + == ElementType.junction + and self.roads[ + str(key) + ].successor.element_id + == self.roads[k].successor.element_id + ): + suc_contact_point = ContactPoint.end + else: + suc_contact_point = ContactPoint.start + break + else: + raise UndefinedRoadNetwork( + "cannot handle successor connection to a junction with an AdjustablePlanView" + ) + else: + if self.roads[ + str(self.roads[k].successor.element_id) + ].planview.adjusted: + successor = str(self.roads[k].successor.element_id) + suc_contact_point = self.roads[ + k + ].successor.contact_point + + if ( + self.roads[k].predecessor.element_type + == ElementType.junction + ): + if self.roads[k].pred_direct_junction: + for key, value in self.roads[ + k + ].pred_direct_junction.items(): + if self.roads[str(key)].planview.adjusted: + predecessor = str(key) + if ( + self.roads[str(key)].successor + and self.roads[ + str(key) + ].successor.element_type + == ElementType.junction + and self.roads[ + str(key) + ].successor.element_id + == self.roads[k].predecessor.element_id + ): + pred_contact_point = ContactPoint.end + else: + pred_contact_point = ContactPoint.start + break + else: + for r_id, r in self.roads.items(): + if ( + r.road_type + == self.roads[k].predecessor.element_id + and r.planview.adjusted + ): + if r.predecessor.element_id == int(k): + pred_contact_point = ContactPoint.start + predecessor = r_id + break + elif r.successor.element_id == int(k): + pred_contact_point = ContactPoint.end + predecessor = r_id + break + + else: + if self.roads[ + str(self.roads[k].predecessor.element_id) + ].planview.adjusted: + predecessor = str(self.roads[k].predecessor.element_id) + pred_contact_point = self.roads[ + k + ].predecessor.contact_point + if successor and predecessor: + self._create_adjustable_planview( + k, + predecessor, + pred_contact_point, + successor, + suc_contact_point, + ) + count_adjusted_roads += 1 + # check if it has a normal (road) predecessor - if ( + elif ( self.roads[k].predecessor is not None and self.roads[k].predecessor.element_type is not ElementType.junction @@ -1146,6 +1491,12 @@ def adjust_startpoints(self): str(self.roads[k].successor.element_id) ].planview.adjusted is False + and not isinstance( + self.roads[ + str(self.roads[k].successor.element_id) + ].planview, + AdjustablePlanview, + ) ): succ_id = self.roads[k].successor.element_id if ( @@ -1187,6 +1538,12 @@ def adjust_startpoints(self): str(self.roads[k].predecessor.element_id) ].planview.adjusted is False + and not isinstance( + self.roads[ + str(self.roads[k].successor.element_id) + ].planview, + AdjustablePlanview, + ) ): pred_id = self.roads[k].predecessor.element_id if ( diff --git a/scenariogeneration/xodr/utils.py b/scenariogeneration/xodr/utils.py index bde9f39..5e34add 100644 --- a/scenariogeneration/xodr/utils.py +++ b/scenariogeneration/xodr/utils.py @@ -10,6 +10,7 @@ """ import xml.etree.ElementTree as ET +import numpy as np from .enumerations import ContactPoint from ..helpers import enum2str @@ -45,6 +46,54 @@ def get_lane_sec_and_s_for_lane_calc(road, contact_point): return relevant_lanesection, relevant_s +def get_coeffs_for_poly3(length, lane_offset, zero_start, lane_width_end=None): + """get_coeffs_for_poly3 creates the coefficients for a third degree polynomial, can be used for all kinds of descriptions in xodr. + + Assuming that the derivative is 0 at the start and end of the segment. + + Parameters + ---------- + length (float): length of the segment in the s direction + + lane_offset (float): the lane offset (width) of the lane + + zero_start (bool): True; start with zero and ends with lane_offset width, + False; start with lane_offset and ends with zero width + + lane_width_end (float): specify the ending lane width for lanes that may start + and end with different widths + + Return + ------ + coefficients (float,float,float,float): polynomial coefficients corresponding to "a, b, c, d" in the OpenDrive polynomials + """ + # might be expanded for other cases, not now if needed yet though + start_heading = 0 + end_heading = 0 + s0 = 0 + s1 = length + + # create the linear system + A = np.array( + [ + [0, 1, 2 * s0, 3 * s0**2], + [0, 1, 2 * s1, 3 * s1**2], + [1, s0, s0**2, s0**3], + [1, s1, s1**2, s1**3], + ] + ) + if zero_start: + B = [start_heading, end_heading, 0, lane_offset] + else: + B = [start_heading, end_heading, lane_offset, 0] + + if lane_width_end is not None: + B = [start_heading, end_heading, lane_offset, lane_width_end] + + # calculate and return the coefficients + return np.linalg.solve(A, B) + + class XodrBase: """Sets up common functionality for xodr generating classes by enabling userdata inputs diff --git a/tests/test_generators.py b/tests/test_generators.py index 37d6c0b..4bb6649 100644 --- a/tests/test_generators.py +++ b/tests/test_generators.py @@ -15,94 +15,6 @@ from scenariogeneration import xodr, prettyprint -def test_create_lane_lists_only_int(): - right_lanes, left_lanes = xodr.generators._create_lane_lists(3, 3, 100, 3) - assert len(right_lanes) == 1 - assert len(left_lanes) == 1 - assert right_lanes[0].s_start == 0 - assert left_lanes[0].s_start == 0 - assert right_lanes[0].s_end == 100 - assert left_lanes[0].s_end == 100 - assert right_lanes[0].lane_start_widths == [3, 3, 3] - assert left_lanes[0].lane_start_widths == [3, 3, 3] - - -def test_create_lane_lists_only_left(): - right_lanes, left_lanes = xodr.generators._create_lane_lists(0, 3, 100, 3) - assert len(right_lanes) == 1 - assert len(left_lanes) == 1 - assert right_lanes[0].s_start == 0 - assert left_lanes[0].s_start == 0 - assert right_lanes[0].s_end == 100 - assert left_lanes[0].s_end == 100 - assert right_lanes[0].lane_start_widths == [] - assert left_lanes[0].lane_start_widths == [3, 3, 3] - - -def test_create_lane_lists_only_right(): - right_lanes, left_lanes = xodr.generators._create_lane_lists(3, 0, 100, 3) - assert len(right_lanes) == 1 - assert len(left_lanes) == 1 - assert right_lanes[0].s_start == 0 - assert left_lanes[0].s_start == 0 - assert right_lanes[0].s_end == 100 - assert left_lanes[0].s_end == 100 - assert right_lanes[0].lane_start_widths == [3, 3, 3] - assert left_lanes[0].lane_start_widths == [] - - -def test_create_lane_lists_lane_def_no_widths_right(): - right_lanes, left_lanes = xodr.generators._create_lane_lists( - [xodr.LaneDef(10, 90, 3, 4, 2)], 3, 100, 3 - ) - assert len(right_lanes) == 3 - assert len(left_lanes) == 3 - assert right_lanes[0].s_start == 0 - assert left_lanes[0].s_start == 0 - assert right_lanes[0].s_end == 10 - assert left_lanes[0].s_end == 10 - assert right_lanes[0].lane_start_widths == [3, 3, 3] - assert left_lanes[0].lane_start_widths == [3, 3, 3] - - assert right_lanes[1].s_start == 10 - assert left_lanes[1].s_start == 10 - assert right_lanes[1].s_end == 90 - assert left_lanes[1].s_end == 90 - assert right_lanes[1].lane_start_widths == [3, 0, 3, 3] - assert right_lanes[1].lane_end_widths == [3, 3, 3, 3] - assert left_lanes[1].lane_start_widths == [3, 3, 3] - - assert right_lanes[2].s_start == 90 - assert left_lanes[2].s_start == 90 - assert right_lanes[2].s_end == 100 - assert left_lanes[2].s_end == 100 - assert right_lanes[2].lane_start_widths == [3, 3, 3, 3] - assert right_lanes[2].lane_end_widths == [3, 3, 3, 3] - assert left_lanes[2].lane_start_widths == [3, 3, 3] - - -def test_create_lane_lists_lane_def_no_widths_left(): - right_lanes, left_lanes = xodr.generators._create_lane_lists( - 3, [xodr.LaneDef(10, 90, 3, 4, 2)], 100, 3 - ) - assert len(right_lanes) == 3 - assert len(left_lanes) == 3 - assert right_lanes[0].s_start == 0 - assert left_lanes[0].s_start == 0 - assert right_lanes[0].s_end == 10 - assert left_lanes[0].s_end == 10 - assert right_lanes[0].lane_start_widths == [3, 3, 3] - assert left_lanes[0].lane_start_widths == [3, 3, 3] - - assert right_lanes[1].s_start == 10 - assert left_lanes[1].s_start == 10 - assert right_lanes[1].s_end == 90 - assert left_lanes[1].s_end == 90 - assert left_lanes[1].lane_start_widths == [3, 0, 3, 3] - assert left_lanes[1].lane_end_widths == [3, 3, 3, 3] - assert right_lanes[1].lane_start_widths == [3, 3, 3] - - def test_create_left_lane_split_first_lane(): lanedef = xodr.LaneDef(10, 20, 1, 2, 1) lanes = xodr.create_lanes_merge_split( diff --git a/tests/test_lane_def.py b/tests/test_lane_def.py new file mode 100644 index 0000000..dbda693 --- /dev/null +++ b/tests/test_lane_def.py @@ -0,0 +1,89 @@ +from scenariogeneration import xodr, prettyprint + + +def test_create_lane_lists_only_int(): + right_lanes, left_lanes = xodr.lane_def._create_lane_lists(3, 3, 100, 3) + assert len(right_lanes) == 1 + assert len(left_lanes) == 1 + assert right_lanes[0].s_start == 0 + assert left_lanes[0].s_start == 0 + assert right_lanes[0].s_end == 100 + assert left_lanes[0].s_end == 100 + assert right_lanes[0].lane_start_widths == [3, 3, 3] + assert left_lanes[0].lane_start_widths == [3, 3, 3] + + +def test_create_lane_lists_only_left(): + right_lanes, left_lanes = xodr.lane_def._create_lane_lists(0, 3, 100, 3) + assert len(right_lanes) == 1 + assert len(left_lanes) == 1 + assert right_lanes[0].s_start == 0 + assert left_lanes[0].s_start == 0 + assert right_lanes[0].s_end == 100 + assert left_lanes[0].s_end == 100 + assert right_lanes[0].lane_start_widths == [] + assert left_lanes[0].lane_start_widths == [3, 3, 3] + + +def test_create_lane_lists_only_right(): + right_lanes, left_lanes = xodr.lane_def._create_lane_lists(3, 0, 100, 3) + assert len(right_lanes) == 1 + assert len(left_lanes) == 1 + assert right_lanes[0].s_start == 0 + assert left_lanes[0].s_start == 0 + assert right_lanes[0].s_end == 100 + assert left_lanes[0].s_end == 100 + assert right_lanes[0].lane_start_widths == [3, 3, 3] + assert left_lanes[0].lane_start_widths == [] + + +def test_create_lane_lists_lane_def_no_widths_right(): + right_lanes, left_lanes = xodr.lane_def._create_lane_lists( + [xodr.LaneDef(10, 90, 3, 4, 2)], 3, 100, 3 + ) + assert len(right_lanes) == 3 + assert len(left_lanes) == 3 + assert right_lanes[0].s_start == 0 + assert left_lanes[0].s_start == 0 + assert right_lanes[0].s_end == 10 + assert left_lanes[0].s_end == 10 + assert right_lanes[0].lane_start_widths == [3, 3, 3] + assert left_lanes[0].lane_start_widths == [3, 3, 3] + + assert right_lanes[1].s_start == 10 + assert left_lanes[1].s_start == 10 + assert right_lanes[1].s_end == 90 + assert left_lanes[1].s_end == 90 + assert right_lanes[1].lane_start_widths == [3, 0, 3, 3] + assert right_lanes[1].lane_end_widths == [3, 3, 3, 3] + assert left_lanes[1].lane_start_widths == [3, 3, 3] + + assert right_lanes[2].s_start == 90 + assert left_lanes[2].s_start == 90 + assert right_lanes[2].s_end == 100 + assert left_lanes[2].s_end == 100 + assert right_lanes[2].lane_start_widths == [3, 3, 3, 3] + assert right_lanes[2].lane_end_widths == [3, 3, 3, 3] + assert left_lanes[2].lane_start_widths == [3, 3, 3] + + +def test_create_lane_lists_lane_def_no_widths_left(): + right_lanes, left_lanes = xodr.lane_def._create_lane_lists( + 3, [xodr.LaneDef(10, 90, 3, 4, 2)], 100, 3 + ) + assert len(right_lanes) == 3 + assert len(left_lanes) == 3 + assert right_lanes[0].s_start == 0 + assert left_lanes[0].s_start == 0 + assert right_lanes[0].s_end == 10 + assert left_lanes[0].s_end == 10 + assert right_lanes[0].lane_start_widths == [3, 3, 3] + assert left_lanes[0].lane_start_widths == [3, 3, 3] + + assert right_lanes[1].s_start == 10 + assert left_lanes[1].s_start == 10 + assert right_lanes[1].s_end == 90 + assert left_lanes[1].s_end == 90 + assert left_lanes[1].lane_start_widths == [3, 0, 3, 3] + assert left_lanes[1].lane_end_widths == [3, 3, 3, 3] + assert right_lanes[1].lane_start_widths == [3, 3, 3] diff --git a/tests/test_opendrive.py b/tests/test_opendrive.py index d495a3c..d5b6b21 100644 --- a/tests/test_opendrive.py +++ b/tests/test_opendrive.py @@ -266,3 +266,267 @@ def test_odr_road_patching_connection_types_wrong_types_predecessor(): with pytest.raises(xodr.exceptions.MixingDrivingDirection) as e: odr.adjust_roads_and_lanes() + + +def test_adjustable_geometry_road_connection_pre_suc(): + road1 = xodr.create_road( + xodr.Line(100), 1, 2, 2, center_road_mark=xodr.std_roadmark_broken_broken() + ) + road1.planview.set_start_point(0, 0, 0) + road2 = xodr.create_road( + xodr.AdjustablePlanview(), 2, None, None, center_road_mark=None + ) + + road3 = xodr.create_road(xodr.Line(100), 3, 2, 2) + road3.planview.set_start_point(300, 50, 0.3) + + road1.add_successor(xodr.ElementType.road, 2, xodr.ContactPoint.start) + road2.add_predecessor(xodr.ElementType.road, 1, xodr.ContactPoint.end) + road2.add_successor(xodr.ElementType.road, 3, xodr.ContactPoint.start) + road3.add_predecessor(xodr.ElementType.road, 2, xodr.ContactPoint.end) + + odr = xodr.OpenDrive("my road") + odr.add_road(road1) + odr.add_road(road2) + odr.add_road(road3) + odr.adjust_roads_and_lanes() + + x, y, h = road2.planview.get_start_point() + assert pytest.approx(x, 0.01) == 100 + assert pytest.approx(y, 0.01) == 0 + assert pytest.approx(h, 0.01) == 0 + + x, y, h = road2.planview.get_end_point() + assert pytest.approx(x, 0.01) == 300 + assert pytest.approx(y, 0.01) == 50 + assert pytest.approx(h, 0.01) == 0.3 + + +def test_adjustable_geometry_road_connection_pre_pre_suc_suc(): + road1 = xodr.create_road( + xodr.Line(100), 1, 2, 2, center_road_mark=xodr.std_roadmark_broken_broken() + ) + road1.planview.set_start_point(0, 0, 0.5) + road2 = xodr.create_road( + xodr.AdjustablePlanview(), 2, None, None, center_road_mark=None + ) + + road3 = xodr.create_road(xodr.Line(100), 3, 2, 2) + road3.planview.set_start_point(300, 50, 0) + + road1.add_predecessor(xodr.ElementType.road, 2, xodr.ContactPoint.start) + road2.add_predecessor(xodr.ElementType.road, 1, xodr.ContactPoint.start) + road2.add_successor(xodr.ElementType.road, 3, xodr.ContactPoint.end) + road3.add_successor(xodr.ElementType.road, 2, xodr.ContactPoint.end) + + odr = xodr.OpenDrive("my road") + odr.add_road(road1) + odr.add_road(road2) + odr.add_road(road3) + odr.adjust_roads_and_lanes() + + x, y, h = road2.planview.get_start_point() + assert pytest.approx(x, 0.01) == 0 + assert pytest.approx(y, 0.01) == 0 + assert pytest.approx(h, 0.01) == 3.64 + + x, y, h = road2.planview.get_end_point() + assert pytest.approx(x, 0.01) == 400 + assert pytest.approx(y, 0.01) == 50 + assert pytest.approx(h, 0.01) == 3.14 + + +def test_adjustable_geometry_direct_junction_centered(): + road1 = xodr.create_road( + xodr.Line(100), 1, 2, 2, center_road_mark=xodr.std_roadmark_broken_broken() + ) + road1.planview.set_start_point(0, 0, 0) + road2 = xodr.create_road(xodr.AdjustablePlanview(), 2, 2, 2) + + road3 = xodr.create_road(xodr.Line(100), 3, 2, 1) + road3.planview.set_start_point(200, 0, 0) + + road4 = xodr.create_road(xodr.Arc(-0.01, angle=3.14 / 2), 4, 0, 1) + + road1.add_successor(xodr.ElementType.road, 2, xodr.ContactPoint.start) + road2.add_predecessor(xodr.ElementType.road, 1, xodr.ContactPoint.end) + road2.add_successor(xodr.ElementType.junction, 100) + road3.add_predecessor(xodr.ElementType.junction, 100) + road4.add_predecessor(xodr.ElementType.junction, 100) + + jc = xodr.DirectJunctionCreator(100, "my exit") + jc.add_connection(road2, road3) + jc.add_connection(road2, road4, -2, -1) + + odr = xodr.OpenDrive("my road") + odr.add_road(road1) + odr.add_road(road2) + odr.add_road(road3) + odr.add_road(road4) + odr.add_junction_creator(jc) + + odr.adjust_roads_and_lanes() + + x, y, h = road2.planview.get_start_point() + assert pytest.approx(x, 0.01) == 100 + assert pytest.approx(y, 0.01) == 0 + assert pytest.approx(h, 0.01) == 0 + + x, y, h = road2.planview.get_end_point() + assert pytest.approx(x, 0.01) == 200 + assert pytest.approx(y, 0.01) == 0 + assert pytest.approx(h, 0.01) == 0 + + +def test_adjustable_geometry_direct_junction_offsets(): + road1 = xodr.create_road( + xodr.Line(100), + 1, + 2, + 2, + center_road_mark=xodr.std_roadmark_broken_broken(), + lane_width=4, + ) + road1.planview.set_start_point(0, 0, 0) + road2 = xodr.create_road(xodr.Line(100), 2, 1, 1) + + road3 = xodr.create_road(xodr.AdjustablePlanview(), 3, 0, 1) + road4 = xodr.create_road(xodr.Line(50), 4, 0, 1) + road4.planview.set_start_point(140, -20, 0) + + road5 = xodr.create_road(xodr.AdjustablePlanview(), 5, 0, 1) + road6 = xodr.create_road(xodr.Line(50), 6, 0, 1) + road6.planview.set_start_point(200, 30, 3.14) + + road1.add_successor(xodr.ElementType.junction, 100) + road2.add_predecessor(xodr.ElementType.junction, 100) + road3.add_predecessor(xodr.ElementType.junction, 100) + road3.add_successor(xodr.ElementType.road, 4, xodr.ContactPoint.start) + road4.add_predecessor(xodr.ElementType.road, 3, xodr.ContactPoint.end) + road5.add_successor(xodr.ElementType.junction, 100) + road5.add_predecessor(xodr.ElementType.road, 6, xodr.ContactPoint.end) + road6.add_successor(xodr.ElementType.road, 5, xodr.ContactPoint.start) + + jc = xodr.DirectJunctionCreator(100, "my exit") + jc.add_connection(road1, road2) + jc.add_connection(road1, road3, -2, -1) + jc.add_connection(road1, road5, 2, -1) + + odr = xodr.OpenDrive("my road") + odr.add_road(road1) + odr.add_road(road2) + odr.add_road(road3) + odr.add_road(road4) + odr.add_road(road5) + odr.add_road(road6) + odr.add_junction_creator(jc) + odr.adjust_roads_and_lanes() + + x, y, h = road3.planview.get_start_point() + assert pytest.approx(x, 0.01) == 100 + assert pytest.approx(y, 0.01) == -4 + assert pytest.approx(h, 0.01) == 0 + + x, y, h = road3.planview.get_end_point() + assert pytest.approx(x, 0.01) == 140 + assert pytest.approx(y, 0.01) == -20 + assert pytest.approx(h, 0.01) == 0 + + x, y, h = road5.planview.get_start_point() + assert pytest.approx(x, 0.01) == 150 + assert pytest.approx(y, 0.01) == 30 + assert pytest.approx(h, 0.01) == 3.14 + + x, y, h = road5.planview.get_end_point() + assert pytest.approx(x, 0.01) == 100 + assert pytest.approx(y, 0.01) == 4 + assert pytest.approx(h, 0.01) == 3.14 + + +def test_adjustable_geometry_common_junction_centered(): + road1 = xodr.create_road( + xodr.Line(100), 1, 2, 2, center_road_mark=xodr.std_roadmark_broken_broken() + ) + road1.planview.set_start_point(0, 0, 0) + road2 = xodr.create_road(xodr.AdjustablePlanview(), 2, 2, 2) + road3 = xodr.create_road(xodr.Line(100), 3, 2, 2) + + road4 = xodr.create_road(xodr.Line(100), 4, 2, 2) + road4.planview.set_start_point(300, 50, 0) + + jc = xodr.CommonJunctionCreator(100, "my junc") + + jc.add_incoming_road_cartesian_geometry(road1, 0, 0, 0, "successor") + jc.add_incoming_road_cartesian_geometry(road2, 30, 0, -np.pi, "predecessor") + jc.add_incoming_road_cartesian_geometry(road3, 15, 15, -np.pi / 2, "successor") + + jc.add_connection(1, 2) + jc.add_connection(3, 2) + jc.add_connection(3, 1) + road4.add_predecessor(xodr.ElementType.road, 2, xodr.ContactPoint.end) + road2.add_successor(xodr.ElementType.road, 4, xodr.ContactPoint.start) + + odr = xodr.OpenDrive("my road") + odr.add_road(road1) + odr.add_road(road2) + odr.add_road(road3) + odr.add_road(road4) + odr.add_junction_creator(jc) + + odr.adjust_roads_and_lanes() + + x, y, h = road2.planview.get_start_point() + assert pytest.approx(x, 0.01) == 130 + assert pytest.approx(y, 0.01) == 0 + assert pytest.approx(h, 0.01) == 0 + + x, y, h = road2.planview.get_end_point() + assert pytest.approx(x, 0.01) == 300 + assert pytest.approx(y, 0.01) == 50 + assert pytest.approx(h, 0.01) == 0 + + +@pytest.mark.parametrize( + "data", + [ + (2), + (-2), + ], +) +def test_adjustable_geometry_common_junction_offsets(data): + road1 = xodr.create_road( + xodr.Line(100), 1, 2, 2, center_road_mark=xodr.std_roadmark_broken_broken() + ) + road1.planview.set_start_point(0, 0, 0) + road2 = xodr.create_road(xodr.AdjustablePlanview(), 2, 2, 2) + + road4 = xodr.create_road(xodr.Line(100), 4, 2, 2) + road4.planview.set_start_point(300, 50, 0) + + jc = xodr.CommonJunctionCreator(100, "my junc") + + jc.add_incoming_road_cartesian_geometry(road1, 0, 0, 0, "successor") + jc.add_incoming_road_cartesian_geometry(road2, 30, 0, -np.pi, "predecessor") + + jc.add_connection(1, 2, data, data) + + road4.add_predecessor(xodr.ElementType.road, 2, xodr.ContactPoint.end) + road2.add_successor(xodr.ElementType.road, 4, xodr.ContactPoint.start) + + odr = xodr.OpenDrive("my road") + odr.add_road(road1) + odr.add_road(road2) + odr.add_road(road4) + odr.add_junction_creator(jc) + + odr.adjust_roads_and_lanes() + + x, y, h = road2.planview.get_start_point() + assert pytest.approx(x, 0.01) == 130 + assert pytest.approx(y, 0.01) == 0 + assert pytest.approx(h, 0.01) == 0 + + x, y, h = road2.planview.get_end_point() + assert pytest.approx(x, 0.01) == 300 + assert pytest.approx(y, 0.01) == 50 + assert pytest.approx(h, 0.01) == 0