@dataclasses.dataclass
class Erratum:
    """
    An erratum is defined by a language to identify matching features, a point to find a matching
    shape and a specification of corrections.
    """
    # Compared against the `LANGUAGE` property of a feature.
    language: str
    # A shape containing this point is the one the erratum applies to.
    point: Point
    # Property names mapped to corrected values; merged into the feature's properties.
    fix: dict

    @classmethod
    def from_spec(cls, spec: typing.Dict[str, str]):
        """
        Create an erratum from a specification with the keys `LANGUAGE`, `lon`, `lat` and `fix`.

        `lon` and `lat` are converted to `float`; `fix` is a `;`-separated list of `key=value`
        pairs.
        """
        lon, lat = float(spec['lon']), float(spec['lat'])
        return cls(
            language=spec['LANGUAGE'],
            point=Point(lon, lat),
            # Split on the first "=" only, so that a corrected value may itself contain "=".
            fix=dict(s.split('=', 1) for s in spec['fix'].split(';')),
        )


class Errata(Fixer):
    """
    Applies errata to feature properties and keeps track of which errata are still pending.
    """
    __item_class__ = Erratum

    def __call__(self, props, geom):
        """
        Apply the first pending erratum matching a feature.

        An erratum matches if it is registered for `props['LANGUAGE']` and its point lies within
        the shape created from `geom`. A matching erratum is removed from the pending ones.

        :param props: The feature's properties; updated in place. Must contain `LANGUAGE`.
        :param geom: The feature's geometry, in a form accepted by `shapely.geometry.shape`.
        :return: `props`
        """
        pending = self.fixes.get(props['LANGUAGE'])
        if pending:
            obj = shape(geom)
            for i, erratum in enumerate(pending):
                if obj.contains(erratum.point):
                    props.update(erratum.fix)
                    # Remove only the erratum which was applied: other errata for the same
                    # language must stay pending, otherwise `all_done` would report success
                    # although they were never matched.
                    del pending[i]
                    break
        return props

    @property
    def all_done(self):
        """
        `True` if no erratum is left to be applied.
        """
        return not any(self.fixes.values())
@dataclasses.dataclass
class Move:
    """
    A move is defined by a language to identify matching features, a point to find matching polygons
    and a translation vector specifying the actual move.
    """
    language: str
    point: Point
    # (longitude offset, latitude offset), or `None` if the spec has no `target_lon`.
    vector: typing.Optional[typing.Tuple[float, float]]

    @classmethod
    def from_spec(cls, spec: typing.Dict[str, str]):
        """
        Create a move from a specification with the keys `LANGUAGE`, `source_lon`, `source_lat`,
        `target_lon` and `target_lat`.

        The vector is the difference between target and source coordinates; it is `None` if
        `target_lon` is empty.

        :raises AssertionError: If a component of the vector has an absolute value of 1.3 or more \
        and the language is not one of the known exceptions.
        """
        lon, lat = float(spec['source_lon']), float(spec['source_lat'])
        vector = None
        if spec['target_lon']:
            vector = (float(spec['target_lon']) - lon, float(spec['target_lat']) - lat)
        move = cls(language=spec['LANGUAGE'], point=Point(lon, lat), vector=vector)
        # Make sure translations are reasonably close.
        if move.vector and move.language not in {'RAPA', 'EASTER ISLAND'}:
            # Raised explicitly rather than via `assert`, so the check is not stripped when
            # running with `python -O`; the exception type is kept for backwards compatibility.
            if not (abs(move.vector[0]) < 1.3 and abs(move.vector[1]) < 1.3):
                raise AssertionError(
                    'Translation vector too big for {0.language}: {0.vector}'.format(move))
        return move

    def __call__(self, polygon_coordinates):
        """
        "Move" a polygon, by adding the translation vector to each coordinate of each ring.

        :param polygon_coordinates: An iterable of rings, each an iterable of `(lon, lat)` pairs.
        :return: A new list of rings (lists of `(lon, lat)` tuples), or `polygon_coordinates` \
        itself if the move has no vector.
        """
        if not self.vector:
            return polygon_coordinates
        dx, dy = self.vector
        return [[(lon + dx, lat + dy) for lon, lat in ring] for ring in polygon_coordinates]
def existing_dir(d):
    """
    Make sure `d` is an existing directory, creating it (and missing parents) if necessary.

    :param d: A `pathlib.Path`-like object.
    :return: `d`
    :raises AssertionError: If `d` exists but is not a directory.
    """
    if not d.exists():
        # `exist_ok` tolerates the directory being created concurrently after the check above.
        d.mkdir(parents=True, exist_ok=True)
    # Raised explicitly rather than via `assert`, so the check also runs under `python -O`.
    if not d.is_dir():
        raise AssertionError('{} is not a directory'.format(d))
    return d


class Fixer:
    """
    Class implementing support for book-keeping about things to fix, grouped by language name.

    Subclasses set `__item_class__` to a class providing a `from_spec` classmethod and whose
    instances have a `language` attribute.
    """
    __item_class__ = None

    def __init__(self, specs):
        """
        :param specs: An iterable of specifications, each passed to `__item_class__.from_spec`.
        :raises AssertionError: If `specs` yields no items.
        """
        def language(fix):
            return fix.language

        # `itertools.groupby` only groups adjacent items, thus we must sort by the same key first.
        items = sorted((self.__item_class__.from_spec(s) for s in specs), key=language)
        # Maps language name to the list of fixes which are still pending for this language.
        self.fixes = {lg: list(group) for lg, group in itertools.groupby(items, language)}
        # Raised explicitly rather than via `assert`, so the check also runs under `python -O`.
        if not self.fixes:
            raise AssertionError('no fixes specified')

    @property
    def all_done(self):
        """
        `True` if no pending fixes are left for any language.
        """
        return not any(self.fixes.values())