
Commit

Revert "Got carried away, re-wrote process_active_fire_alerts with lo…
Browse files Browse the repository at this point in the history
…ts of context managers"

This reverts commit 497d402.
dmannarino committed Aug 28, 2024
1 parent b44c882 commit d2c6028
Showing 1 changed file with 68 additions and 46 deletions.
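
The rewrite being reverted leaned on context managers throughout: pyshp's Reader opened the remote shapefile directly (the diff below shows it pointed at a URL), and the TSV was written inside TemporaryDirectory/open blocks. A minimal sketch of that style, reusing names and calls that appear in the diff below but otherwise simplified; it is not the exact reverted code:

import csv
import shapefile  # pyshp; recent versions can open a zipped shapefile from a URL
from tempfile import TemporaryDirectory


def read_alert_rows(url):
    # Reader acts as a context manager, so its file handles are closed
    # automatically when the block exits.
    with shapefile.Reader(url) as sf:
        rows = []
        for shape_record in sf.iterShapeRecords():
            row = shape_record.record.as_dict()
            row["LATITUDE"] = shape_record.shape.points[0][1]
            row["LONGITUDE"] = shape_record.shape.points[0][0]
            rows.append(row)
        return rows


def write_tsv(rows, fields, result_path):
    # open() as a context manager closes the TSV even if a write fails.
    with open(result_path, "w", newline="") as tsv_file:
        writer = csv.DictWriter(tsv_file, fieldnames=fields, delimiter="\t")
        writer.writeheader()
        for row in rows:
            # The module's _write_row helper handles this mapping in the real code;
            # here the lowercase field names are simply uppercased to match the
            # DBF attribute names.
            writer.writerow({field: row[field.upper()] for field in fields})


if __name__ == "__main__":
    # Hypothetical URL for illustration only.
    rows = read_alert_rows("https://example.com/fire_alerts_7d.zip")
    with TemporaryDirectory() as temp_dir:  # removed automatically on exit
        write_tsv(rows, ["latitude", "longitude", "acq_date", "acq_time"],
                  f"{temp_dir}/fire_alerts.tsv")
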
114 changes: 68 additions & 46 deletions src/datapump/sync/fire_alerts.py
@@ -1,7 +1,10 @@
import csv
import io
import os
from tempfile import TemporaryDirectory
import shutil
import zipfile

import requests
import shapefile

from ..clients.aws import get_s3_client
@@ -27,77 +30,96 @@


def process_active_fire_alerts(alert_type):
nrt_s3_directory = f"nasa_{alert_type.lower()}_fire_alerts/{VERSIONS[alert_type]}/vector/epsg-4326/tsv/near_real_time"
last_saved_date, last_saved_min = _get_last_saved_alert_time(nrt_s3_directory)
LOGGER.info(f"Last saved row datetime: {last_saved_date} {last_saved_min}")

LOGGER.info(f"Retrieving fire alerts for {alert_type}")
rows = []
with shapefile.Reader(ACTIVE_FIRE_ALERTS_7D_SHAPEFILE_URLS[alert_type]) as sf:
LOGGER.info(f"Shapefile has {len(sf)} records")
for shape_record in sf.iterShapeRecords():
row = shape_record.record.as_dict()
row["ACQ_DATE"] = row["ACQ_DATE"].strftime("%Y-%m-%d")
row["LATITUDE"] = shape_record.shape.points[0][1]
row["LONGITUDE"] = shape_record.shape.points[0][0]
# For VIIRS we only want first letter of confidence category,
# to make NRT category same as scientific
if alert_type == "viirs":
row["CONFIDENCE"] = row["CONFIDENCE"][0]
response = requests.get(ACTIVE_FIRE_ALERTS_7D_SHAPEFILE_URLS[alert_type])

if row["ACQ_DATE"] > last_saved_date or (
row["ACQ_DATE"] == last_saved_date and
row["ACQ_TIME"] > last_saved_min
):
rows.append(row)
if response.status_code != 200:
raise Exception(
f"Unable to get active {alert_type} fire alerts, FIRMS returned status code {response.status_code}"
)

LOGGER.info("Successfully downloaded alerts from NASA")

if not rows:
zip = zipfile.ZipFile(io.BytesIO(response.content))
shp_dir = f"{TEMP_DIR}/fire_alerts_{alert_type}"
zip.extractall(shp_dir)

if not os.path.isfile(f"{shp_dir}/{SHP_NAMES[alert_type]}"):
raise Exception(
f"{alert_type} shapefile contained no new records since {last_saved_date} {last_saved_min}"
f"{alert_type} fire alerts zip downloaded, but contains no .shp file!"
)
else:
LOGGER.info(f"Found {len(rows)} new records for {alert_type}")

sf = shapefile.Reader(f"{shp_dir}/{SHP_NAMES[alert_type]}")

rows = []
for shape_record in sf.iterShapeRecords():
row = shape_record.record.as_dict()
row["LATITUDE"] = shape_record.shape.points[0][1]
row["LONGITUDE"] = shape_record.shape.points[0][0]
row["ACQ_DATE"] = row["ACQ_DATE"].strftime("%Y-%m-%d")
rows.append(row)

sorted_rows = sorted(rows, key=lambda row: f"{row['ACQ_DATE']}_{row['ACQ_TIME']}")

first_row = sorted_rows[0]
last_row = sorted_rows[-1]

LOGGER.info(f"First new record datetime: {first_row['ACQ_DATE']} {first_row['ACQ_TIME']}")

fields = [
"latitude",
"longitude",
"acq_date",
"acq_time",
"confidence",
*(BRIGHTNESS_FIELDS[alert_type]),
"frp"
]
fields += BRIGHTNESS_FIELDS[alert_type]
fields.append("frp")

with TemporaryDirectory() as temp_dir:
result_path = f"{temp_dir}/fire_alerts_{alert_type.lower()}.tsv"
result_path = get_tmp_result_path(alert_type)

with open(result_path, "w", newline="") as tsv_file:
tsv_writer = csv.DictWriter(tsv_file, fieldnames=fields, delimiter="\t")
tsv_writer.writeheader()
tsv_file = open(result_path, "w", newline="")
tsv_writer = csv.DictWriter(tsv_file, fieldnames=fields, delimiter="\t")
tsv_writer.writeheader()

for row in sorted_rows:
_write_row(row, fields, tsv_writer)
nrt_s3_directory = f"nasa_{alert_type.lower()}_fire_alerts/{VERSIONS[alert_type]}/vector/epsg-4326/tsv/near_real_time"
last_saved_date, last_saved_min = _get_last_saved_alert_time(nrt_s3_directory)
LOGGER.info(f"Last saved row datetime: {last_saved_date} {last_saved_min}")

LOGGER.info("Successfully wrote TSV")
LOGGER.info(f"Last new record datetime: {last_row['ACQ_DATE']} {last_row['ACQ_TIME']}")
first_row = None
for row in sorted_rows:
# only start once we confirm we're past the overlap with the last dataset
if row["ACQ_DATE"] > last_saved_date or (
row["ACQ_DATE"] == last_saved_date and row["ACQ_TIME"] > last_saved_min
):
if not first_row:
first_row = row
LOGGER.info(
f"First row datetime: {first_row['ACQ_DATE']} {first_row['ACQ_TIME']}"
)

# for VIIRS, we only want first letter of confidence category, to make NRT category same as scientific
if alert_type == "viirs":
row["CONFIDENCE"] = row["CONFIDENCE"][0]

_write_row(row, fields, tsv_writer)

LOGGER.info(f"Last row datetime: {last_row['ACQ_DATE']} {last_row['ACQ_TIME']}")
LOGGER.info("Successfully wrote TSV")

# Upload file to s3
pipeline_key = f"{nrt_s3_directory}/{first_row['ACQ_DATE']}-{first_row['ACQ_TIME']}_{last_row['ACQ_DATE']}-{last_row['ACQ_TIME']}.tsv"
tsv_file.close()

with open(result_path, "rb") as tsv_result:
get_s3_client().upload_fileobj(
tsv_result, Bucket=DATA_LAKE_BUCKET, Key=pipeline_key
)
# upload both files to s3
file_name = f"{first_row['ACQ_DATE']}-{first_row['ACQ_TIME']}_{last_row['ACQ_DATE']}-{last_row['ACQ_TIME']}.tsv"

with open(result_path, "rb") as tsv_result:
pipeline_key = f"{nrt_s3_directory}/{file_name}"
get_s3_client().upload_fileobj(
tsv_result, Bucket=DATA_LAKE_BUCKET, Key=pipeline_key
)

LOGGER.info(f"Successfully uploaded to s3://{DATA_LAKE_BUCKET}/{pipeline_key}")

# remove raw shapefile, since it can be big and hit max lambda storage size of 512 MB
shutil.rmtree(shp_dir)

return (f"s3a://{DATA_LAKE_BUCKET}/{pipeline_key}", last_row["ACQ_DATE"])

