diff --git a/src/datapump/sync/fire_alerts.py b/src/datapump/sync/fire_alerts.py index ed7a173..fb8f9eb 100644 --- a/src/datapump/sync/fire_alerts.py +++ b/src/datapump/sync/fire_alerts.py @@ -1,7 +1,10 @@ import csv +import io import os -from tempfile import TemporaryDirectory +import shutil +import zipfile +import requests import shapefile from ..clients.aws import get_s3_client @@ -27,77 +30,96 @@ def process_active_fire_alerts(alert_type): - nrt_s3_directory = f"nasa_{alert_type.lower()}_fire_alerts/{VERSIONS[alert_type]}/vector/epsg-4326/tsv/near_real_time" - last_saved_date, last_saved_min = _get_last_saved_alert_time(nrt_s3_directory) - LOGGER.info(f"Last saved row datetime: {last_saved_date} {last_saved_min}") - LOGGER.info(f"Retrieving fire alerts for {alert_type}") - rows = [] - with shapefile.Reader(ACTIVE_FIRE_ALERTS_7D_SHAPEFILE_URLS[alert_type]) as sf: - LOGGER.info(f"Shapefile has {len(sf)} records") - for shape_record in sf.iterShapeRecords(): - row = shape_record.record.as_dict() - row["ACQ_DATE"] = row["ACQ_DATE"].strftime("%Y-%m-%d") - row["LATITUDE"] = shape_record.shape.points[0][1] - row["LONGITUDE"] = shape_record.shape.points[0][0] - # For VIIRS we only want first letter of confidence category, - # to make NRT category same as scientific - if alert_type == "viirs": - row["CONFIDENCE"] = row["CONFIDENCE"][0] + response = requests.get(ACTIVE_FIRE_ALERTS_7D_SHAPEFILE_URLS[alert_type]) - if row["ACQ_DATE"] > last_saved_date or ( - row["ACQ_DATE"] == last_saved_date and - row["ACQ_TIME"] > last_saved_min - ): - rows.append(row) + if response.status_code != 200: + raise Exception( + f"Unable to get active {alert_type} fire alerts, FIRMS returned status code {response.status_code}" + ) + + LOGGER.info("Successfully downloaded alerts from NASA") - if not rows: + zip = zipfile.ZipFile(io.BytesIO(response.content)) + shp_dir = f"{TEMP_DIR}/fire_alerts_{alert_type}" + zip.extractall(shp_dir) + + if not os.path.isfile(f"{shp_dir}/{SHP_NAMES[alert_type]}"): raise Exception( - f"{alert_type} shapefile contained no new records since {last_saved_date} {last_saved_min}" + f"{alert_type} fire alerts zip downloaded, but contains no .shp file!" ) - else: - LOGGER.info(f"Found {len(rows)} new records for {alert_type}") + + sf = shapefile.Reader(f"{shp_dir}/{SHP_NAMES[alert_type]}") + + rows = [] + for shape_record in sf.iterShapeRecords(): + row = shape_record.record.as_dict() + row["LATITUDE"] = shape_record.shape.points[0][1] + row["LONGITUDE"] = shape_record.shape.points[0][0] + row["ACQ_DATE"] = row["ACQ_DATE"].strftime("%Y-%m-%d") + rows.append(row) sorted_rows = sorted(rows, key=lambda row: f"{row['ACQ_DATE']}_{row['ACQ_TIME']}") - first_row = sorted_rows[0] last_row = sorted_rows[-1] - LOGGER.info(f"First new record datetime: {first_row['ACQ_DATE']} {first_row['ACQ_TIME']}") - fields = [ "latitude", "longitude", "acq_date", "acq_time", "confidence", - *(BRIGHTNESS_FIELDS[alert_type]), - "frp" ] + fields += BRIGHTNESS_FIELDS[alert_type] + fields.append("frp") - with TemporaryDirectory() as temp_dir: - result_path = f"{temp_dir}/fire_alerts_{alert_type.lower()}.tsv" + result_path = get_tmp_result_path(alert_type) - with open(result_path, "w", newline="") as tsv_file: - tsv_writer = csv.DictWriter(tsv_file, fieldnames=fields, delimiter="\t") - tsv_writer.writeheader() + tsv_file = open(result_path, "w", newline="") + tsv_writer = csv.DictWriter(tsv_file, fieldnames=fields, delimiter="\t") + tsv_writer.writeheader() - for row in sorted_rows: - _write_row(row, fields, tsv_writer) + nrt_s3_directory = f"nasa_{alert_type.lower()}_fire_alerts/{VERSIONS[alert_type]}/vector/epsg-4326/tsv/near_real_time" + last_saved_date, last_saved_min = _get_last_saved_alert_time(nrt_s3_directory) + LOGGER.info(f"Last saved row datetime: {last_saved_date} {last_saved_min}") - LOGGER.info("Successfully wrote TSV") - LOGGER.info(f"Last new record datetime: {last_row['ACQ_DATE']} {last_row['ACQ_TIME']}") + first_row = None + for row in sorted_rows: + # only start once we confirm we're past the overlap with the last dataset + if row["ACQ_DATE"] > last_saved_date or ( + row["ACQ_DATE"] == last_saved_date and row["ACQ_TIME"] > last_saved_min + ): + if not first_row: + first_row = row + LOGGER.info( + f"First row datetime: {first_row['ACQ_DATE']} {first_row['ACQ_TIME']}" + ) + + # for VIIRS, we only want first letter of confidence category, to make NRT category same as scientific + if alert_type == "viirs": + row["CONFIDENCE"] = row["CONFIDENCE"][0] + + _write_row(row, fields, tsv_writer) + + LOGGER.info(f"Last row datetime: {last_row['ACQ_DATE']} {last_row['ACQ_TIME']}") + LOGGER.info("Successfully wrote TSV") - # Upload file to s3 - pipeline_key = f"{nrt_s3_directory}/{first_row['ACQ_DATE']}-{first_row['ACQ_TIME']}_{last_row['ACQ_DATE']}-{last_row['ACQ_TIME']}.tsv" + tsv_file.close() - with open(result_path, "rb") as tsv_result: - get_s3_client().upload_fileobj( - tsv_result, Bucket=DATA_LAKE_BUCKET, Key=pipeline_key - ) + # upload both files to s3 + file_name = f"{first_row['ACQ_DATE']}-{first_row['ACQ_TIME']}_{last_row['ACQ_DATE']}-{last_row['ACQ_TIME']}.tsv" + + with open(result_path, "rb") as tsv_result: + pipeline_key = f"{nrt_s3_directory}/{file_name}" + get_s3_client().upload_fileobj( + tsv_result, Bucket=DATA_LAKE_BUCKET, Key=pipeline_key + ) LOGGER.info(f"Successfully uploaded to s3://{DATA_LAKE_BUCKET}/{pipeline_key}") + # remove raw shapefile, since it can be big and hit max lambda storage size of 512 MB + shutil.rmtree(shp_dir) + return (f"s3a://{DATA_LAKE_BUCKET}/{pipeline_key}", last_row["ACQ_DATE"])