Skip to content

Commit

Permalink
changes based on review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mhidas committed Feb 20, 2020
1 parent ecebe8c commit cbe1590
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
38 changes: 21 additions & 17 deletions aodndata/moorings/products_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@


PRODUCT_TYPE_PATTERN = re.compile(r'FV0[12]_([^_]+)_END')
VALID_PRODUCTS = {'aggregated', 'hourly'}
DOWNLOAD_URL_PREFIX = "https://s3-ap-southeast-2.amazonaws.com/imos-data/"
OPENDAP_URL_PREFIX = "http://thredds.aodn.org.au/thredds/dodsC/"

Expand Down Expand Up @@ -106,17 +105,17 @@ class MooringsProductsHandler(HandlerBase):
"""

FILE_INDEX_LAYER = 'imos:moorings_all_map'
VALID_PRODUCTS = {'aggregated', 'hourly'}

def __init__(self, *args, **kwargs):
super(MooringsProductsHandler, self).__init__(*args, **kwargs)
self.allowed_extensions = ['.json_manifest', '.nc', '.zip']
self.product_site_code = None
self.product_variables = None
self.products_to_create = VALID_PRODUCTS
self.products_to_create = self.VALID_PRODUCTS
self.input_file_collection = None
self.input_file_variables = None
self.excluded_files = dict()
self.product_qc_flags = [[1, 2], [0, 1, 2]]

def _read_manifest(self):
"""Read the manifest file and extract key parameters for product"""
Expand All @@ -131,7 +130,7 @@ def _read_manifest(self):
"manifest file '{self.input_file}' missing information (site_code, variables)".format(self=self)
)
if 'products' in manifest:
invalid_products = set(manifest['products']) - VALID_PRODUCTS
invalid_products = set(manifest['products']) - self.VALID_PRODUCTS
if invalid_products:
raise InvalidFileContentError(
"invalid product(s) {invalid_products} requested "
Expand Down Expand Up @@ -191,11 +190,12 @@ def _get_input_files(self):
def _get_old_product_files(self):
"""Get a list of the currently published product files for the site being processed."""

product_data_category = Or([PropertyIsEqualTo(propertyname='data_category', literal='aggregated_timeseries'),
PropertyIsEqualTo(propertyname='data_category', literal='hourly_timeseries'),
PropertyIsEqualTo(propertyname='data_category', literal='gridded_timeseries')]
)
filter_list = [PropertyIsEqualTo(propertyname='site_code', literal=self.product_site_code),
Or([PropertyIsEqualTo(propertyname='data_category', literal='aggregated_timeseries'),
PropertyIsEqualTo(propertyname='data_category', literal='hourly_timeseries'),
PropertyIsEqualTo(propertyname='data_category', literal='gridded_timeseries')
])
product_data_category
]
wfs_features = self.get_wfs_features(filter_list, propertyname=['url'])

Expand All @@ -213,7 +213,7 @@ def _get_old_product_files(self):
product_url=product_url)
)

def _handle_errors(self, errors):
def _log_excluded_files(self, errors):
"""Keep track of any input files that were excluded from the product and log a brief warning."""
if errors:
self.logger.warning("{n} files were excluded from the product.".format(n=len(errors)))
Expand All @@ -240,7 +240,7 @@ def _make_aggregated_timeseries(self):
download_url_prefix=DOWNLOAD_URL_PREFIX,
opendap_url_prefix=OPENDAP_URL_PREFIX
)
self._handle_errors(errors)
self._log_excluded_files(errors)

product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
Expand All @@ -255,7 +255,9 @@ def _make_hourly_timeseries(self):
input_list = [f.local_path for f in self.input_file_collection]
self.logger.info("Creating hourly products from {n} input files".format(n=len(input_list)))

for qc_flags in self.product_qc_flags:
# create two versions of the product, one with only good data (flags 1 & 2),
# and one also including non-QC'd data (flag 0)
for qc_flags in ((1, 2), (0, 1, 2)):

product_url, errors = hourly_aggregator(input_list, self.product_site_code, qc_flags,
input_dir=self.temp_dir,
Expand All @@ -264,23 +266,25 @@ def _make_hourly_timeseries(self):
opendap_url_prefix=OPENDAP_URL_PREFIX
)

self._handle_errors(errors)
self._log_excluded_files(errors)

product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
self.file_collection.add(product_file)

self._cleanup_previous_version(product_file.name)

def _cleanup_previous_version(self, product_name):
"""Delete any previously published version(s) of the given product file.
def _cleanup_previous_version(self, product_filename):
"""Identify any previously published version(s) of the given product file and mark them for deletion.
Ignores cases where the previous version has exactly the same file name, as this will simply be overwritten.
:param product_name: Name of the newly generated product
:param product_filename: File name of the newly generated product
"""
product_type = get_product_type(product_name)
product_type = get_product_type(product_filename)
for old_product_url in self.old_product_files.get(product_type, []):
if os.path.basename(old_product_url) != product_name:
if os.path.basename(old_product_url) != product_filename:
# Add the previous version as a "late deletion". It will be deleted during the handler's `publish`
# step after (and only if) all new files have been successfully published.
old_file = PipelineFile(old_product_url, dest_path=old_product_url, is_deletion=True,
late_deletion=True, file_update_callback=self._file_update_callback)
old_file.publish_type = PipelineFilePublishType.DELETE_UNHARVEST
Expand Down
5 changes: 5 additions & 0 deletions test_aodndata/moorings/test_mooringsProductsHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from aodndata.moorings.products_handler import MooringsProductsHandler, MooringsProductClassifier, get_product_type


# Input files used in tests
TEST_ROOT = os.path.dirname(__file__)
GOOD_MANIFEST = os.path.join(TEST_ROOT, 'test_product.json_manifest')
AGGREGATED_ONLY_MANIFEST = os.path.join(TEST_ROOT, 'test_product_aggregated.json_manifest')
Expand All @@ -20,6 +22,7 @@
'IMOS_ANMN-NRS_TZ_20181213_NRSROT_FV01_TEMP-aggregated-timeseries_END-20190523_C-20191218.nc'
)

# Load JSON files used to mock WFS responses
GETFEATURE_FILE = os.path.join(TEST_ROOT, 'getFeature.json')
GETFEATURE_OLD_PRODUCTS_FILE = os.path.join(TEST_ROOT, 'getFeature_old_products.json')
GETFEATURE_EMPTY_FILE = os.path.join(TEST_ROOT, 'getFeature_empty.json')
Expand All @@ -33,6 +36,8 @@
with open(GETFEATURE_EMPTY_FILE) as f:
TEST_GETFEATURE_EMPTY_JSON = f.read()

# Create collection of input files for the products
# These will be uploaded to the mocked equivalent of S3 (where the real input files will be)
features = json.loads(TEST_GETFEATURE_JSON)['features']
INPUT_FILE_COLLECTION = PipelineFileCollection()
for f in features:
Expand Down

0 comments on commit cbe1590

Please sign in to comment.