Skip to content

Commit

Permalink
refactored code
Browse files Browse the repository at this point in the history
  • Loading branch information
xrotwang committed Jun 14, 2024
1 parent 547f683 commit 83707ba
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 0 deletions.
Empty file added lib/__init__.py
Empty file.
47 changes: 47 additions & 0 deletions lib/errata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import typing
import dataclasses

from shapely.geometry import Point, shape

from .util import Fixer


@dataclasses.dataclass
class Erratum:
    """
    An erratum is defined by a language to identify matching features, a point to find a matching
    shape and a specification of corrections.
    """
    language: str
    point: Point
    # Mapping of property name to corrected value.
    fix: dict

    @classmethod
    def from_spec(cls, spec: typing.Dict[str, str]):
        """
        Create an erratum from a specification with the keys `LANGUAGE`, `lon`, `lat` and `fix`.

        `fix` is a `;`-separated list of `key=value` pairs. Only the first `=` of a pair
        separates key from value, so corrected values may contain `=` themselves.

        :raises ValueError: if `lon` or `lat` is not a number or a pair in `fix` lacks a `=`.
        """
        lon, lat = float(spec['lon']), float(spec['lat'])
        return cls(
            language=spec['LANGUAGE'],
            point=Point(lon, lat),
            fix=dict(pair.split('=', 1) for pair in spec['fix'].split(';')),
        )


class Errata(Fixer):
    """
    Functionality to correct feature properties according to errata specifications.
    """
    __item_class__ = Erratum

    def __call__(self, props, geom):
        """
        Apply the first pending erratum for the feature's language whose point lies in `geom`.

        :param props: The feature's properties; updated in place with the erratum's corrections.
        :param geom: The feature's geometry, as accepted by `shapely.geometry.shape`.
        :return: The (possibly updated) `props` object.
        """
        # Look up the pending list before updating `props`, because a fix may rename LANGUAGE.
        pending = self.fixes.get(props['LANGUAGE'])
        if pending:
            obj = shape(geom)
            for i, erratum in enumerate(pending):
                if obj.contains(erratum.point):
                    props.update(erratum.fix)
                    # Only remove the erratum that was applied; other errata for the same
                    # language stay pending for the features they are meant for.
                    del pending[i]
                    break
        return props

    @property
    def all_done(self):
        """
        Whether no erratum is left pending.
        """
        return not any(self.fixes.values())
50 changes: 50 additions & 0 deletions lib/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import typing

import pycountry

# The normalized field names of the shape metadata. `normalize` makes sure each of these keys is
# present in the records it returns, filling in the empty string for missing ones.
COLS = ['LANGUAGE', 'COUNTRY_NAME', 'ISLAND_NAME', 'SOVEREIGN']


def normalize(d: typing.Dict[str, str]) -> typing.Union[typing.Dict[str, str], None]:
    """
    Normalize field names and field content for country and island names.

    The record `d` is modified in place; on success the same object is returned with all keys
    listed in `COLS` present.

    Return `None` if the record does not contain metadata about a language polygon.

    :raises LookupError: if a country name is not known to `pycountry`.
    :raises AssertionError: if a value contains the character `;`.
    """
    island_typos = {
        'apua New Guinea': 'Papua New Guinea',
        'Papua New Gu': 'Papua New Guinea',
    }
    country_names = {
        'Tailand': 'Thailand',
        'Burma': 'Myanmar',
        'Christmas I.': 'Christmas Island',
        'East Tiimor': 'Timor-Leste',
        'East Timor': 'Timor-Leste',
        'Kampuchea': 'Cambodia',
        'Laos': "Lao People's Democratic Republic",
    }

    for k in ['ISLAND_NAM', 'ISLAND_NA_', 'ISL_NAM']:  # Spelling "variants".
        if k in d:
            v = d.pop(k)
            d['ISLAND_NAME'] = island_typos.get(v, v)

    if 'CNTRY_NAME' in d:
        d['COUNTRY_NAME'] = d.pop('CNTRY_NAME')
        ncountries = []
        for name in d['COUNTRY_NAME'].split('/'):
            name = country_names.get(name, name)
            # `lookup` raises LookupError for unknown names. It is called outside of an `assert`
            # to make sure the validation also runs when Python is started with -O.
            pycountry.countries.lookup(name)
            ncountries.append(name)
        d['COUNTRY_NAME'] = '/'.join(ncountries)

    if 'COUNTRY_NAME' not in d and d.get('SOVEREIGN') == 'Australia':
        d['COUNTRY_NAME'] = 'Australia'

    if d.get('LANGUAGE', '').startswith(('Uninhabite', 'Unclassified')):
        return None

    for v in d.values():
        assert ';' not in v

    for col in COLS:
        d.setdefault(col, '')
    return d
