diff --git a/examples/demo.py b/examples/demo.py index 5fb66aa1..10f5dbda 100755 --- a/examples/demo.py +++ b/examples/demo.py @@ -22,7 +22,12 @@ from examples.exampleitems import Box, Circle, Text from gaphas import Canvas, GtkView -from gaphas.guide import GuidePainter +from gaphas.aspect.handlemove import HandleMove, ItemHandleMove +from gaphas.collision import ( + CollisionAvoidingLineHandleMoveMixin, + update_colliding_lines, +) +from gaphas.guide import GuidedItemHandleMoveMixin, GuidePainter from gaphas.item import Line from gaphas.painter import ( BoundingBoxPainter, @@ -41,6 +46,7 @@ zoom_tool, ) from gaphas.tool.rubberband import RubberbandPainter, RubberbandState, rubberband_tool +from gaphas.types import Pos from gaphas.util import text_extents, text_underline # Global undo list @@ -63,6 +69,16 @@ def wrapper(): return wrapper +@HandleMove.register(Line) +class MyLineHandleMove( + CollisionAvoidingLineHandleMoveMixin, GuidedItemHandleMoveMixin, ItemHandleMove +): + """Our custom line handle move, based on guides and (experimental) + collision avoidance.""" + + pass + + class MyBox(Box): """Box with an example connection protocol.""" @@ -232,6 +248,14 @@ def on_delete_focused_clicked(_button): b.connect("clicked", on_delete_focused_clicked) v.add(b) + b = Gtk.Button.new_with_label("Route lines") + + def on_route_lines_clicked(_button): + update_colliding_lines(canvas, view._qtree) + + b.connect("clicked", on_route_lines_clicked) + v.add(b) + v.add(Gtk.Label.new("Export:")) b = Gtk.Button.new_with_label("Write demo.png") diff --git a/gaphas/aspect/connector.py b/gaphas/aspect/connector.py index 4656b58c..d9d5c5c8 100644 --- a/gaphas/aspect/connector.py +++ b/gaphas/aspect/connector.py @@ -53,8 +53,7 @@ def glue(self, sink: ConnectionSinkType) -> Optional[Pos]: ) glue_pos = sink.glue(pos, secondary_pos) if glue_pos and self.allow(sink): - matrix.invert() - new_pos = matrix.transform_point(*glue_pos) + new_pos = matrix.inverse().transform_point(*glue_pos) handle.pos = new_pos return new_pos return None diff --git a/gaphas/collision.py b/gaphas/collision.py new file mode 100644 index 00000000..58407ad7 --- /dev/null +++ b/gaphas/collision.py @@ -0,0 +1,313 @@ +"""Collision avoiding. + +Reroute lines when they cross elements and other lines. + +Limitations: + +- can only deal with normal lines (not orthogonal) +- uses bounding box for grid occupancy (for element and line!) + +THIS FEATURE IS EXPERIMENTAL! +""" + +from __future__ import annotations + +import time +from itertools import groupby +from operator import attrgetter, itemgetter +from typing import Callable, Iterable, Literal, NamedTuple, Tuple, Union + +from gaphas.connections import Handle +from gaphas.decorators import g_async +from gaphas.geometry import intersect_rectangle_line +from gaphas.item import Item, Line +from gaphas.quadtree import Quadtree +from gaphas.segment import Segment +from gaphas.types import Pos +from gaphas.view.gtkview import GtkView +from gaphas.view.model import Model + +Tile = Tuple[int, int] + + +class Node(NamedTuple): + parent: object | None # type should be Node + position: Tile + direction: Tile + g: int + f: int + + +Walker = Callable[[int, int], bool] +Heuristic = Callable[[int, int], int] +Weight = Callable[[int, int, Node], Union[int, Literal["inf"]]] + + +def measure(func): + def _measure(*args, **kwargs): + start = time.time() + try: + return func(*args, **kwargs) + finally: + print(func.__name__, time.time() - start) + + return _measure + + +class CollisionAvoidingLineHandleMoveMixin: + view: GtkView + item: Item + handle: Handle + + def move(self, pos: Pos) -> None: + super().move(pos) # type: ignore[misc] + line = self.item + assert isinstance(line, Line) + if self.handle in (line.head, line.tail): + self.update_line_to_avoid_collisions() + + @g_async(single=True) + def update_line_to_avoid_collisions(self): + model = self.view.model + assert model + assert isinstance(self.item, Line) + update_line_to_avoid_collisions(self.item, model, self.view._qtree) + + +def colliding_lines(qtree: Quadtree) -> Iterable[tuple[Line, Item]]: + lines = ( + item for item in qtree.items if isinstance(item, Line) and not item.orthogonal + ) + for line in lines: + items = qtree.find_intersect(qtree.get_bounds(line)) + if not items: + continue + + segments = [ + ( + line.matrix_i2c.transform_point(*start), + line.matrix_i2c.transform_point(*end), + ) + for start, end in line.segments + ] + for item in items: + if item is line: + continue + bounds = qtree.get_bounds(item) + for seg_start, seg_end in segments: + if intersect_rectangle_line(bounds, seg_end, seg_start): + yield (line, item) + break + + +def update_colliding_lines(model: Model, qtree: Quadtree, grid_size: int = 20) -> None: + for line, _item in colliding_lines(qtree): + update_line_to_avoid_collisions(line, model, qtree, grid_size) + + +def update_line_to_avoid_collisions( + line: Line, model: Model, qtree: Quadtree, grid_size: int = 20 +) -> None: + # find start and end pos in terms of the grid + matrix = line.matrix_i2c + start_x, start_y = ( + int(v / grid_size) for v in matrix.transform_point(*line.head.pos) + ) + end_x, end_y = (int(v / grid_size) for v in matrix.transform_point(*line.tail.pos)) + excluded_items: set[Item] = {line} + start_end_tiles = ((start_x, start_y), (end_x, end_y)) + orthogonal = line.orthogonal + + def weight(x, y, current_node): + direction_penalty = 0 if same_direction(x, y, current_node) else 1 + diagonal_penalty = 0 if prefer_orthogonal(x, y, current_node) else 1 + occupied_penalty = ( + 0 + if (x, y) in start_end_tiles + or not tile_occupied(x, y, grid_size, qtree, excluded_items) + else 5 + ) + return 1 + direction_penalty + diagonal_penalty + occupied_penalty + + path_with_direction = route( + (start_x, start_y), + (end_x, end_y), + weight=weight, + heuristic=manhattan_distance(end_x, end_y), + orthogonal=orthogonal, + ) + if len(path_with_direction) < 2: + return + path = list(turns_in_path(path_with_direction)) + + min_handles = 3 if orthogonal else 2 + + segment = Segment(line, model) + while len(path) > len(line.handles()): + segment.split_segment(0) + while min_handles < len(path) < len(line.handles()): + segment.merge_segment(0) + + imatrix = matrix.inverse() + for pos, handle in zip(path[1:-1], line.handles()[1:-1]): + cx = pos[0] * grid_size + grid_size / 2 + cy = pos[1] * grid_size + grid_size / 2 + handle.pos = imatrix.transform_point(cx, cy) + + if orthogonal: + _, dir = path_with_direction[1] + line.horizontal = dir == (1, 0) + if line.horizontal: + line.handles()[1].pos.y = line.handles()[0].pos.y + else: + line.handles()[1].pos.x = line.handles()[0].pos.x + + model.request_update(line) + + +def same_direction(x: int, y: int, node: Node) -> bool: + node_pos = node.position + dir_x = x - node_pos[0] + dir_y = y - node_pos[1] + node_dir = node.direction + return dir_x == node_dir[0] and dir_y == node_dir[1] + + +def prefer_orthogonal(x: int, y: int, node: Node) -> bool: + return x == node.position[0] or y == node.position[1] + + +def tile_occupied( + x: int, y: int, grid_size: int, qtree: Quadtree, excluded_items: set[Item] +) -> bool: + items = ( + qtree.find_intersect((x * grid_size, y * grid_size, grid_size, grid_size)) + - excluded_items + ) + return bool(items) + + +def turns_in_path(path_and_dir: list[tuple[Tile, Tile]]) -> Iterable[Tile]: + for _, group in groupby(path_and_dir, key=itemgetter(1)): + *_, (position, _) = group + yield position + + +# Heuristics: + + +def constant_heuristic(cost): + def heuristic(_x, _y): + return cost + + return heuristic + + +def manhattan_distance(end_x, end_y): + def heuristic(x, y): + return abs(x - end_x) + abs(y - end_y) + + return heuristic + + +def quadratic_distance(end_x, end_y): + def heuristic(x, y): + return ((x - end_x) ** 2) + ((y - end_y) ** 2) + + return heuristic + + +def route( + start: Tile, + end: Tile, + weight: Weight, + heuristic: Heuristic = constant_heuristic(1), + orthogonal: bool = False, +) -> list[tuple[Tile, Tile]]: + """Simple A* router/solver. + + This solver is tailored towards grids (mazes). + + Args: + start: Start position + end: Final position + weight: + Provide a cost for the move to the new position (x, y). Weight can be "inf" + to point out you can never move there. Weight can consist of many parts: + a weight of travel (normally 1), but also a cost for bending, for example. + heuristic: + An (optimistic) estimate of how long it would take to reach `end` from the + position (x, y). Normally this is some distance (manhattan or quadratic). + Default is a constant distance (1), which would make it a standard shortest + path algorithm a la Dijkstra. + + Returns: + A list of the shortest path found in tuples (position, direction), or []. + """ + open_nodes = [Node(None, start, (0, 0), 0, 0)] + closed_positions = set() + + f = attrgetter("f") + directions = [ + (0, -1), + (0, 1), + (-1, 0), + (1, 0), + ] + + if not orthogonal: + directions += [ + (-1, -1), + (-1, 1), + (1, -1), + (1, 1), + ] + + while open_nodes: + current_node = min(open_nodes, key=f) + + if current_node.position == end: + return reconstruct_path(current_node) + + for direction in directions: + node_x = current_node.position[0] + direction[0] + node_y = current_node.position[1] + direction[1] + + w = weight(node_x, node_y, current_node) + if w == "inf": + continue + + node_position = (node_x, node_y) + + if node_position in closed_positions: + continue + + g = current_node.g + w + + for open_node in open_nodes: + if node_position == open_node.position and g > open_node.g: + continue + + open_nodes.append( + Node( + current_node, + node_position, + direction, + g, + g + heuristic(node_x, node_y), + ) + ) + + open_nodes.remove(current_node) + closed_positions.add(current_node.position) + + return [] + + +def reconstruct_path(node: Node) -> list[tuple[Tile, Tile]]: + path = [] + current = node + while current: + path.append((current.position, current.direction)) + current = current.parent # type: ignore[assignment] + return path[::-1] diff --git a/gaphas/geometry.py b/gaphas/geometry.py index 403090f3..b7641adc 100644 --- a/gaphas/geometry.py +++ b/gaphas/geometry.py @@ -302,21 +302,12 @@ def distance_rectangle_point(rect: Rect, point: Point) -> float: >>> distance_rectangle_point((0, 0, 10, 10), (-1, 11)) 2 """ - dx = dy = 0.0 px, py = point rx, ry, rw, rh = rect + dx = max(0.0, rx - px, px - (rx + rw)) + dy = max(0.0, ry - py, py - (ry + rh)) - if px < rx: - dx = rx - px - elif px > rx + rw: - dx = px - (rx + rw) - - if py < ry: - dy = ry - py - elif py > ry + rh: - dy = py - (ry + rh) - - return abs(dx) + abs(dy) + return dx + dy def point_on_rectangle(rect: Rect, point: Point, border: bool = False) -> Point: @@ -567,6 +558,23 @@ def intersect_line_line( return x, y +def intersect_rectangle_line( + rect: Rect, line_start: Point, line_end: Point +) -> set[Point]: + rx, ry, rw, rh = rect + side_starts = [(rx, ry), (rx + rw, ry), (rx + rw, ry + rh), (rx, ry + rh)] + side_ends = [(rx + rw, ry), (rx + rw, ry + rh), (rx, ry + rh), (rx, ry)] + return set( + filter( + None, + ( + intersect_line_line(line_start, line_end, side_start, side_end) + for side_start, side_end in zip(side_starts, side_ends) + ), + ) + ) + + def rectangle_contains(inner: Rect, outer: Rect) -> bool: """Returns True if ``inner`` rect is contained in ``outer`` rect.""" ix, iy, iw, ih = inner diff --git a/gaphas/item.py b/gaphas/item.py index 60a6965c..b0b6401e 100644 --- a/gaphas/item.py +++ b/gaphas/item.py @@ -341,6 +341,11 @@ def horizontal(self, horizontal: bool) -> None: self._horizontal = horizontal self.update_orthogonal_constraints(self.orthogonal) + @property + def segments(self): + hpos = [h.pos for h in self._handles] + return zip(tuple(hpos), tuple(hpos[1:])) + def insert_handle(self, index: int, handle: Handle) -> None: self._handles.insert(index, handle) @@ -395,11 +400,10 @@ def point(self, x: float, y: float) -> float: >>> f"{a.point(29, 29):.3f}" '0.784' """ - hpos = [h.pos for h in self._handles] p = (x, y) distance, _point = min( distance_line_point(start, end, p) # type: ignore[arg-type] - for start, end in zip(hpos[:-1], hpos[1:]) + for start, end in self.segments ) return max(0.0, distance - self.fuzziness) diff --git a/gaphas/quadtree.py b/gaphas/quadtree.py index 1605ddc6..c97cecc5 100644 --- a/gaphas/quadtree.py +++ b/gaphas/quadtree.py @@ -127,6 +127,10 @@ def soft_bounds(self) -> Bounds: y1 = max(map(add, x_y_w_h[1], x_y_w_h[3])) return x0, y0, x1 - x0, y1 - y0 + @property + def items(self): + return self._ids.keys() + def add(self, item: T, bounds: Bounds, data: D | None = None) -> None: """Add an item to the tree. diff --git a/gaphas/segment.py b/gaphas/segment.py index 5966de34..7e82c695 100644 --- a/gaphas/segment.py +++ b/gaphas/segment.py @@ -1,14 +1,14 @@ """Allow for easily adding segments to lines.""" from functools import singledispatch -from typing import Optional +from typing import Optional, Sequence, Tuple from cairo import ANTIALIAS_NONE from gi.repository import Gtk from gaphas.aspect import MoveType from gaphas.aspect.handlemove import HandleMove -from gaphas.connector import Handle, LinePort -from gaphas.geometry import distance_line_point, distance_point_point_fast +from gaphas.connector import Handle, LinePort, Port +from gaphas.geometry import Point, distance_line_point, distance_point_point_fast from gaphas.item import Item, Line, matrix_i2i from gaphas.solver import WEAK from gaphas.tool.itemtool import find_item_and_handle_at_point @@ -21,13 +21,17 @@ class Segment: def __init__(self, item, model): raise TypeError - def split_segment(self, segment, count=2): + def split_segment( + self, segment: int, count: int = 2 + ) -> Tuple[Sequence[Handle], Sequence[Port]]: ... - def split(self, pos): + def split(self, pos: Point) -> Handle: ... - def merge_segment(self, segment, count=2): + def merge_segment( + self, segment: int, count: int = 2 + ) -> Tuple[Sequence[Handle], Sequence[Port]]: ... @@ -52,7 +56,6 @@ def split(self, pos): def split_segment(self, segment, count=2): """Split one item segment into ``count`` equal pieces. - def split_segment(self, segment, count=2): Two lists are returned - list of created handles diff --git a/tests/test_collision.py b/tests/test_collision.py new file mode 100644 index 00000000..75f44aa6 --- /dev/null +++ b/tests/test_collision.py @@ -0,0 +1,235 @@ +from gaphas.canvas import Canvas +from gaphas.collision import ( + Node, + colliding_lines, + manhattan_distance, + route, + same_direction, + tile_occupied, + turns_in_path, + update_colliding_lines, + update_line_to_avoid_collisions, +) +from gaphas.connections import Connections +from gaphas.connector import Handle +from gaphas.item import Element, Item, Line +from gaphas.quadtree import Quadtree + + +def test_colliding_lines(): + connections = Connections() + qtree: Quadtree[Item, None] = Quadtree() + + line = Line(connections=connections) + line.head.pos = (0, 50) + line.tail.pos = (200, 50) + + element = Element(connections=connections) + element.height = 100 + element.width = 100 + element.matrix.translate(50, 0) + + qtree.add(line, (0, 50, 200, 50)) + qtree.add(element, (50, 0, 150, 100)) + + collisions = list(colliding_lines(qtree)) + + assert (line, element) in collisions + + +def test_prefer_same_direction(): + node = Node(None, (0, 0), (1, 0), 0, 0) + + assert same_direction(1, 0, node) + assert not same_direction(1, 1, node) + + +def test_tile_occupied(): + connections = Connections() + qtree: Quadtree[Item, None] = Quadtree() + + line = Line(connections=connections) + line.head.pos = (0, 50) + line.tail.pos = (200, 50) + + element = Element(connections=connections) + element.height = 100 + element.width = 100 + element.matrix.translate(50, 0) + + qtree.add(line, (0, 50, 200, 50)) + qtree.add(element, (50, 0, 150, 100)) + + assert not tile_occupied(0, 0, 20, qtree, {line}) + assert tile_occupied(5, 1, 20, qtree, {line}) + + +def test_solve_orthogonal_line(): + canvas = Canvas() + qtree: Quadtree[Item, None] = Quadtree() + + line = Line(connections=canvas.connections) + line.head.pos = (0, 0) + line.tail.pos = (200, 200) + line.insert_handle(1, Handle((100, 100))) + line.orthogonal = True + + element = Element(connections=canvas.connections) + element.height = 150 + element.width = 150 + element.matrix.translate(50, 0) + + qtree.add(line, (0, 0, 200, 200)) + qtree.add(element, (50, 0, 200, 150)) + + update_line_to_avoid_collisions(line, canvas, qtree) + handles = line.handles() + + assert not line.horizontal + assert handles[0].pos.tuple() == (0, 0) + assert handles[1].pos.tuple() == (0, 210) + assert handles[2].pos.tuple() == (200, 200) + + +def test_solve_horizontal_orthogonal_line(): + canvas = Canvas() + qtree: Quadtree[Item, None] = Quadtree() + + line = Line(connections=canvas.connections) + line.head.pos = (0, 0) + line.tail.pos = (200, 200) + line.insert_handle(1, Handle((100, 100))) + line.orthogonal = True + + element = Element(connections=canvas.connections) + element.height = 150 + element.width = 150 + element.matrix.translate(0, 50) + + qtree.add(line, (0, 0, 200, 200)) + qtree.add(element, (0, 50, 150, 200)) + + update_line_to_avoid_collisions(line, canvas, qtree) + handles = line.handles() + + assert line.horizontal + assert handles[0].pos.tuple() == (0, 0) + assert handles[1].pos.tuple() == (210, 0) + assert handles[2].pos.tuple() == (200, 200) + + +def test_update_lines(): + canvas = Canvas() + qtree: Quadtree[Item, None] = Quadtree() + + line = Line(connections=canvas.connections) + line.head.pos = (0, 50) + line.tail.pos = (200, 50) + + element = Element(connections=canvas.connections) + element.height = 100 + element.width = 100 + element.matrix.translate(50, 0) + + qtree.add(line, (0, 50, 200, 50)) + qtree.add(element, (50, 0, 150, 100)) + + update_colliding_lines(canvas, qtree) + assert len(line.handles()) == 6 + assert [h.pos.tuple() for h in line.handles()] == [ + (0.0, 50.0), + (10.0, 10.0), + (50.0, -30.0), + (230.0, -30.0), + (230.0, 30.0), + (200.0, 50.0), + ] + + +def test_maze(): + + _maze = [ + [(0), 0, 0, 0, 1, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 1, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 1, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 1, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 1, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 1, 0, 0, (0), 0, 0], + [0, 0, 0, 0, 1, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 1, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + ] + + start = (0, 0) + end = (7, 6) + + def weight(x, y, _current_node): + return ( + 1 + if 0 <= x < len(_maze) and 0 <= y < len(_maze[x]) and not _maze[y][x] + else 10 + ) + + path_and_dir = route(start, end, weight=weight, heuristic=manhattan_distance(*end)) + dump_maze(_maze, [pd[0] for pd in path_and_dir]) + assert path_and_dir == [ + ((0, 0), (0, 0)), + ((1, 1), (1, 1)), + ((2, 2), (1, 1)), + ((3, 3), (1, 1)), + ((3, 4), (0, 1)), + ((4, 5), (1, 1)), + ((5, 6), (1, 1)), + ((6, 6), (1, 0)), + ((7, 6), (1, 0)), + ], path_and_dir + + +def test_unsolvable_maze(): + + start = (0, 0) + end = (3, 3) + + def weight(x, y, _current_node): + wall = y == 2 + return 1 if 0 <= x < 4 and 0 <= y < 4 and not wall else "inf" + + path = route(start, end, weight=weight) + assert path == [], path + + +def dump_maze(maze, path): + for y, row in enumerate(maze): + for x, v in enumerate(row): + if (x, y) == path[0]: + p = "S" + elif (x, y) == path[-1]: + p = "E" + elif (x, y) in path: + p = "x" + else: + p = v + print(p, end=" ") + print() + + +def test_find_turns_in_path(): + path_and_dir = [ + ((0, 0), (0, 0)), + ((1, 1), (1, 1)), + ((2, 2), (1, 1)), + ((3, 3), (1, 1)), + ((3, 4), (0, 1)), + ((4, 5), (1, 1)), + ((5, 6), (1, 1)), + ((6, 6), (1, 0)), + ((7, 6), (1, 0)), + ] + + expected_path = [(0, 0), (3, 3), (3, 4), (5, 6), (7, 6)] + assert list(turns_in_path(path_and_dir)) == expected_path + + +def test_find_turns_in_empty_path(): + assert list(turns_in_path([])) == []