factor out handling of errors after each product file is generated
mhidas committed Feb 6, 2020
1 parent 06dc00d commit d79a6b1
Showing 1 changed file with 13 additions and 15 deletions.
aodndata/moorings/products_handler.py (28 changes: 13 additions & 15 deletions)
@@ -206,6 +206,16 @@ def _get_old_product_files(self):
product_url=product_url)
)

+def _handle_errors(self, errors):
+"""Keep track of any input files that were excluded from the product and log a brief warning."""
+if errors:
+self.logger.warning("{n} files were excluded from the product.".format(n=len(errors)))
+for f, e in errors.items():
+if f not in self.excluded_files:
+self.excluded_files[f] = set(e)
+else:
+self.excluded_files[f].update(e)
+
def _make_aggregated_timeseries(self):
"""For each variable, generate aggregated timeseries product and add to file_collection."""

@@ -223,13 +233,7 @@ def _make_aggregated_timeseries(self):
download_url_prefix="https://s3-ap-southeast-2.amazonaws.com/imos-data/",
opendap_url_prefix="http://thredds.aodn.org.au/thredds/dodsC/"
)
-if errors:
-self.logger.warning("{n} files were excluded from the aggregation.".format(n=len(errors)))
-for f, e in errors.items():
-if f not in self.excluded_files:
-self.excluded_files[f] = set(e)
-else:
-self.excluded_files[f].update(e)
+self._handle_errors(errors)

product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
@@ -248,13 +252,7 @@ def _make_hourly_timeseries(self):

product_url, errors = hourly_aggregator(input_list, self.product_site_code, qc_flags, self.temp_dir)

-if errors:
-self.logger.warning("{n} files were excluded from the aggregation.".format(n=len(errors)))
-for f, e in errors.items():
-if f not in self.excluded_files:
-self.excluded_files[f] = set(e)
-else:
-self.excluded_files[f].update(e)
+self._handle_errors(errors)

product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
@@ -299,7 +297,7 @@ def preprocess(self):

# TODO: Include the list of excluded files as another table in the notification email (instead of the log)
if self.excluded_files:
self.logger.warning("Files exluded from aggregations:")
self.logger.warning("Files exluded from some of the products generated:")
for f, e in self.excluded_files.items():
self.logger.warning("'{f}': {e}".format(f=f, e=list(e)))

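Note on the refactor: the two call sites above now delegate to the single _handle_errors method added at the top of this diff. As a self-contained illustration of that pattern, here is a minimal sketch that reproduces the new method inside a stand-in class; the _ErrorTrackingDemo class, its logging setup, the exact nesting of the loop, and the example error dicts are illustrative assumptions only, not part of the real handler class in aodndata/moorings/products_handler.py.

import logging


class _ErrorTrackingDemo:
    """Minimal stand-in for the products handler, showing only the
    error-tracking behaviour factored out in this commit."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.excluded_files = {}

    def _handle_errors(self, errors):
        """Keep track of any input files that were excluded from the product and log a brief warning."""
        if errors:
            self.logger.warning("{n} files were excluded from the product.".format(n=len(errors)))
            for f, e in errors.items():
                if f not in self.excluded_files:
                    self.excluded_files[f] = set(e)
                else:
                    self.excluded_files[f].update(e)


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    demo = _ErrorTrackingDemo()
    # Two successive product runs report errors; reasons for the same input
    # file are merged into one set rather than overwritten.
    demo._handle_errors({"fileA.nc": ["bad time coverage"]})
    demo._handle_errors({"fileA.nc": ["bad units"], "fileB.nc": ["no data"]})
    print(demo.excluded_files)
    # e.g. {'fileA.nc': {'bad time coverage', 'bad units'}, 'fileB.nc': {'no data'}}

Because reasons accumulate in a set per file, an input file excluded from several products is reported once with all of its reasons, which is what preprocess() then logs per file.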