80 changes: 80 additions & 0 deletions lib/move_polygons.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import typing
import dataclasses

from shapely.geometry import Point, shape

from .util import Fixer


@dataclasses.dataclass
class Move:
    """
    A move is defined by a language to identify matching features, a point to find matching polygons
    and a translation vector specifying the actual move.

    A `vector` of `None` means that no translation is specified (`Mover` removes such polygons).
    """
    language: str
    point: Point
    # Translation as (delta longitude, delta latitude).
    vector: typing.Optional[typing.Tuple[float, float]]

    @classmethod
    def from_spec(cls, spec: typing.Dict[str, str]):
        """
        Create a move from a specification with the keys `LANGUAGE`, `source_lon`, `source_lat`,
        `target_lon` and `target_lat`. An empty `target_lon` results in a move without vector.

        :raises AssertionError: if the translation is not reasonably small (except for the \
        languages explicitly exempted below).
        """
        lon, lat = float(spec['source_lon']), float(spec['source_lat'])
        vector = None
        if spec['target_lon']:
            vector = (float(spec['target_lon']) - lon, float(spec['target_lat']) - lat)
        move = cls(language=spec['LANGUAGE'], point=Point(lon, lat), vector=vector)
        # Make sure translations are reasonably close.
        if move.vector and move.language not in {'RAPA', 'EASTER ISLAND'}:
            assert abs(move.vector[0]) < 1.3 and abs(move.vector[1]) < 1.3, (
                'Translation vector too big for {0.language}: {0.vector}'.format(move))
        return move

    def __call__(self, polygon_coordinates):
        """
        "Move" a polygon, by adding the translation vector to each coordinate of each ring.

        Elements of a position beyond longitude and latitude (e.g. an altitude) are kept as is.
        Without a vector, `polygon_coordinates` is returned unchanged.
        """
        if not self.vector:
            return polygon_coordinates
        dlon, dlat = self.vector
        return [
            [(lon + dlon, lat + dlat, *rest) for lon, lat, *rest in ring]
            for ring in polygon_coordinates]


class Mover(Fixer):
    """
    Applies `Move` specifications to the polygons of features.
    """
    __item_class__ = Move

    def __call__(self, feature) -> dict:
        """
        Translate or remove the polygons of `feature` that are targeted by a pending move.

        Features for languages without an entry in `self.fixes` are returned untouched. Otherwise
        the feature's geometry is rewritten in place, as `Polygon` if exactly one polygon is
        left and as `MultiPolygon` in all other cases.
        """
        language = feature['properties']['LANGUAGE']
        if language not in self.fixes:
            return feature

        geom = feature['geometry']
        polygons = geom['coordinates']
        if geom['type'] == 'Polygon':
            polygons = [polygons]

        kept = []
        # Each polygon is handled on its own, MultiPolygons are never matched as a whole.
        for coords in polygons:
            outline = shape({'type': 'Polygon', 'coordinates': coords})
            candidates = self.fixes[language]
            # Position of the first pending move with a source point inside this polygon.
            hit = next(
                (pos for pos, cand in enumerate(candidates) if outline.contains(cand.point)),
                None)
            if hit is None:
                kept.append(coords)
                continue
            chosen = candidates[hit]
            if chosen.vector is not None:
                kept.append(chosen(coords))
            # else: a move without vector removes the polygon from the shape.
            del candidates[hit]  # Book-keeping: this move has been made.

        if len(kept) == 1:
            geom['type'] = 'Polygon'
            geom['coordinates'] = kept[0]
        else:
            geom['type'] = 'MultiPolygon'
            geom['coordinates'] = kept
        return feature
