diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py
index 66d29fd5..808a6f31 100644
--- a/aodndata/moorings/products_handler.py
+++ b/aodndata/moorings/products_handler.py
@@ -17,7 +17,6 @@
 
 PRODUCT_TYPE_PATTERN = re.compile(r'FV0[12]_([^_]+)_END')
-VALID_PRODUCTS = {'aggregated', 'hourly'}
 DOWNLOAD_URL_PREFIX = "https://s3-ap-southeast-2.amazonaws.com/imos-data/"
 OPENDAP_URL_PREFIX = "http://thredds.aodn.org.au/thredds/dodsC/"
 
 
@@ -106,17 +105,17 @@ class MooringsProductsHandler(HandlerBase):
     """
 
     FILE_INDEX_LAYER = 'imos:moorings_all_map'
+    VALID_PRODUCTS = {'aggregated', 'hourly'}
 
     def __init__(self, *args, **kwargs):
        super(MooringsProductsHandler, self).__init__(*args, **kwargs)
         self.allowed_extensions = ['.json_manifest', '.nc', '.zip']
         self.product_site_code = None
         self.product_variables = None
-        self.products_to_create = VALID_PRODUCTS
+        self.products_to_create = self.VALID_PRODUCTS
         self.input_file_collection = None
         self.input_file_variables = None
         self.excluded_files = dict()
-        self.product_qc_flags = [[1, 2], [0, 1, 2]]
 
     def _read_manifest(self):
         """Read the manifest file and extract key parameters for product"""
@@ -131,7 +130,7 @@ def _read_manifest(self):
                 "manifest file '{self.input_file}' missing information (site_code, variables)".format(self=self)
             )
         if 'products' in manifest:
-            invalid_products = set(manifest['products']) - VALID_PRODUCTS
+            invalid_products = set(manifest['products']) - self.VALID_PRODUCTS
             if invalid_products:
                 raise InvalidFileContentError(
                     "invalid product(s) {invalid_products} requested "
@@ -191,11 +190,12 @@ def _get_input_files(self):
 
     def _get_old_product_files(self):
         """Get a list of the currently published product files for the site being processed."""
+        product_data_category = Or([PropertyIsEqualTo(propertyname='data_category', literal='aggregated_timeseries'),
+                                    PropertyIsEqualTo(propertyname='data_category', literal='hourly_timeseries'),
+                                    PropertyIsEqualTo(propertyname='data_category', literal='gridded_timeseries')]
+                                   )
         filter_list = [PropertyIsEqualTo(propertyname='site_code', literal=self.product_site_code),
-                       Or([PropertyIsEqualTo(propertyname='data_category', literal='aggregated_timeseries'),
-                           PropertyIsEqualTo(propertyname='data_category', literal='hourly_timeseries'),
-                           PropertyIsEqualTo(propertyname='data_category', literal='gridded_timeseries')
-                           ])
+                       product_data_category
                        ]
 
         wfs_features = self.get_wfs_features(filter_list, propertyname=['url'])
@@ -213,7 +213,7 @@
                                   product_url=product_url)
             )
 
-    def _handle_errors(self, errors):
+    def _log_excluded_files(self, errors):
         """Keep track of any input files that were excluded from the product and log a brief warning."""
         if errors:
             self.logger.warning("{n} files were excluded from the product.".format(n=len(errors)))
@@ -240,7 +240,7 @@ def _make_aggregated_timeseries(self):
                                                  download_url_prefix=DOWNLOAD_URL_PREFIX,
                                                  opendap_url_prefix=OPENDAP_URL_PREFIX
                                                  )
-        self._handle_errors(errors)
+        self._log_excluded_files(errors)
 
         product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
         product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
@@ -255,7 +255,9 @@ def _make_hourly_timeseries(self):
         input_list = [f.local_path for f in self.input_file_collection]
         self.logger.info("Creating hourly products from {n} input files".format(n=len(input_list)))
-        for qc_flags in self.product_qc_flags:
+        # create two versions of the product, one with only good data (flags 1 & 2),
+        # and one also including non-QC'd data (flag 0)
+        for qc_flags in ((1, 2), (0, 1, 2)):
             product_url, errors = hourly_aggregator(input_list,
                                                     self.product_site_code,
                                                     qc_flags,
                                                     input_dir=self.temp_dir,
@@ -264,7 +266,7 @@ def _make_hourly_timeseries(self):
                                                     opendap_url_prefix=OPENDAP_URL_PREFIX
                                                     )
 
-            self._handle_errors(errors)
+            self._log_excluded_files(errors)
 
             product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
             product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
@@ -272,15 +274,17 @@ def _make_hourly_timeseries(self):
             self._cleanup_previous_version(product_file.name)
 
-    def _cleanup_previous_version(self, product_name):
-        """Delete any previously published version(s) of the given product file.
+    def _cleanup_previous_version(self, product_filename):
+        """Identify any previously published version(s) of the given product file and mark them for deletion.
 
         Ignores cases where the previous version has exactly the same file name, as this will simply be overwritten.
 
-        :param product_name: Name of the newly generated product
+        :param product_filename: File name of the newly generated product
         """
-        product_type = get_product_type(product_name)
+        product_type = get_product_type(product_filename)
         for old_product_url in self.old_product_files.get(product_type, []):
-            if os.path.basename(old_product_url) != product_name:
+            if os.path.basename(old_product_url) != product_filename:
+                # Add the previous version as a "late deletion". It will be deleted during the handler's `publish`
+                # step after (and only if) all new files have been successfully published.
                 old_file = PipelineFile(old_product_url, dest_path=old_product_url, is_deletion=True,
                                         late_deletion=True, file_update_callback=self._file_update_callback)
                 old_file.publish_type = PipelineFilePublishType.DELETE_UNHARVEST
diff --git a/test_aodndata/moorings/test_mooringsProductsHandler.py b/test_aodndata/moorings/test_mooringsProductsHandler.py
index 46efc8d0..2005c278 100644
--- a/test_aodndata/moorings/test_mooringsProductsHandler.py
+++ b/test_aodndata/moorings/test_mooringsProductsHandler.py
@@ -11,6 +11,8 @@
 from aodndata.moorings.products_handler import MooringsProductsHandler, MooringsProductClassifier, get_product_type
 
+
+# Input files used in tests
 TEST_ROOT = os.path.dirname(__file__)
 GOOD_MANIFEST = os.path.join(TEST_ROOT, 'test_product.json_manifest')
 AGGREGATED_ONLY_MANIFEST = os.path.join(TEST_ROOT, 'test_product_aggregated.json_manifest')
 
@@ -20,6 +22,7 @@
     'IMOS_ANMN-NRS_TZ_20181213_NRSROT_FV01_TEMP-aggregated-timeseries_END-20190523_C-20191218.nc'
 )
 
+# Load JSON files used to mock WFS responses
 GETFEATURE_FILE = os.path.join(TEST_ROOT, 'getFeature.json')
 GETFEATURE_OLD_PRODUCTS_FILE = os.path.join(TEST_ROOT, 'getFeature_old_products.json')
 GETFEATURE_EMPTY_FILE = os.path.join(TEST_ROOT, 'getFeature_empty.json')
@@ -33,6 +36,8 @@
 with open(GETFEATURE_EMPTY_FILE) as f:
     TEST_GETFEATURE_EMPTY_JSON = f.read()
 
+# Create collection of input files for the products
+# These will be uploaded to the mocked equivalent of S3 (where the real input files will be)
 features = json.loads(TEST_GETFEATURE_JSON)['features']
 INPUT_FILE_COLLECTION = PipelineFileCollection()
 for f in features:
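
A note on the validation pattern this patch touches: making VALID_PRODUCTS a class attribute (rather than a module-level constant) lets a subclass override the allowed product set without patching module globals, while the manifest check stays a simple set difference. The sketch below illustrates that pattern in isolation. It is not part of the patch; ProductManifestValidator and read_manifest are hypothetical stand-ins for the real logic in MooringsProductsHandler._read_manifest, which raises InvalidFileContentError rather than ValueError.

    # Minimal standalone sketch (assumptions noted above) of the class-attribute
    # validation pattern; not the actual pipeline code.
    import json


    class ProductManifestValidator:
        # Class attribute mirroring MooringsProductsHandler.VALID_PRODUCTS;
        # a subclass can override this without touching module globals.
        VALID_PRODUCTS = {'aggregated', 'hourly'}

        def read_manifest(self, manifest_text):
            manifest = json.loads(manifest_text)
            if 'site_code' not in manifest or 'variables' not in manifest:
                raise ValueError("manifest missing information (site_code, variables)")
            # Default to creating all valid products if none are explicitly requested
            products = set(manifest.get('products', self.VALID_PRODUCTS))
            invalid_products = products - self.VALID_PRODUCTS
            if invalid_products:
                raise ValueError("invalid product(s) {} requested".format(invalid_products))
            return products


    validator = ProductManifestValidator()
    print(validator.read_manifest('{"site_code": "NRSROT", "variables": ["TEMP"], "products": ["hourly"]}'))
    # -> {'hourly'}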