From 8d34d876584fc5416d4f8b08bed3a0187e69d2e3 Mon Sep 17 00:00:00 2001 From: Dan Scales Date: Thu, 14 Nov 2024 15:01:49 -0800 Subject: [PATCH 01/28] GTC-3015 Change to use new pixetl that parallelizes fetching of meta-data This is for testing of the pixetl. Will give it an official version name if the testing is successful. --- batch/pixetl.dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/pixetl.dockerfile b/batch/pixetl.dockerfile index 6c7f44fc..32b6055d 100644 --- a/batch/pixetl.dockerfile +++ b/batch/pixetl.dockerfile @@ -1,4 +1,4 @@ -FROM globalforestwatch/pixetl:v1.7.7 +FROM globalforestwatch/pixetl:v1.7.5-gtc-3035 # Copy scripts COPY ./batch/scripts/ /opt/scripts/ From e6a4b4570a29d3cdea9e41b06405c0b166071226 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Mon, 18 Nov 2024 17:02:11 -0500 Subject: [PATCH 02/28] Inconsequential change --- app/routes/datasets/queries.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/routes/datasets/queries.py b/app/routes/datasets/queries.py index 51ebdfe8..70d4110c 100755 --- a/app/routes/datasets/queries.py +++ b/app/routes/datasets/queries.py @@ -507,7 +507,7 @@ async def _query_table( sql: str, geometry: Optional[Geometry], ) -> List[Dict[str, Any]]: - # parse and validate SQL statement + # Parse and validate SQL statement try: parsed = parse_sql(unquote(sql)) except ParseError as e: From 6053d729667999ca8d82642fca041e779d29c7ca Mon Sep 17 00:00:00 2001 From: Dan Scales Date: Thu, 21 Nov 2024 15:27:09 -0800 Subject: [PATCH 03/28] Temporarily switch pixetl job queue to on-demand in staging only. DO NOT move to production. --- app/models/pydantic/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/pydantic/jobs.py b/app/models/pydantic/jobs.py index 643292f2..1bcc8cea 100644 --- a/app/models/pydantic/jobs.py +++ b/app/models/pydantic/jobs.py @@ -129,7 +129,7 @@ class TileCacheJob(Job): class PixETLJob(Job): """Use for raster transformations using PixETL.""" - job_queue = PIXETL_JOB_QUEUE + job_queue = ON_DEMAND_COMPUTE_JOB_QUEUE job_definition = PIXETL_JOB_DEFINITION vcpus = MAX_CORES memory = MAX_MEM From 16bb6da9f25361c0b1a385e1886424ea2603e607 Mon Sep 17 00:00:00 2001 From: Dan Scales Date: Sun, 24 Nov 2024 09:37:52 -0800 Subject: [PATCH 04/28] Change back to pixetl job queue being normal spot queue. --- app/models/pydantic/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/pydantic/jobs.py b/app/models/pydantic/jobs.py index 1bcc8cea..643292f2 100644 --- a/app/models/pydantic/jobs.py +++ b/app/models/pydantic/jobs.py @@ -129,7 +129,7 @@ class TileCacheJob(Job): class PixETLJob(Job): """Use for raster transformations using PixETL.""" - job_queue = ON_DEMAND_COMPUTE_JOB_QUEUE + job_queue = PIXETL_JOB_QUEUE job_definition = PIXETL_JOB_DEFINITION vcpus = MAX_CORES memory = MAX_MEM From 808413a2e76c9512bb5145001ad6993d0b2c07fa Mon Sep 17 00:00:00 2001 From: Dan Scales Date: Mon, 25 Nov 2024 13:18:06 -0800 Subject: [PATCH 05/28] Switch to pixetl with more logging for create_geojsons() --- batch/pixetl.dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/pixetl.dockerfile b/batch/pixetl.dockerfile index 32b6055d..6775fad6 100644 --- a/batch/pixetl.dockerfile +++ b/batch/pixetl.dockerfile @@ -1,4 +1,4 @@ -FROM globalforestwatch/pixetl:v1.7.5-gtc-3035 +FROM globalforestwatch/pixetl:v1.7.5d # Copy scripts COPY ./batch/scripts/ /opt/scripts/ From 7be61b9fe214afa2af1627b43e561d234256111f Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Mon, 25 Nov 2024 21:03:51 -0500 Subject: [PATCH 06/28] Test Dan's pixetl changes --- batch/pixetl.dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batch/pixetl.dockerfile b/batch/pixetl.dockerfile index 6775fad6..a468ac9a 100644 --- a/batch/pixetl.dockerfile +++ b/batch/pixetl.dockerfile @@ -1,4 +1,4 @@ -FROM globalforestwatch/pixetl:v1.7.5d +FROM globalforestwatch/pixetl:v1.7.7_test_parallel # Copy scripts COPY ./batch/scripts/ /opt/scripts/ @@ -18,4 +18,4 @@ WORKDIR /tmp ENV LC_ALL=C.UTF-8 ENV LANG=C.UTF-8 -ENTRYPOINT ["/opt/scripts/report_status.sh"] \ No newline at end of file +ENTRYPOINT ["/opt/scripts/report_status.sh"] From 5236433db7b8f30ce5bc290f507acec279abf9b4 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Fri, 29 Nov 2024 23:35:18 -0500 Subject: [PATCH 07/28] Replace call to pixetl's create_geojsons in resample script with a new Data API implementation --- batch/python/aws_utils.py | 32 ++++++++- batch/python/gdal_utils.py | 13 ++++ batch/python/resample.py | 25 +++++-- batch/python/tiles_geojson.py | 126 ++++++++++++++++++++++++++++++++++ 4 files changed, 188 insertions(+), 8 deletions(-) create mode 100644 batch/python/tiles_geojson.py diff --git a/batch/python/aws_utils.py b/batch/python/aws_utils.py index 32b236fc..e7c4858e 100644 --- a/batch/python/aws_utils.py +++ b/batch/python/aws_utils.py @@ -1,5 +1,5 @@ import os -from typing import Tuple +from typing import List, Sequence, Tuple, Dict, Any import boto3 @@ -29,3 +29,33 @@ def exists_in_s3(target_bucket, target_key): for obj in response.get("Contents", []): if obj["Key"] == target_key: return obj["Size"] > 0 + + +def get_aws_files( + bucket: str, prefix: str, extensions: Sequence[str] = (".tif",) +) -> List[str]: + """Get all matching files in S3.""" + files: List[str] = list() + + s3_client = get_s3_client() + paginator = s3_client.get_paginator("list_objects_v2") + + print("get_aws_files") + for page in paginator.paginate(Bucket=bucket, Prefix=prefix): + try: + contents = page["Contents"] + except KeyError: + break + + for obj in contents: + key = str(obj["Key"]) + if any(key.endswith(ext) for ext in extensions): + files.append(f"/vsis3/{bucket}/{key}") + + print("done get_aws_files") + return files + + +def upload_s3(path: str, bucket: str, dst: str) -> Dict[str, Any]: + s3_client = get_s3_client() + return s3_client.upload_file(path, bucket, dst) diff --git a/batch/python/gdal_utils.py b/batch/python/gdal_utils.py index c4fc68c8..fa498791 100644 --- a/batch/python/gdal_utils.py +++ b/batch/python/gdal_utils.py @@ -1,6 +1,7 @@ import os import subprocess from typing import Dict, List, Optional, Tuple +from urllib.parse import urlparse from errors import GDALError, SubprocessKilledError @@ -21,6 +22,18 @@ def from_vsi_path(file_name: str) -> str: return vsi +def to_vsi_path(file_name: str) -> str: + prefix = {"s3": "vsis3", "gs": "vsigs"} + + parts = urlparse(file_name) + try: + path = f"/{prefix[parts.scheme]}/{parts.netloc}{parts.path}" + except KeyError: + raise ValueError(f"Unknown protocol: {parts.scheme}") + + return path + + def run_gdal_subcommand(cmd: List[str], env: Optional[Dict] = None) -> Tuple[str, str]: """Run GDAL as sub command and catch common errors.""" diff --git a/batch/python/resample.py b/batch/python/resample.py index 73252224..e8ca2e0b 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -19,11 +19,13 @@ # Use relative imports because these modules get copied into container from aws_utils import exists_in_s3, get_s3_client, get_s3_path_parts +from batch.python.aws_utils import get_aws_files, upload_s3 from errors import SubprocessKilledError -from gdal_utils import from_vsi_path +from gdal_utils import from_vsi_path, to_vsi_path from gfw_pixetl.grids import grid_factory -from gfw_pixetl.pixetl_prep import create_geojsons from logging_utils import listener_configurer, log_client_configurer, log_listener +from tiles_geojson import generate_geojson_parallel + from pyproj import CRS, Transformer from shapely.geometry import MultiPolygon, Polygon, shape from shapely.ops import unary_union @@ -656,12 +658,21 @@ def resample( for tile_id in executor.map(process_tile, process_tile_args): logger.log(logging.INFO, f"Finished processing tile {tile_id}") - # Now run pixetl_prep.create_geojsons to generate a tiles.geojson and - # extent.geojson in the target prefix. - create_geojsons_prefix = target_prefix.split(f"{dataset}/{version}/")[1] - logger.log(logging.INFO, f"Uploading tiles.geojson to {create_geojsons_prefix}") + # Now generate a tiles.geojson and extent.geojson and upload to the target prefix. + # tile_paths = [to_vsi_path(f) for f in get_aws_files(bucket, target_prefix)] + tile_paths = get_aws_files(bucket, target_prefix) + + tiles_output_file = "tiles.geojson" + extent_output_file = "extent.geojson" + + logger.log(logging.INFO, f"Generating geojsons") + generate_geojson_parallel(tile_paths, tiles_output_file, extent_output_file, NUM_PROCESSES) + logger.log(logging.INFO, f"Finished generating geojsons") - create_geojsons(list(), dataset, version, create_geojsons_prefix, True) + logger.log(logging.INFO, f"Uploading geojsons to {target_prefix}") + upload_s3(tiles_output_file, bucket, target_prefix) + upload_s3(extent_output_file, bucket, target_prefix) + logger.log(logging.INFO, f"Finished uploading geojsons to {target_prefix}") log_queue.put_nowait(None) listener.join() diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py new file mode 100644 index 00000000..9cc145c3 --- /dev/null +++ b/batch/python/tiles_geojson.py @@ -0,0 +1,126 @@ +import json +import subprocess +from concurrent.futures import ProcessPoolExecutor, as_completed +from typing import List, Dict, Any +from geojson import Feature, FeatureCollection +from shapely.geometry import Polygon +from shapely.ops import unary_union + + +def run_gdalinfo(file_path: str) -> Dict[str, Any]: + """Run gdalinfo and parse the output as JSON.""" + try: + result = subprocess.run( + ["gdalinfo", "-json", file_path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + text=True, + ) + return json.loads(result.stdout) + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to run gdalinfo on {file_path}: {e.stderr}") + + +def extract_metadata_from_gdalinfo(gdalinfo_json: Dict[str, Any]) -> Dict[str, Any]: + """Extract necessary metadata from the gdalinfo JSON output.""" + corner_coordinates = gdalinfo_json["cornerCoordinates"] + geo_transform = gdalinfo_json["geoTransform"] + + bands = [ + { + "data_type": band.get("type", None), + "no_data": band.get("noDataValue", None), + "nbits": band.get("metadata", {}).get("IMAGE_STRUCTURE", {}).get("NBITS", None), + "blockxsize": band.get("block", [None])[0], + "blockysize": band.get("block", [None])[1], + "stats": { + "min": band.get("minimum"), + "max": band.get("maximum"), + "mean": band.get("mean"), + "std_dev": band.get("stdDev"), + } if "minimum" in band and "maximum" in band else None, + "histogram": band.get("histogram", None), + } + for band in gdalinfo_json.get("bands", []) + ] + + metadata = { + "extent": [ + corner_coordinates["lowerLeft"][0], + corner_coordinates["lowerLeft"][1], + corner_coordinates["upperRight"][0], + corner_coordinates["upperRight"][1], + ], + "width": gdalinfo_json["size"][0], + "height": gdalinfo_json["size"][1], + "pixelxsize": geo_transform[1], + "pixelysize": abs(geo_transform[5]), + "crs": gdalinfo_json["coordinateSystem"]["wkt"], + "driver": gdalinfo_json.get("driverShortName", None), + "compression": gdalinfo_json.get("metadata", {}).get("IMAGE_STRUCTURE", {}).get("COMPRESSION", None), + "bands": bands, + "name": gdalinfo_json["description"], + } + + return metadata + + +def process_file(file_path: str) -> Dict[str, Any]: + """Run gdalinfo and extract metadata for a single file.""" + print(f"Running gdalinfo on {file_path}") + gdalinfo_json = run_gdalinfo(file_path) + return extract_metadata_from_gdalinfo(gdalinfo_json) + + +def generate_geojson_parallel(geo_tiffs: List[str], tiles_output: str, extent_output: str, max_workers: int = None): + """Generate tiles.geojson and extent.geojson files.""" + features = [] + polygons = [] + errors = [] + + with ProcessPoolExecutor(max_workers=max_workers) as executor: + future_to_file = {executor.submit(process_file, file): file for file in geo_tiffs} + for future in as_completed(future_to_file): + file = future_to_file[future] + try: + metadata = future.result() + extent = metadata["extent"] + + # Create a Polygon from the extent + polygon_coords = [ + [extent[0], extent[1]], + [extent[0], extent[3]], + [extent[2], extent[3]], + [extent[2], extent[1]], + [extent[0], extent[1]], + ] + polygon = Polygon(polygon_coords) + + # Add to GeoJSON features + feature = Feature(geometry=polygon.__geo_interface__, properties=metadata) + features.append(feature) + + # Collect for union + polygons.append(polygon) + except Exception as e: + print(f"Error processing file {file}: {e}") + errors.append(f"File {file}: {e}") + + if errors: + raise RuntimeError(f"Failed to process the following files:\n" + "\n".join(errors)) + + # Write tiles.geojson + tiles_fc = FeatureCollection(features) + with open(tiles_output, "w") as f: + json.dump(tiles_fc, f, indent=2) + print(f"GeoJSON written to {tiles_output}") + + # Create and write extent.geojson + union_geometry = unary_union(polygons) + extent_fc = FeatureCollection([ + Feature(geometry=union_geometry.__geo_interface__, properties={}) + ]) + with open(extent_output, "w") as f: + json.dump(extent_fc, f, indent=2) + print(f"GeoJSON written to {extent_output}") From 44bed61b025fe0144aaa59a2794d161313f0fe28 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Sat, 30 Nov 2024 00:31:15 -0500 Subject: [PATCH 08/28] Fix (make relative) imports --- batch/python/resample.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/batch/python/resample.py b/batch/python/resample.py index e8ca2e0b..89e8597c 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -18,10 +18,11 @@ import rasterio # Use relative imports because these modules get copied into container -from aws_utils import exists_in_s3, get_s3_client, get_s3_path_parts -from batch.python.aws_utils import get_aws_files, upload_s3 +from aws_utils import ( + exists_in_s3, get_s3_client, get_s3_path_parts, get_aws_files, upload_s3 +) from errors import SubprocessKilledError -from gdal_utils import from_vsi_path, to_vsi_path +from gdal_utils import from_vsi_path from gfw_pixetl.grids import grid_factory from logging_utils import listener_configurer, log_client_configurer, log_listener from tiles_geojson import generate_geojson_parallel From 821dc28e941b59ed2e46a531c76b096c3780d58e Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Sat, 30 Nov 2024 00:59:54 -0500 Subject: [PATCH 09/28] Upload to full key, not just prefix --- batch/python/resample.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batch/python/resample.py b/batch/python/resample.py index 89e8597c..4bc7d718 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -671,8 +671,8 @@ def resample( logger.log(logging.INFO, f"Finished generating geojsons") logger.log(logging.INFO, f"Uploading geojsons to {target_prefix}") - upload_s3(tiles_output_file, bucket, target_prefix) - upload_s3(extent_output_file, bucket, target_prefix) + upload_s3(tiles_output_file, bucket, os.path.join(target_prefix, tiles_output_file)) + upload_s3(extent_output_file, bucket, os.path.join(target_prefix, extent_output_file)) logger.log(logging.INFO, f"Finished uploading geojsons to {target_prefix}") log_queue.put_nowait(None) From 6ee4704f7533a579c28361d67ae3eb0ce09113f0 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Sat, 30 Nov 2024 17:49:09 -0500 Subject: [PATCH 10/28] Print geojsons for debugging --- batch/python/tiles_geojson.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index 9cc145c3..3c5caa75 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -112,8 +112,11 @@ def generate_geojson_parallel(geo_tiffs: List[str], tiles_output: str, extent_ou # Write tiles.geojson tiles_fc = FeatureCollection(features) + tiles_txt = json.dumps(tiles_fc, indent=2) + print(f"tiles.geojson:\n", tiles_txt) + with open(tiles_output, "w") as f: - json.dump(tiles_fc, f, indent=2) + print(tiles_txt, file=f) print(f"GeoJSON written to {tiles_output}") # Create and write extent.geojson @@ -121,6 +124,9 @@ def generate_geojson_parallel(geo_tiffs: List[str], tiles_output: str, extent_ou extent_fc = FeatureCollection([ Feature(geometry=union_geometry.__geo_interface__, properties={}) ]) + extent_txt = json.dumps(extent_fc, indent=2) + print(f"extent.geojson:\n", extent_txt) + with open(extent_output, "w") as f: - json.dump(extent_fc, f, indent=2) + print(extent_txt, file=f) print(f"GeoJSON written to {extent_output}") From d5a28d9316c060cf291ac47e253c3ddb55f93750 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Sat, 30 Nov 2024 21:23:27 -0500 Subject: [PATCH 11/28] WIP: Fix resample script uploading to wrong place --- batch/python/resample.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batch/python/resample.py b/batch/python/resample.py index 4bc7d718..8a851e8d 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -671,8 +671,8 @@ def resample( logger.log(logging.INFO, f"Finished generating geojsons") logger.log(logging.INFO, f"Uploading geojsons to {target_prefix}") - upload_s3(tiles_output_file, bucket, os.path.join(target_prefix, tiles_output_file)) - upload_s3(extent_output_file, bucket, os.path.join(target_prefix, extent_output_file)) + upload_s3(tiles_output_file, bucket, os.path.join(target_prefix, "geotiff", tiles_output_file)) + upload_s3(extent_output_file, bucket, os.path.join(target_prefix, "geotiff", extent_output_file)) logger.log(logging.INFO, f"Finished uploading geojsons to {target_prefix}") log_queue.put_nowait(None) From f93b5367c49176c0ff5b97faf8efaa1d0c51863c Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Sat, 30 Nov 2024 21:23:56 -0500 Subject: [PATCH 12/28] Sanitize geojsons for NaN values --- batch/python/tiles_geojson.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index 3c5caa75..9959a238 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -1,7 +1,8 @@ import json +import math import subprocess from concurrent.futures import ProcessPoolExecutor, as_completed -from typing import List, Dict, Any +from typing import List, Dict, Any, Optional from geojson import Feature, FeatureCollection from shapely.geometry import Polygon from shapely.ops import unary_union @@ -30,7 +31,13 @@ def extract_metadata_from_gdalinfo(gdalinfo_json: Dict[str, Any]) -> Dict[str, A bands = [ { "data_type": band.get("type", None), - "no_data": band.get("noDataValue", None), + "no_data": ( + "nan" if ( + band.get("noDataValue", None) is not None + and math.isnan(band.get("noDataValue")) + ) + else band.get("noDataValue", None) + ), "nbits": band.get("metadata", {}).get("IMAGE_STRUCTURE", {}).get("NBITS", None), "blockxsize": band.get("block", [None])[0], "blockysize": band.get("block", [None])[1], From b4ddb5bdf07c05fafa571e31eed32fcd1bff88f8 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Mon, 2 Dec 2024 13:35:35 -0500 Subject: [PATCH 13/28] Set max simul. gdalinfo processes to 16 --- batch/python/resample.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/batch/python/resample.py b/batch/python/resample.py index 4bc7d718..b0f9c235 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -667,7 +667,12 @@ def resample( extent_output_file = "extent.geojson" logger.log(logging.INFO, f"Generating geojsons") - generate_geojson_parallel(tile_paths, tiles_output_file, extent_output_file, NUM_PROCESSES) + generate_geojson_parallel( + tile_paths, + tiles_output_file, + extent_output_file, + min(16, NUM_PROCESSES) + ) logger.log(logging.INFO, f"Finished generating geojsons") logger.log(logging.INFO, f"Uploading geojsons to {target_prefix}") From a4879937095c63ca6883f632e3799cd241de5b7c Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Mon, 2 Dec 2024 21:00:15 -0500 Subject: [PATCH 14/28] Always write coords in tiles.geojson in EPSG:4326 for legacy reasons --- batch/python/tiles_geojson.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index 9959a238..fdedb89a 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -2,12 +2,22 @@ import math import subprocess from concurrent.futures import ProcessPoolExecutor, as_completed -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Tuple + +import pyproj from geojson import Feature, FeatureCollection +from pyproj import CRS, Transformer from shapely.geometry import Polygon from shapely.ops import unary_union +def to_wgs84(crs: CRS, x: float, y: float) -> Tuple[float, float]: + transformer = Transformer.from_crs( + crs, CRS.from_epsg(4326), always_xy=True + ) + return transformer.transform(x, y) + + def run_gdalinfo(file_path: str) -> Dict[str, Any]: """Run gdalinfo and parse the output as JSON.""" try: @@ -93,14 +103,14 @@ def generate_geojson_parallel(geo_tiffs: List[str], tiles_output: str, extent_ou try: metadata = future.result() extent = metadata["extent"] - + crs = CRS.from_string(metadata["crs"]) # Create a Polygon from the extent polygon_coords = [ - [extent[0], extent[1]], - [extent[0], extent[3]], - [extent[2], extent[3]], - [extent[2], extent[1]], - [extent[0], extent[1]], + [*to_wgs84(crs, extent[0], extent[1])], + [*to_wgs84(crs, extent[0], extent[3])], + [*to_wgs84(crs, extent[2], extent[3])], + [*to_wgs84(crs, extent[2], extent[1])], + [*to_wgs84(crs, extent[0], extent[1])], ] polygon = Polygon(polygon_coords) From d4126c3f0225adcdc9b6dbdcd29ffa278eb750fd Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Mon, 2 Dec 2024 23:03:21 -0500 Subject: [PATCH 15/28] Transform extent too, add note --- batch/python/tiles_geojson.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index fdedb89a..3dd08b52 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -4,14 +4,13 @@ from concurrent.futures import ProcessPoolExecutor, as_completed from typing import List, Dict, Any, Tuple -import pyproj from geojson import Feature, FeatureCollection from pyproj import CRS, Transformer from shapely.geometry import Polygon from shapely.ops import unary_union -def to_wgs84(crs: CRS, x: float, y: float) -> Tuple[float, float]: +def to_4326(crs: CRS, x: float, y: float) -> Tuple[float, float]: transformer = Transformer.from_crs( crs, CRS.from_epsg(4326), always_xy=True ) @@ -62,12 +61,15 @@ def extract_metadata_from_gdalinfo(gdalinfo_json: Dict[str, Any]) -> Dict[str, A for band in gdalinfo_json.get("bands", []) ] + crs: CRS = CRS.from_string(gdalinfo_json["coordinateSystem"]["wkt"]) metadata = { + # NOTE: pixetl seems to always write features in tiles.geojson in + # epsg:4326 coordinates (even when the tiles themselves are + # epsg:3857). Reproduce that behavior for compatibility. If that + # ever changes, remove the call to to_4326 here. "extent": [ - corner_coordinates["lowerLeft"][0], - corner_coordinates["lowerLeft"][1], - corner_coordinates["upperRight"][0], - corner_coordinates["upperRight"][1], + *to_4326(crs, *corner_coordinates["lowerLeft"]), + *to_4326(crs, *corner_coordinates["upperRight"]), ], "width": gdalinfo_json["size"][0], "height": gdalinfo_json["size"][1], @@ -103,14 +105,13 @@ def generate_geojson_parallel(geo_tiffs: List[str], tiles_output: str, extent_ou try: metadata = future.result() extent = metadata["extent"] - crs = CRS.from_string(metadata["crs"]) # Create a Polygon from the extent polygon_coords = [ - [*to_wgs84(crs, extent[0], extent[1])], - [*to_wgs84(crs, extent[0], extent[3])], - [*to_wgs84(crs, extent[2], extent[3])], - [*to_wgs84(crs, extent[2], extent[1])], - [*to_wgs84(crs, extent[0], extent[1])], + [extent[0], extent[1]], + [extent[0], extent[3]], + [extent[2], extent[3]], + [extent[2], extent[1]], + [extent[0], extent[1]], ] polygon = Polygon(polygon_coords) From 961d2666bd2a24c4c10ba7e9242ce65e08d6d2a9 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Mon, 2 Dec 2024 23:04:55 -0500 Subject: [PATCH 16/28] Remove extra fields from tiles.geojson --- batch/python/tiles_geojson.py | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index 3dd08b52..a435a843 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -1,5 +1,4 @@ import json -import math import subprocess from concurrent.futures import ProcessPoolExecutor, as_completed from typing import List, Dict, Any, Tuple @@ -35,31 +34,6 @@ def run_gdalinfo(file_path: str) -> Dict[str, Any]: def extract_metadata_from_gdalinfo(gdalinfo_json: Dict[str, Any]) -> Dict[str, Any]: """Extract necessary metadata from the gdalinfo JSON output.""" corner_coordinates = gdalinfo_json["cornerCoordinates"] - geo_transform = gdalinfo_json["geoTransform"] - - bands = [ - { - "data_type": band.get("type", None), - "no_data": ( - "nan" if ( - band.get("noDataValue", None) is not None - and math.isnan(band.get("noDataValue")) - ) - else band.get("noDataValue", None) - ), - "nbits": band.get("metadata", {}).get("IMAGE_STRUCTURE", {}).get("NBITS", None), - "blockxsize": band.get("block", [None])[0], - "blockysize": band.get("block", [None])[1], - "stats": { - "min": band.get("minimum"), - "max": band.get("maximum"), - "mean": band.get("mean"), - "std_dev": band.get("stdDev"), - } if "minimum" in band and "maximum" in band else None, - "histogram": band.get("histogram", None), - } - for band in gdalinfo_json.get("bands", []) - ] crs: CRS = CRS.from_string(gdalinfo_json["coordinateSystem"]["wkt"]) metadata = { @@ -71,14 +45,6 @@ def extract_metadata_from_gdalinfo(gdalinfo_json: Dict[str, Any]) -> Dict[str, A *to_4326(crs, *corner_coordinates["lowerLeft"]), *to_4326(crs, *corner_coordinates["upperRight"]), ], - "width": gdalinfo_json["size"][0], - "height": gdalinfo_json["size"][1], - "pixelxsize": geo_transform[1], - "pixelysize": abs(geo_transform[5]), - "crs": gdalinfo_json["coordinateSystem"]["wkt"], - "driver": gdalinfo_json.get("driverShortName", None), - "compression": gdalinfo_json.get("metadata", {}).get("IMAGE_STRUCTURE", {}).get("COMPRESSION", None), - "bands": bands, "name": gdalinfo_json["description"], } From 96dca4078892d36dc39b9cd1b0110882a41160c3 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Mon, 9 Dec 2024 09:57:03 -0500 Subject: [PATCH 17/28] Change a few var and fcn names --- batch/python/resample.py | 4 ++-- batch/python/tiles_geojson.py | 14 ++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/batch/python/resample.py b/batch/python/resample.py index c8cf3141..0531b5ed 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -25,7 +25,7 @@ from gdal_utils import from_vsi_path from gfw_pixetl.grids import grid_factory from logging_utils import listener_configurer, log_client_configurer, log_listener -from tiles_geojson import generate_geojson_parallel +from tiles_geojson import generate_geojson from pyproj import CRS, Transformer from shapely.geometry import MultiPolygon, Polygon, shape @@ -667,7 +667,7 @@ def resample( extent_output_file = "extent.geojson" logger.log(logging.INFO, f"Generating geojsons") - generate_geojson_parallel( + generate_geojson( tile_paths, tiles_output_file, extent_output_file, diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index a435a843..6bd785f6 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -1,7 +1,7 @@ import json import subprocess from concurrent.futures import ProcessPoolExecutor, as_completed -from typing import List, Dict, Any, Tuple +from typing import Any, Dict, List, Tuple from geojson import Feature, FeatureCollection from pyproj import CRS, Transformer @@ -58,14 +58,14 @@ def process_file(file_path: str) -> Dict[str, Any]: return extract_metadata_from_gdalinfo(gdalinfo_json) -def generate_geojson_parallel(geo_tiffs: List[str], tiles_output: str, extent_output: str, max_workers: int = None): +def generate_geojson(geotiffs: List[str], tiles_fn: str, extent_fn: str, max_workers: int = None): """Generate tiles.geojson and extent.geojson files.""" features = [] polygons = [] errors = [] with ProcessPoolExecutor(max_workers=max_workers) as executor: - future_to_file = {executor.submit(process_file, file): file for file in geo_tiffs} + future_to_file = {executor.submit(process_file, file): file for file in geotiffs} for future in as_completed(future_to_file): file = future_to_file[future] try: @@ -97,11 +97,9 @@ def generate_geojson_parallel(geo_tiffs: List[str], tiles_output: str, extent_ou # Write tiles.geojson tiles_fc = FeatureCollection(features) tiles_txt = json.dumps(tiles_fc, indent=2) - print(f"tiles.geojson:\n", tiles_txt) - with open(tiles_output, "w") as f: + with open(tiles_fn, "w") as f: print(tiles_txt, file=f) - print(f"GeoJSON written to {tiles_output}") # Create and write extent.geojson union_geometry = unary_union(polygons) @@ -111,6 +109,6 @@ def generate_geojson_parallel(geo_tiffs: List[str], tiles_output: str, extent_ou extent_txt = json.dumps(extent_fc, indent=2) print(f"extent.geojson:\n", extent_txt) - with open(extent_output, "w") as f: + with open(extent_fn, "w") as f: print(extent_txt, file=f) - print(f"GeoJSON written to {extent_output}") + print(f"GeoJSON written to {extent_fn}") From 0ffabfc2ac2f92eb2af9183843c6793770e31b0d Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Tue, 10 Dec 2024 10:11:17 -0500 Subject: [PATCH 18/28] Move writing geojsons into resample script; use existing run_gdal_subcommand helper --- batch/python/resample.py | 12 ++++++-- batch/python/tiles_geojson.py | 53 +++++++++++++---------------------- 2 files changed, 29 insertions(+), 36 deletions(-) diff --git a/batch/python/resample.py b/batch/python/resample.py index 0531b5ed..13bd918d 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -25,7 +25,7 @@ from gdal_utils import from_vsi_path from gfw_pixetl.grids import grid_factory from logging_utils import listener_configurer, log_client_configurer, log_listener -from tiles_geojson import generate_geojson +from tiles_geojson import generate_geojsons from pyproj import CRS, Transformer from shapely.geometry import MultiPolygon, Polygon, shape @@ -667,7 +667,7 @@ def resample( extent_output_file = "extent.geojson" logger.log(logging.INFO, f"Generating geojsons") - generate_geojson( + tiles_fc, extent_fc = generate_geojsons( tile_paths, tiles_output_file, extent_output_file, @@ -675,6 +675,14 @@ def resample( ) logger.log(logging.INFO, f"Finished generating geojsons") + tiles_txt = json.dumps(tiles_fc, indent=2) + with open(tiles_output_file, "w") as f: + print(tiles_txt, file=f) + + extent_txt = json.dumps(extent_fc, indent=2) + with open(extent_output_file, "w") as f: + print(extent_txt, file=f) + logger.log(logging.INFO, f"Uploading geojsons to {target_prefix}") upload_s3(tiles_output_file, bucket, os.path.join(target_prefix, "geotiff", tiles_output_file)) upload_s3(extent_output_file, bucket, os.path.join(target_prefix, "geotiff", extent_output_file)) diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index 6bd785f6..2d971b9c 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -8,6 +8,9 @@ from shapely.geometry import Polygon from shapely.ops import unary_union +from batch.python.errors import GDALError +from gdal_utils import run_gdal_subcommand + def to_4326(crs: CRS, x: float, y: float) -> Tuple[float, float]: transformer = Transformer.from_crs( @@ -16,21 +19,6 @@ def to_4326(crs: CRS, x: float, y: float) -> Tuple[float, float]: return transformer.transform(x, y) -def run_gdalinfo(file_path: str) -> Dict[str, Any]: - """Run gdalinfo and parse the output as JSON.""" - try: - result = subprocess.run( - ["gdalinfo", "-json", file_path], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=True, - text=True, - ) - return json.loads(result.stdout) - except subprocess.CalledProcessError as e: - raise RuntimeError(f"Failed to run gdalinfo on {file_path}: {e.stderr}") - - def extract_metadata_from_gdalinfo(gdalinfo_json: Dict[str, Any]) -> Dict[str, Any]: """Extract necessary metadata from the gdalinfo JSON output.""" corner_coordinates = gdalinfo_json["cornerCoordinates"] @@ -54,15 +42,26 @@ def extract_metadata_from_gdalinfo(gdalinfo_json: Dict[str, Any]) -> Dict[str, A def process_file(file_path: str) -> Dict[str, Any]: """Run gdalinfo and extract metadata for a single file.""" print(f"Running gdalinfo on {file_path}") - gdalinfo_json = run_gdalinfo(file_path) + try: + stdout,stderr = run_gdal_subcommand( + ["gdalinfo", "-json", file_path], + ) + except GDALError as e: + raise RuntimeError(f"Failed to run gdalinfo on {file_path}: {e}") + + gdalinfo_json: Dict = json.loads(stdout) return extract_metadata_from_gdalinfo(gdalinfo_json) -def generate_geojson(geotiffs: List[str], tiles_fn: str, extent_fn: str, max_workers: int = None): +def generate_geojsons( + geotiffs: List[str], + tiles_fn: str, + extent_fn: str, + max_workers: int = None +) -> Tuple[FeatureCollection, FeatureCollection]: """Generate tiles.geojson and extent.geojson files.""" features = [] polygons = [] - errors = [] with ProcessPoolExecutor(max_workers=max_workers) as executor: future_to_file = {executor.submit(process_file, file): file for file in geotiffs} @@ -88,27 +87,13 @@ def generate_geojson(geotiffs: List[str], tiles_fn: str, extent_fn: str, max_wor # Collect for union polygons.append(polygon) except Exception as e: - print(f"Error processing file {file}: {e}") - errors.append(f"File {file}: {e}") - - if errors: - raise RuntimeError(f"Failed to process the following files:\n" + "\n".join(errors)) + raise RuntimeError(f"Error processing file {file}: {e}") - # Write tiles.geojson tiles_fc = FeatureCollection(features) - tiles_txt = json.dumps(tiles_fc, indent=2) - - with open(tiles_fn, "w") as f: - print(tiles_txt, file=f) - # Create and write extent.geojson union_geometry = unary_union(polygons) extent_fc = FeatureCollection([ Feature(geometry=union_geometry.__geo_interface__, properties={}) ]) - extent_txt = json.dumps(extent_fc, indent=2) - print(f"extent.geojson:\n", extent_txt) - with open(extent_fn, "w") as f: - print(extent_txt, file=f) - print(f"GeoJSON written to {extent_fn}") + return tiles_fc, extent_fc From 40ed0c12ca9880a24b2b8db86e3ab9cde6029409 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Tue, 10 Dec 2024 10:14:01 -0500 Subject: [PATCH 19/28] Remove unneeded filnames from generate_geojsons call --- batch/python/resample.py | 2 -- batch/python/tiles_geojson.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/batch/python/resample.py b/batch/python/resample.py index 13bd918d..452d165c 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -669,8 +669,6 @@ def resample( logger.log(logging.INFO, f"Generating geojsons") tiles_fc, extent_fc = generate_geojsons( tile_paths, - tiles_output_file, - extent_output_file, min(16, NUM_PROCESSES) ) logger.log(logging.INFO, f"Finished generating geojsons") diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index 2d971b9c..573f8b6a 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -55,8 +55,6 @@ def process_file(file_path: str) -> Dict[str, Any]: def generate_geojsons( geotiffs: List[str], - tiles_fn: str, - extent_fn: str, max_workers: int = None ) -> Tuple[FeatureCollection, FeatureCollection]: """Generate tiles.geojson and extent.geojson files.""" From 36631ca4a1a719ab326e92b4551d5d406d95e34c Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Tue, 10 Dec 2024 10:26:08 -0500 Subject: [PATCH 20/28] Change behavior of get_aws_files to return s3:// URIs, rather than GDAL --- batch/python/aws_utils.py | 2 +- batch/python/resample.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/batch/python/aws_utils.py b/batch/python/aws_utils.py index e7c4858e..b788bbd4 100644 --- a/batch/python/aws_utils.py +++ b/batch/python/aws_utils.py @@ -50,7 +50,7 @@ def get_aws_files( for obj in contents: key = str(obj["Key"]) if any(key.endswith(ext) for ext in extensions): - files.append(f"/vsis3/{bucket}/{key}") + files.append(f"s3://{bucket}/{key}") print("done get_aws_files") return files diff --git a/batch/python/resample.py b/batch/python/resample.py index 452d165c..8b2d5278 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -22,7 +22,7 @@ exists_in_s3, get_s3_client, get_s3_path_parts, get_aws_files, upload_s3 ) from errors import SubprocessKilledError -from gdal_utils import from_vsi_path +from gdal_utils import from_vsi_path, to_vsi_path from gfw_pixetl.grids import grid_factory from logging_utils import listener_configurer, log_client_configurer, log_listener from tiles_geojson import generate_geojsons @@ -661,7 +661,10 @@ def resample( # Now generate a tiles.geojson and extent.geojson and upload to the target prefix. # tile_paths = [to_vsi_path(f) for f in get_aws_files(bucket, target_prefix)] - tile_paths = get_aws_files(bucket, target_prefix) + tile_paths = [ + to_vsi_path(uri) for uri in + get_aws_files(bucket, target_prefix) + ] tiles_output_file = "tiles.geojson" extent_output_file = "extent.geojson" From f662f0e7e8f5819284a91036f6bd661142fae038 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Tue, 10 Dec 2024 10:34:11 -0500 Subject: [PATCH 21/28] Fix imports --- batch/python/resample.py | 8 ++++---- batch/python/tiles_geojson.py | 3 +-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/batch/python/resample.py b/batch/python/resample.py index 8b2d5278..ca04b2dc 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -16,6 +16,10 @@ import psutil import rasterio +from pyproj import CRS, Transformer +from shapely.geometry import MultiPolygon, Polygon, shape +from shapely.ops import unary_union +from typer import Option, run # Use relative imports because these modules get copied into container from aws_utils import ( @@ -27,10 +31,6 @@ from logging_utils import listener_configurer, log_client_configurer, log_listener from tiles_geojson import generate_geojsons -from pyproj import CRS, Transformer -from shapely.geometry import MultiPolygon, Polygon, shape -from shapely.ops import unary_union -from typer import Option, run # Use at least 1 process # Try to get NUM_PROCESSES, if that fails get # CPUs divided by 1.5 diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index 573f8b6a..227dff40 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -1,5 +1,4 @@ import json -import subprocess from concurrent.futures import ProcessPoolExecutor, as_completed from typing import Any, Dict, List, Tuple @@ -8,7 +7,7 @@ from shapely.geometry import Polygon from shapely.ops import unary_union -from batch.python.errors import GDALError +from errors import GDALError from gdal_utils import run_gdal_subcommand From 90b12c4cea5feb5bca3c227b55f3b2b73bcc5388 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Tue, 10 Dec 2024 13:16:44 -0500 Subject: [PATCH 22/28] switch apply_colormap to use local geojsons code too --- .isort.cfg | 2 +- batch/python/apply_colormap.py | 44 ++++++++++++++++++++++++-------- batch/python/resample.py | 42 ++++++++++++++++-------------- batch/scripts/run_pixetl_prep.sh | 33 ------------------------ 4 files changed, 57 insertions(+), 64 deletions(-) delete mode 100644 batch/scripts/run_pixetl_prep.sh diff --git a/.isort.cfg b/.isort.cfg index bb041810..9aaf0e2d 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -2,4 +2,4 @@ line_length = 88 multi_line_output = 3 include_trailing_comma = True -known_third_party = _pytest,aenum,affine,alembic,asgi_lifespan,async_lru,asyncpg,aws_utils,boto3,botocore,click,docker,ee,errors,fastapi,fiona,gdal_utils,geoalchemy2,geojson,gfw_pixetl,gino,gino_starlette,google,httpx,httpx_auth,logger,logging_utils,moto,numpy,orjson,osgeo,pandas,pendulum,pglast,psutil,psycopg2,pydantic,pyproj,pytest,pytest_asyncio,rasterio,shapely,sqlalchemy,sqlalchemy_utils,starlette,tileputty,typer +known_third_party = _pytest,aenum,affine,alembic,asgi_lifespan,async_lru,asyncpg,aws_utils,boto3,botocore,click,docker,ee,errors,fastapi,fiona,gdal_utils,geoalchemy2,geojson,gfw_pixetl,gino,gino_starlette,google,httpx,httpx_auth,logger,logging_utils,moto,numpy,orjson,osgeo,pandas,pendulum,pglast,psutil,psycopg2,pydantic,pyproj,pytest,pytest_asyncio,rasterio,shapely,sqlalchemy,sqlalchemy_utils,starlette,tileputty,tiles_geojson,typer diff --git a/batch/python/apply_colormap.py b/batch/python/apply_colormap.py index a21f886a..1cc26ef5 100644 --- a/batch/python/apply_colormap.py +++ b/batch/python/apply_colormap.py @@ -15,11 +15,12 @@ import rasterio # Use relative imports because these modules get copied into container -from aws_utils import get_s3_client, get_s3_path_parts +from aws_utils import get_aws_files, get_s3_client, get_s3_path_parts, upload_s3 from errors import GDALError, SubprocessKilledError -from gdal_utils import from_vsi_path, run_gdal_subcommand +from gdal_utils import from_vsi_path, run_gdal_subcommand, to_vsi_path from logging_utils import listener_configurer, log_client_configurer, log_listener from pydantic import BaseModel, Extra, Field, StrictInt +from tiles_geojson import generate_geojsons from typer import Option, run NUM_PROCESSES = int( @@ -267,16 +268,37 @@ def apply_symbology( for tile_id in executor.map(create_rgb_tile, process_args): logger.log(logging.INFO, f"Finished processing tile {tile_id}") - # Now run pixetl_prep.create_geojsons to generate a tiles.geojson and - # extent.geojson in the target prefix. But that code appends /geotiff - # to the prefix so remove it first - create_geojsons_prefix = target_prefix.split(f"{dataset}/{version}/")[1].replace( - "/geotiff", "" - ) - logger.log(logging.INFO, "Uploading tiles.geojson to {create_geojsons_prefix}") - from gfw_pixetl.pixetl_prep import create_geojsons + # Now generate a tiles.geojson and extent.geojson and upload to the target prefix. + bucket, _ = get_s3_path_parts(source_uri) + tile_paths = [to_vsi_path(uri) for uri in get_aws_files(bucket, target_prefix)] + + tiles_output_file = "tiles.geojson" + extent_output_file = "extent.geojson" + + logger.log(logging.INFO, "Generating geojsons") + tiles_fc, extent_fc = generate_geojsons(tile_paths, min(16, NUM_PROCESSES)) + logger.log(logging.INFO, "Finished generating geojsons") + + tiles_txt = json.dumps(tiles_fc, indent=2) + with open(tiles_output_file, "w") as f: + print(tiles_txt, file=f) - create_geojsons(list(), dataset, version, create_geojsons_prefix, True) + extent_txt = json.dumps(extent_fc, indent=2) + with open(extent_output_file, "w") as f: + print(extent_txt, file=f) + + logger.log(logging.INFO, f"Uploading geojsons to {target_prefix}") + upload_s3( + tiles_output_file, + bucket, + os.path.join(target_prefix, "geotiff", tiles_output_file), + ) + upload_s3( + extent_output_file, + bucket, + os.path.join(target_prefix, "geotiff", extent_output_file), + ) + logger.log(logging.INFO, f"Finished uploading geojsons to {target_prefix}") log_queue.put_nowait(None) listener.join() diff --git a/batch/python/resample.py b/batch/python/resample.py index ca04b2dc..0a833c85 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -16,21 +16,24 @@ import psutil import rasterio -from pyproj import CRS, Transformer -from shapely.geometry import MultiPolygon, Polygon, shape -from shapely.ops import unary_union -from typer import Option, run # Use relative imports because these modules get copied into container from aws_utils import ( - exists_in_s3, get_s3_client, get_s3_path_parts, get_aws_files, upload_s3 + exists_in_s3, + get_aws_files, + get_s3_client, + get_s3_path_parts, + upload_s3, ) from errors import SubprocessKilledError from gdal_utils import from_vsi_path, to_vsi_path from gfw_pixetl.grids import grid_factory from logging_utils import listener_configurer, log_client_configurer, log_listener +from pyproj import CRS, Transformer +from shapely.geometry import MultiPolygon, Polygon, shape +from shapely.ops import unary_union from tiles_geojson import generate_geojsons - +from typer import Option, run # Use at least 1 process # Try to get NUM_PROCESSES, if that fails get # CPUs divided by 1.5 @@ -660,21 +663,14 @@ def resample( logger.log(logging.INFO, f"Finished processing tile {tile_id}") # Now generate a tiles.geojson and extent.geojson and upload to the target prefix. - # tile_paths = [to_vsi_path(f) for f in get_aws_files(bucket, target_prefix)] - tile_paths = [ - to_vsi_path(uri) for uri in - get_aws_files(bucket, target_prefix) - ] + tile_paths = [to_vsi_path(uri) for uri in get_aws_files(bucket, target_prefix)] tiles_output_file = "tiles.geojson" extent_output_file = "extent.geojson" - logger.log(logging.INFO, f"Generating geojsons") - tiles_fc, extent_fc = generate_geojsons( - tile_paths, - min(16, NUM_PROCESSES) - ) - logger.log(logging.INFO, f"Finished generating geojsons") + logger.log(logging.INFO, "Generating geojsons") + tiles_fc, extent_fc = generate_geojsons(tile_paths, min(16, NUM_PROCESSES)) + logger.log(logging.INFO, "Finished generating geojsons") tiles_txt = json.dumps(tiles_fc, indent=2) with open(tiles_output_file, "w") as f: @@ -685,8 +681,16 @@ def resample( print(extent_txt, file=f) logger.log(logging.INFO, f"Uploading geojsons to {target_prefix}") - upload_s3(tiles_output_file, bucket, os.path.join(target_prefix, "geotiff", tiles_output_file)) - upload_s3(extent_output_file, bucket, os.path.join(target_prefix, "geotiff", extent_output_file)) + upload_s3( + tiles_output_file, + bucket, + os.path.join(target_prefix, "geotiff", tiles_output_file), + ) + upload_s3( + extent_output_file, + bucket, + os.path.join(target_prefix, "geotiff", extent_output_file), + ) logger.log(logging.INFO, f"Finished uploading geojsons to {target_prefix}") log_queue.put_nowait(None) diff --git a/batch/scripts/run_pixetl_prep.sh b/batch/scripts/run_pixetl_prep.sh deleted file mode 100644 index 7dbd5167..00000000 --- a/batch/scripts/run_pixetl_prep.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash - -set -e - -# requires arguments -# -s | --source -# -d | --dataset -# -v | --version - -# optional arguments -# --overwrite -# --prefix - -ME=$(basename "$0") -. get_arguments.sh "$@" - -echo "Fetch remote GeoTIFFs headers to generate tiles.geojson" - -# Build an array of arguments to pass to pixetl_prep -ARG_ARRAY=$SRC -ARG_ARRAY+=("--dataset" "${DATASET}" "--version" "${VERSION}") - - -if [ -n "${PREFIX}" ]; then - ARG_ARRAY+=("--prefix" "${PREFIX}") -fi - -if [ -z "${OVERWRITE}" ]; then - ARG_ARRAY+=("--merge_existing") -fi - -# Run pixetl_prep with the array of arguments -pixetl_prep "${ARG_ARRAY[@]}" \ No newline at end of file From b6ffcf4dc8e2173882c11797a77e133dd43865a1 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Tue, 10 Dec 2024 13:19:01 -0500 Subject: [PATCH 23/28] Fix geojsons logging --- batch/python/apply_colormap.py | 10 ++++++---- batch/python/resample.py | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/batch/python/apply_colormap.py b/batch/python/apply_colormap.py index 1cc26ef5..8aae151b 100644 --- a/batch/python/apply_colormap.py +++ b/batch/python/apply_colormap.py @@ -287,18 +287,20 @@ def apply_symbology( with open(extent_output_file, "w") as f: print(extent_txt, file=f) - logger.log(logging.INFO, f"Uploading geojsons to {target_prefix}") + geojsons_prefix = os.path.join(target_prefix, "geotiff") + + logger.log(logging.INFO, f"Uploading geojsons to {geojsons_prefix}") upload_s3( tiles_output_file, bucket, - os.path.join(target_prefix, "geotiff", tiles_output_file), + os.path.join(geojsons_prefix, tiles_output_file), ) upload_s3( extent_output_file, bucket, - os.path.join(target_prefix, "geotiff", extent_output_file), + os.path.join(geojsons_prefix, extent_output_file), ) - logger.log(logging.INFO, f"Finished uploading geojsons to {target_prefix}") + logger.log(logging.INFO, f"Finished uploading geojsons to {geojsons_prefix}") log_queue.put_nowait(None) listener.join() diff --git a/batch/python/resample.py b/batch/python/resample.py index 0a833c85..63e752b4 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -680,18 +680,20 @@ def resample( with open(extent_output_file, "w") as f: print(extent_txt, file=f) - logger.log(logging.INFO, f"Uploading geojsons to {target_prefix}") + geojsons_prefix = os.path.join(target_prefix, "geotiff") + + logger.log(logging.INFO, f"Uploading geojsons to {geojsons_prefix}") upload_s3( tiles_output_file, bucket, - os.path.join(target_prefix, "geotiff", tiles_output_file), + os.path.join(geojsons_prefix, tiles_output_file), ) upload_s3( extent_output_file, bucket, - os.path.join(target_prefix, "geotiff", extent_output_file), + os.path.join(geojsons_prefix, extent_output_file), ) - logger.log(logging.INFO, f"Finished uploading geojsons to {target_prefix}") + logger.log(logging.INFO, f"Finished uploading geojsons to {geojsons_prefix}") log_queue.put_nowait(None) listener.join() From 0f920e3e3bdc55a9b19578cc8a42b730ec2bb445 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Tue, 10 Dec 2024 13:33:22 -0500 Subject: [PATCH 24/28] Remove extra geotiff from geojsons prefix --- batch/python/apply_colormap.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/batch/python/apply_colormap.py b/batch/python/apply_colormap.py index 8aae151b..a21b6649 100644 --- a/batch/python/apply_colormap.py +++ b/batch/python/apply_colormap.py @@ -287,20 +287,18 @@ def apply_symbology( with open(extent_output_file, "w") as f: print(extent_txt, file=f) - geojsons_prefix = os.path.join(target_prefix, "geotiff") - - logger.log(logging.INFO, f"Uploading geojsons to {geojsons_prefix}") + logger.log(logging.INFO, f"Uploading geojsons to {target_prefix}") upload_s3( tiles_output_file, bucket, - os.path.join(geojsons_prefix, tiles_output_file), + os.path.join(target_prefix, tiles_output_file), ) upload_s3( extent_output_file, bucket, - os.path.join(geojsons_prefix, extent_output_file), + os.path.join(target_prefix, extent_output_file), ) - logger.log(logging.INFO, f"Finished uploading geojsons to {geojsons_prefix}") + logger.log(logging.INFO, f"Finished uploading geojsons to {target_prefix}") log_queue.put_nowait(None) listener.join() From 6862049c236fc196c24dcc787238e0caca006877 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Wed, 11 Dec 2024 13:22:21 -0500 Subject: [PATCH 25/28] Don't indent geojsons to minimize changes in format --- batch/python/apply_colormap.py | 4 ++-- batch/python/resample.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/batch/python/apply_colormap.py b/batch/python/apply_colormap.py index a21b6649..00a6bf87 100644 --- a/batch/python/apply_colormap.py +++ b/batch/python/apply_colormap.py @@ -279,11 +279,11 @@ def apply_symbology( tiles_fc, extent_fc = generate_geojsons(tile_paths, min(16, NUM_PROCESSES)) logger.log(logging.INFO, "Finished generating geojsons") - tiles_txt = json.dumps(tiles_fc, indent=2) + tiles_txt = json.dumps(tiles_fc) with open(tiles_output_file, "w") as f: print(tiles_txt, file=f) - extent_txt = json.dumps(extent_fc, indent=2) + extent_txt = json.dumps(extent_fc) with open(extent_output_file, "w") as f: print(extent_txt, file=f) diff --git a/batch/python/resample.py b/batch/python/resample.py index 63e752b4..f065688b 100644 --- a/batch/python/resample.py +++ b/batch/python/resample.py @@ -672,11 +672,11 @@ def resample( tiles_fc, extent_fc = generate_geojsons(tile_paths, min(16, NUM_PROCESSES)) logger.log(logging.INFO, "Finished generating geojsons") - tiles_txt = json.dumps(tiles_fc, indent=2) + tiles_txt = json.dumps(tiles_fc) with open(tiles_output_file, "w") as f: print(tiles_txt, file=f) - extent_txt = json.dumps(extent_fc, indent=2) + extent_txt = json.dumps(extent_fc) with open(extent_output_file, "w") as f: print(extent_txt, file=f) From b70d59d6b36386cfc16cb0280ff8c1aca6b3dabc Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Thu, 12 Dec 2024 14:26:02 -0500 Subject: [PATCH 26/28] Restore removed metadata fields --- batch/python/tiles_geojson.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index 227dff40..9004f02a 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -1,4 +1,5 @@ import json +import math from concurrent.futures import ProcessPoolExecutor, as_completed from typing import Any, Dict, List, Tuple @@ -21,6 +22,31 @@ def to_4326(crs: CRS, x: float, y: float) -> Tuple[float, float]: def extract_metadata_from_gdalinfo(gdalinfo_json: Dict[str, Any]) -> Dict[str, Any]: """Extract necessary metadata from the gdalinfo JSON output.""" corner_coordinates = gdalinfo_json["cornerCoordinates"] + geo_transform = gdalinfo_json["geoTransform"] + + bands = [ + { + "data_type": band.get("type", None), + "no_data": ( + "nan" if ( + band.get("noDataValue", None) is not None + and math.isnan(band.get("noDataValue")) + ) + else band.get("noDataValue", None) + ), + "nbits": band.get("metadata", {}).get("IMAGE_STRUCTURE", {}).get("NBITS", None), + "blockxsize": band.get("block", [None])[0], + "blockysize": band.get("block", [None])[1], + "stats": { + "min": band.get("minimum"), + "max": band.get("maximum"), + "mean": band.get("mean"), + "std_dev": band.get("stdDev"), + } if "minimum" in band and "maximum" in band else None, + "histogram": band.get("histogram", None), + } + for band in gdalinfo_json.get("bands", []) + ] crs: CRS = CRS.from_string(gdalinfo_json["coordinateSystem"]["wkt"]) metadata = { @@ -32,6 +58,14 @@ def extract_metadata_from_gdalinfo(gdalinfo_json: Dict[str, Any]) -> Dict[str, A *to_4326(crs, *corner_coordinates["lowerLeft"]), *to_4326(crs, *corner_coordinates["upperRight"]), ], + "width": gdalinfo_json["size"][0], + "height": gdalinfo_json["size"][1], + "pixelxsize": geo_transform[1], + "pixelysize": abs(geo_transform[5]), + "crs": gdalinfo_json["coordinateSystem"]["wkt"], + "driver": gdalinfo_json.get("driverShortName", None), + "compression": gdalinfo_json.get("metadata", {}).get("IMAGE_STRUCTURE", {}).get("COMPRESSION", None), + "bands": bands, "name": gdalinfo_json["description"], } From ab99a9796223de4c5820190bc0fe342abb3e8325 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Tue, 17 Dec 2024 12:12:45 -0500 Subject: [PATCH 27/28] Run data types through from_gdal_data_type to match pixetl output --- batch/python/gdal_utils.py | 7 +++++++ batch/python/tiles_geojson.py | 8 ++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/batch/python/gdal_utils.py b/batch/python/gdal_utils.py index fa498791..c3b0b8a1 100644 --- a/batch/python/gdal_utils.py +++ b/batch/python/gdal_utils.py @@ -66,3 +66,10 @@ def run_gdal_subcommand(cmd: List[str], env: Optional[Dict] = None) -> Tuple[str raise GDALError(e) return o, e + + +def from_gdal_data_type(data_type: str) -> str: + if data_type == "Byte": + return "uint8" + else: + return data_type.lower() diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index 9004f02a..653da89e 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -9,7 +9,7 @@ from shapely.ops import unary_union from errors import GDALError -from gdal_utils import run_gdal_subcommand +from gdal_utils import from_gdal_data_type, run_gdal_subcommand def to_4326(crs: CRS, x: float, y: float) -> Tuple[float, float]: @@ -26,7 +26,11 @@ def extract_metadata_from_gdalinfo(gdalinfo_json: Dict[str, Any]) -> Dict[str, A bands = [ { - "data_type": band.get("type", None), + "data_type": ( + from_gdal_data_type(band.get("type")) + if band.get("type") is not None + else None + ), "no_data": ( "nan" if ( band.get("noDataValue", None) is not None From bbd833a02468504cd1c024b952934c105ff04814 Mon Sep 17 00:00:00 2001 From: Daniel Mannarino Date: Tue, 17 Dec 2024 12:26:01 -0500 Subject: [PATCH 28/28] Improve note about coords in tiles.geojson --- batch/python/tiles_geojson.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/batch/python/tiles_geojson.py b/batch/python/tiles_geojson.py index 653da89e..ed35360a 100644 --- a/batch/python/tiles_geojson.py +++ b/batch/python/tiles_geojson.py @@ -55,9 +55,10 @@ def extract_metadata_from_gdalinfo(gdalinfo_json: Dict[str, Any]) -> Dict[str, A crs: CRS = CRS.from_string(gdalinfo_json["coordinateSystem"]["wkt"]) metadata = { # NOTE: pixetl seems to always write features in tiles.geojson in - # epsg:4326 coordinates (even when the tiles themselves are - # epsg:3857). Reproduce that behavior for compatibility. If that - # ever changes, remove the call to to_4326 here. + # degrees (when the tiles themselves are epsg:3857 I think + # the units should be meters). Reproduce that behavior for + # backwards compatibility. If it ever changes, remove the call to + # to_4326 here. "extent": [ *to_4326(crs, *corner_coordinates["lowerLeft"]), *to_4326(crs, *corner_coordinates["upperRight"]),