diff --git a/requirements-dev.txt b/requirements-dev.txt index 7145a2a..29aae65 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -5,4 +5,5 @@ pytest-cov~=4.1.0 # lambdas shapely~=1.8.5.post1 geojson~=3.0.1 -pyshp~=2.3.1 \ No newline at end of file +pyshp~=2.3.1 +packaging~=24.2 \ No newline at end of file diff --git a/src/Dockerfile b/src/Dockerfile index 05deaa4..7a9cedf 100644 --- a/src/Dockerfile +++ b/src/Dockerfile @@ -15,7 +15,7 @@ RUN pip install . -t python # to change the hash of the file and get TF to realize it needs to be # redeployed. Ticket for a better solution: # https://gfw.atlassian.net/browse/GTC-1250 -# change 19 +# change 20 RUN yum install -y zip geos-devel diff --git a/src/datapump/clients/data_api.py b/src/datapump/clients/data_api.py index c14cb94..adefb37 100644 --- a/src/datapump/clients/data_api.py +++ b/src/datapump/clients/data_api.py @@ -52,6 +52,8 @@ def get_1x1_asset(self, dataset: str, version: str) -> str: ) elif dataset == "gadm" and version == "v3.6": return "s3://gfw-files/2018_update/tsv/gadm36_adm2_1_1.csv" + elif dataset == "gadm" and version == "v4.1": + return "s3://gfw-pipelines/geotrellis/features/gadm41_adm2_1x1.tsv" return self.get_asset(dataset, version, "1x1 grid")["asset_uri"] diff --git a/src/datapump/jobs/geotrellis.py b/src/datapump/jobs/geotrellis.py index adec3ab..d7373ee 100644 --- a/src/datapump/jobs/geotrellis.py +++ b/src/datapump/jobs/geotrellis.py @@ -7,6 +7,7 @@ from itertools import groupby from pathlib import Path from pprint import pformat +from packaging.version import Version from typing import Any, Dict, List, Optional, Tuple from ..clients.aws import get_emr_client, get_s3_client, get_s3_path_parts @@ -88,6 +89,8 @@ class GeotrellisJob(Job): result_tables: List[AnalysisResultTable] = [] content_end_date: Optional[str] = None + + def next_step(self): now = datetime.now() if ( @@ -483,7 +486,7 @@ def _get_indices_and_cluster( raise e # schema change in version 2.1.4 - if 
self.geotrellis_version < "2.1.4": + if Version(self.geotrellis_version) < Version("2.1.4"): threshold_field = "umd_tree_cover_density__threshold" glad_conf_field = "is__confirmed_alert" glad_date_field = "alert__date" @@ -740,7 +743,7 @@ def _get_step(self) -> Dict[str, Any]: ] # after 1.5, analysis is an argument instead of an option - if self.geotrellis_version < "1.5.0": + if Version(self.geotrellis_version) < Version("1.5.0"): step_args.append("--analysis") step_args += [ @@ -805,9 +808,14 @@ def _run_job_flow(self, name, instances, steps, applications, configurations): # Spark/Scala upgrade in version 2.0.0 emr_version = ( - GLOBALS.emr_version if self.geotrellis_version > "2.0.0" else "emr-6.1.0" + GLOBALS.emr_version if Version(self.geotrellis_version) > Version("2.0.0") else "emr-6.1.0" ) + # If using a version earlier than 2.4.1, use older GDAL version + bootstrap_path = f"s3://{GLOBALS.s3_bucket_pipeline}/geotrellis/bootstrap/gdal-3.8.3.sh" + if Version(self.geotrellis_version) < Version("2.4.1"): + bootstrap_path = f"s3://{GLOBALS.s3_bucket_pipeline}/geotrellis/bootstrap/gdal.sh" + request = { "Name": name, "ReleaseLabel": emr_version, @@ -821,8 +829,7 @@ def _run_job_flow(self, name, instances, steps, applications, configurations): { "Name": "Install GDAL", "ScriptBootstrapAction": { - "Path": f"s3://{GLOBALS.s3_bucket_pipeline}/geotrellis/bootstrap/gdal.sh", - "Args": ["3.1.2"], + "Path": bootstrap_path }, }, ], @@ -969,7 +976,7 @@ def _configurations(self, worker_count: int) -> List[Dict[str, Any]]: "spark.dynamicAllocation.enabled": "false", } - if self.geotrellis_version >= "2.0.0": + if Version(self.geotrellis_version) >= Version("2.0.0"): spark_defaults.update( { "spark.decommission.enabled": "true", diff --git a/src/setup.py b/src/setup.py index 13e1119..a1d5ed7 100644 --- a/src/setup.py +++ b/src/setup.py @@ -15,5 +15,6 @@ "pyshp~=2.3.1", "pydantic~=1.10.11", "retry~=0.9.2", + "packaging~=24.2" ], # noqa: E231 )