43 changes: 43 additions & 0 deletions lib/repair_geometry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import functools
import dataclasses

from shapely.geometry import shape

from .util import Fixer


@dataclasses.dataclass
class Hole:
    """
    A hole is defined by a language to identify the matching feature and the geometry of the hole.
    """
    language: str
    geometry: dict

    @classmethod
    def from_spec(cls, spec):
        """
        Create a hole from a mapping with `properties` (providing `LANGUAGE`) and `geometry`.

        The geometry must convert to a valid shape.
        """
        hole = cls(spec['properties']['LANGUAGE'], spec['geometry'])
        assert hole.shape.is_valid
        return hole

    @functools.cached_property
    def shape(self):
        """
        The geometry converted with `shapely.geometry.shape`; computed on first access only.
        """
        return shape(self.geometry)


class ReinsertHoles(Fixer):
    """
    Adds holes as interior rings to the polygons of a MultiPolygon which contain them.
    """
    __item_class__ = Hole

    def __call__(self, feature, geom):
        """
        Take the last pending hole for the feature's language and insert it into `geom`.

        `geom` must be a MultiPolygon; its coordinates are replaced in place and the same object
        is returned.
        """
        hole = self.fixes[feature['properties']['LANGUAGE']].pop()
        assert geom['type'] == 'MultiPolygon'

        patched = []
        for rings in geom['coordinates']:
            outline = shape({'type': 'Polygon', 'coordinates': rings})
            assert outline.is_valid
            if not outline.contains(hole.shape):
                patched.append(rings)
                continue
            assert len(rings) == 1, 'expected polygon without holes!'
            # The first ring of the hole geometry becomes the interior ring of the polygon.
            patched.append([*rings, hole.geometry['coordinates'][0]])

        geom['coordinates'] = patched
        assert shape(geom).is_valid
        return geom
28 changes: 28 additions & 0 deletions lib/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import itertools


def existing_dir(d):
    """
    Make sure the directory `d` exists, creating it (and missing parents) if necessary.

    :param d: A `pathlib.Path`-like object.
    :return: `d`
    :raises FileExistsError: if `d` exists but is not a directory.
    """
    # A single call instead of check-then-create: no window in which the directory could appear
    # between the check and `mkdir`, and non-directories are rejected even under `python -O`.
    d.mkdir(parents=True, exist_ok=True)
    return d


class Fixer:
    """
    Base class for book-keeping about pending fixes, which are grouped by language name.

    Subclasses set `__item_class__` to a class providing a `from_spec` factory and creating
    objects with a `language` attribute.
    """
    __item_class__ = None

    def __init__(self, specs):
        """
        Turn each spec into a fix item and store the items in `self.fixes`, a dict mapping
        language names (in sorted order) to the list of items for the language.

        At least one spec is required.
        """
        items = [self.__item_class__.from_spec(spec) for spec in specs]
        items.sort(key=lambda item: item.language)
        grouped = {}
        for item in items:
            grouped.setdefault(item.language, []).append(item)
        self.fixes = grouped
        assert self.fixes

    @property
    def all_done(self):
        """
        Whether the lists of pending fixes are empty for all languages.
        """
        return all(not pending for pending in self.fixes.values())

0 comments on commit 83707ba

Please sign in to comment.