From 5081691043267574346f08bc1ab2587a9022de31 Mon Sep 17 00:00:00 2001
From: mhidas
Date: Thu, 6 Feb 2020 12:31:51 +1100
Subject: [PATCH 01/10] Match up previous versions of product files based on
 product type label in file name

---
 aodndata/moorings/products_handler.py | 47 +++++++++++++++++----------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py
index 7698f934..6e4b3f85 100644
--- a/aodndata/moorings/products_handler.py
+++ b/aodndata/moorings/products_handler.py
@@ -15,7 +15,23 @@
 from aodndata.moorings.classifiers import MooringsFileClassifier


-AGGREGATED_VARIABLE_PATTERN = re.compile(r'FV01_([A-Z0-9-]+)-aggregated')
+PRODUCT_TYPE_PATTERN = re.compile(r'FV0[12]_([^_]+)_END')
+
+
+def get_product_type(file_path):
+    """Return a product type label for the given file (extracted from the file name).
+    For example "PSAL-aggregated-timeseries", or "hourly-timeseries".
+
+    :param file_path: str path or name of file
+    :returns: str product type label
+    """
+    file_name = os.path.basename(file_path)
+    name_match = PRODUCT_TYPE_PATTERN.search(file_name)
+    if not name_match:
+        raise InvalidFileNameError(
+            "Could not extract product type from '{file_name}'".format(file_name=file_name)
+        )
+    return name_match.group(1)


 class MooringsProductClassifier(MooringsFileClassifier):
@@ -174,21 +190,16 @@ def _get_old_product_files(self):
         self.old_product_files = {}
         for f in wfs_features:
             product_url = f['properties']['url']
-            var_match = AGGREGATED_VARIABLE_PATTERN.search(product_url)
-            if not var_match:
-                raise InvalidFileNameError(
-                    "Could not determine variable of interest for '{product_url}'".format(product_url=product_url)
-                )
-            variable_of_interest = var_match.group(1).replace('-', '_')
-            if variable_of_interest not in self.old_product_files:
-                self.old_product_files[variable_of_interest] = [product_url]
+            product_type = get_product_type(product_url)
+            if product_type not in self.old_product_files:
+                self.old_product_files[product_type] = [product_url]
             else:
-                self.old_product_files[variable_of_interest].append(product_url)
+                self.old_product_files[product_type].append(product_url)
             self.logger.info(
-                "Old file for {variable_of_interest}: '{product_url}'".format(variable_of_interest=variable_of_interest,
-                                                                              product_url=product_url)
-            )
+                "Old file for {product_type}: '{product_url}'".format(product_type=product_type,
+                                                                      product_url=product_url)
+            )

     def _make_aggregated_timeseries(self):
         """For each variable, generate product and add to file_collection."""
@@ -219,16 +230,16 @@ def _make_aggregated_timeseries(self):
             product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
             self.file_collection.add(product_file)

-            self._cleanup_previous_version(product_file.name, var)
+            self._cleanup_previous_version(product_file.name)

-    def _cleanup_previous_version(self, product_name, var):
-        """Delete any previously published version(s) of the product for this variable file.
+    def _cleanup_previous_version(self, product_name):
+        """Delete any previously published version(s) of the given product file.
         Ignores cases where the previous version has exactly the same file name, as this will simply be overwritten.

         :param product_name: Name of the newly generated product
-        :param var: Name of the variable of interest
         """
-        for old_product_url in self.old_product_files.get(var, []):
+        product_type = get_product_type(product_name)
+        for old_product_url in self.old_product_files.get(product_type, []):
             if os.path.basename(old_product_url) != product_name:
                 old_file = PipelineFile(old_product_url, dest_path=old_product_url, is_deletion=True,
                                         late_deletion=True, file_update_callback=self._file_update_callback)
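A quick illustration of how the new helper behaves (illustration only, not part of the patch; the file names are taken from the test fixtures used later in this series):

    # PRODUCT_TYPE_PATTERN captures the label between the FV01/FV02 field
    # and the _END field of an IMOS product file name.
    get_product_type(
        'IMOS_ANMN-NRS_TZ_20081120_NRSROT_FV01_DOX1-2-aggregated-timeseries_END-20190523_C-20190819.nc')
    # -> 'DOX1-2-aggregated-timeseries'
    get_product_type(
        'IMOS_ANMN-NRS_STZ_20081120_NRSROT_FV02_hourly-timeseries_END-20190523_C-20191010.nc')
    # -> 'hourly-timeseries'
    get_product_type('file_without_product_label.nc')  # raises InvalidFileNameError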
From 9addd6250decd150605b686e25130c8bf639b2ed Mon Sep 17 00:00:00 2001
From: mhidas
Date: Thu, 6 Feb 2020 16:24:43 +1100
Subject: [PATCH 02/10] WIP: add creation of hourly products in
 MooringsProductsHandler

---
 aodndata/moorings/products_handler.py        | 39 +++++++++++++++++--
 .../moorings/test_mooringsProductsHandler.py | 23 ++++++++---
 2 files changed, 53 insertions(+), 9 deletions(-)

diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py
index 6e4b3f85..7e836100 100644
--- a/aodndata/moorings/products_handler.py
+++ b/aodndata/moorings/products_handler.py
@@ -2,7 +2,7 @@
 import os
 import re

-from owslib.fes import PropertyIsEqualTo, PropertyIsNotEqualTo, And
+from owslib.fes import PropertyIsEqualTo, PropertyIsNotEqualTo, And, Or

 from aodncore.pipeline import HandlerBase, PipelineFilePublishType, PipelineFile, FileType
 from aodncore.pipeline.exceptions import (InvalidFileContentError, InvalidFileNameError, InvalidFileFormatError,
@@ -11,6 +11,7 @@
 from aodncore.util.wfs import ogc_filter_to_string

 from aodntools.timeseries_products.aggregated_timeseries import main_aggregator
+from aodntools.timeseries_products.hourly_timeseries import hourly_aggregator

 from aodndata.moorings.classifiers import MooringsFileClassifier

@@ -111,6 +112,7 @@ def __init__(self, *args, **kwargs):
         self.input_file_collection = None
         self.input_file_variables = None
         self.excluded_files = dict()
+        self.product_qc_flags = [[1, 2], [0, 1, 2]]

     def _read_manifest(self):
         """Read the manifest file and extract key parameters for product"""
@@ -180,10 +182,13 @@ def _get_input_files(self):
         # TODO: Replace temp_dir above with cache_dir?

     def _get_old_product_files(self):
-        """Get a list of the currently published aggregated_timeseries files for the site being processed."""
+        """Get a list of the currently published product files for the site being processed."""

         filter_list = [PropertyIsEqualTo(propertyname='site_code', literal=self.product_site_code),
-                       PropertyIsEqualTo(propertyname='data_category', literal='aggregated_timeseries')
+                       Or([PropertyIsEqualTo(propertyname='data_category', literal='aggregated_timeseries'),
+                           PropertyIsEqualTo(propertyname='data_category', literal='hourly_timeseries'),
+                           PropertyIsEqualTo(propertyname='data_category', literal='gridded_timeseries')
+                           ])
                       ]
         wfs_features = self.get_wfs_features(filter_list, propertyname=['url'])

@@ -202,7 +207,7 @@
             )

     def _make_aggregated_timeseries(self):
-        """For each variable, generate product and add to file_collection."""
+        """For each variable, generate aggregated timeseries product and add to file_collection."""

         for var in self.product_variables:
             # Filter input_list to the files relevant for this var
@@ -232,6 +237,31 @@

             self._cleanup_previous_version(product_file.name)

+    def _make_hourly_timeseries(self):
+        """Generate hourly products for the site and add to file_collection."""
+
+        # Use the full list of input files for the site (the hourly products are not split by variable)
+        input_list = [f.local_path for f in self.input_file_collection]
+        self.logger.info("Creating hourly products from {n} input files".format(n=len(input_list)))
+
+        for qc_flags in self.product_qc_flags:
+
+            product_url, errors = hourly_aggregator(input_list, self.product_site_code, qc_flags, self.temp_dir)
+
+            if errors:
+                self.logger.warning("{n} files were excluded from the aggregation.".format(n=len(errors)))
+                for f, e in errors.items():
+                    if f not in self.excluded_files:
+                        self.excluded_files[f] = set(e)
+                    else:
+                        self.excluded_files[f].update(e)
+
+            product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
+            product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
+            self.file_collection.add(product_file)
+
+            self._cleanup_previous_version(product_file.name)
+
     def _cleanup_previous_version(self, product_name):
         """Delete any previously published version(s) of the given product file.
         Ignores cases where the previous version has exactly the same file name, as this will simply be overwritten.
@@ -265,6 +295,7 @@ def preprocess(self):
         # TODO: Run compliance checks and remove non-compliant files from the input list (log them).

         self._make_aggregated_timeseries()
+        self._make_hourly_timeseries()

         # TODO: Include the list of excluded files as another table in the notification email (instead of the log)
         if self.excluded_files:
diff --git a/test_aodndata/moorings/test_mooringsProductsHandler.py b/test_aodndata/moorings/test_mooringsProductsHandler.py
index a9c0d342..1ec24881 100644
--- a/test_aodndata/moorings/test_mooringsProductsHandler.py
+++ b/test_aodndata/moorings/test_mooringsProductsHandler.py
@@ -8,7 +8,7 @@
 from aodncore.pipeline.storage import get_storage_broker
 from aodncore.testlib import HandlerTestCase, make_test_file

-from aodndata.moorings.products_handler import MooringsProductsHandler, MooringsProductClassifier
+from aodndata.moorings.products_handler import MooringsProductsHandler, MooringsProductClassifier, get_product_type

 TEST_ROOT = os.path.dirname(__file__)
 GOOD_MANIFEST = os.path.join(TEST_ROOT, 'test_product.json_manifest')
@@ -54,19 +54,32 @@ def test_good_manifest(self, mock_webfeatureservice):
         self.assertCountEqual(INPUT_FILE_COLLECTION.get_attribute_list('dest_path'),
                               handler.input_file_collection.get_attribute_list('dest_path')
                               )
-        self.assertEqual(len(handler.file_collection), 5)
-
+        self.assertEqual(len(handler.file_collection), 7)
+
+        # check new product files
+        expected_new_products = {'TEMP-aggregated-timeseries',
+                                 'PSAL-aggregated-timeseries',
+                                 'CHLF-aggregated-timeseries',
+                                 'hourly-timeseries',
+                                 'hourly-timeseries-including-non-QC'
+}
         published_files = handler.file_collection.filter_by_attribute_id('publish_type',
                                                                          PipelineFilePublishType.HARVEST_UPLOAD)
-        self.assertEqual(len(published_files), 3)
+        self.assertEqual(len(published_files), len(expected_new_products))
         for f in published_files:
             self.assertTrue(f.is_harvested and f.is_stored)
+        published_products = {get_product_type(f.name) for f in published_files}
+        self.assertSetEqual(published_products, expected_new_products)

+        # check deletion of previous versions
+        expected_deleted_products = {'TEMP-aggregated-timeseries', 'PSAL-aggregated-timeseries'}
         deleted_files = handler.file_collection.filter_by_attribute_id('publish_type',
                                                                        PipelineFilePublishType.DELETE_UNHARVEST)
-        self.assertEqual(len(deleted_files), 2)
+        self.assertEqual(len(deleted_files), len(expected_deleted_products))
         for f in deleted_files:
             self.assertTrue(f.is_harvested and f.is_stored)
+        deleted_products = {get_product_type(f.name) for f in deleted_files}
+        self.assertSetEqual(deleted_products, expected_deleted_products)

         self.assertEqual(len(handler.excluded_files), 1)
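For context (an illustrative sketch, not part of the patch): each entry in product_qc_flags drives one pass of hourly_aggregator, so the handler is expected to publish two hourly files per site. The product type labels below mirror the expected_new_products set in the test above; the actual file names are generated inside aodntools.

    for qc_flags in [[1, 2], [0, 1, 2]]:
        product_url, errors = hourly_aggregator(input_list, 'NRSROT', qc_flags, temp_dir)
        # qc_flags == [1, 2]    -> '..._FV02_hourly-timeseries_END-...'  (good data only)
        # qc_flags == [0, 1, 2] -> '..._FV02_hourly-timeseries-including-non-QC_END-...'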
"IMOS/ANMN/NRS/NRSROT/hourly_timeseries/IMOS_ANMN-NRS_STZ_20081120_NRSROT_FV02_hourly-timeseries_END-20190523_C-20191010.nc" + } } ], - "totalFeatures": 3, - "numberMatched": 3, - "numberReturned": 3, + "totalFeatures": 4, + "numberMatched": 4, + "numberReturned": 4, "timeStamp": "2019-12-05T06:33:53.380Z", "crs": null } diff --git a/test_aodndata/moorings/test_mooringsProductsHandler.py b/test_aodndata/moorings/test_mooringsProductsHandler.py index 1ec24881..17864e29 100644 --- a/test_aodndata/moorings/test_mooringsProductsHandler.py +++ b/test_aodndata/moorings/test_mooringsProductsHandler.py @@ -54,33 +54,44 @@ def test_good_manifest(self, mock_webfeatureservice): self.assertCountEqual(INPUT_FILE_COLLECTION.get_attribute_list('dest_path'), handler.input_file_collection.get_attribute_list('dest_path') ) - self.assertEqual(len(handler.file_collection), 7) - # check new product files expected_new_products = {'TEMP-aggregated-timeseries', 'PSAL-aggregated-timeseries', 'CHLF-aggregated-timeseries', 'hourly-timeseries', 'hourly-timeseries-including-non-QC' -} - published_files = handler.file_collection.filter_by_attribute_id('publish_type', - PipelineFilePublishType.HARVEST_UPLOAD) - self.assertEqual(len(published_files), len(expected_new_products)) - for f in published_files: + } + expected_deleted_products = {'TEMP-aggregated-timeseries', + 'PSAL-aggregated-timeseries', + 'hourly-timeseries', + } + + self.assertEqual(len(handler.file_collection), len(expected_new_products) + len(expected_deleted_products)) + for f in handler.file_collection: self.assertTrue(f.is_harvested and f.is_stored) - published_products = {get_product_type(f.name) for f in published_files} + + # check new product files + published_files = (handler.file_collection + .filter_by_attribute_id('publish_type', PipelineFilePublishType.HARVEST_UPLOAD) + .get_attribute_list('name') + ) + self.assertEqual(len(published_files), len(expected_new_products)) + published_products = {get_product_type(f) for f in published_files} self.assertSetEqual(published_products, expected_new_products) # check deletion of previous versions - expected_deleted_products = {'TEMP-aggregated-timeseries', 'PSAL-aggregated-timeseries'} - deleted_files = handler.file_collection.filter_by_attribute_id('publish_type', - PipelineFilePublishType.DELETE_UNHARVEST) + deleted_files = (handler.file_collection + .filter_by_attribute_id('publish_type', PipelineFilePublishType.DELETE_UNHARVEST) + .get_attribute_list('name') + ) self.assertEqual(len(deleted_files), len(expected_deleted_products)) - for f in deleted_files: - self.assertTrue(f.is_harvested and f.is_stored) - deleted_products = {get_product_type(f.name) for f in deleted_files} + deleted_products = {get_product_type(f) for f in deleted_files} self.assertSetEqual(deleted_products, expected_deleted_products) + # published and deleted files should never have the same name! 
+ self.assertEqual(set(), set(published_files) & set(deleted_files)) + + # check input files excluded from the products self.assertEqual(len(handler.excluded_files), 1) def test_publish_product_nc(self): From 0ca6c8a6e7f1084ab16f75c9013b88cd98b8637a Mon Sep 17 00:00:00 2001 From: mhidas Date: Thu, 6 Feb 2020 17:05:00 +1100 Subject: [PATCH 04/10] factor out handling of errors after each product file is generated --- aodndata/moorings/products_handler.py | 28 +++++++++++++-------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py index 7e836100..6940e84e 100644 --- a/aodndata/moorings/products_handler.py +++ b/aodndata/moorings/products_handler.py @@ -206,6 +206,16 @@ def _get_old_product_files(self): product_url=product_url) ) + def _handle_errors(self, errors): + """Keep track of any input files that were excluded from the product and log a brief warning.""" + if errors: + self.logger.warning("{n} files were excluded from the product.".format(n=len(errors))) + for f, e in errors.items(): + if f not in self.excluded_files: + self.excluded_files[f] = set(e) + else: + self.excluded_files[f].update(e) + def _make_aggregated_timeseries(self): """For each variable, generate aggregated timeseries product and add to file_collection.""" @@ -223,13 +233,7 @@ def _make_aggregated_timeseries(self): download_url_prefix="https://s3-ap-southeast-2.amazonaws.com/imos-data/", opendap_url_prefix="http://thredds.aodn.org.au/thredds/dodsC/" ) - if errors: - self.logger.warning("{n} files were excluded from the aggregation.".format(n=len(errors))) - for f, e in errors.items(): - if f not in self.excluded_files: - self.excluded_files[f] = set(e) - else: - self.excluded_files[f].update(e) + self._handle_errors(errors) product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback) product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD @@ -248,13 +252,7 @@ def _make_hourly_timeseries(self): product_url, errors = hourly_aggregator(input_list, self.product_site_code, qc_flags, self.temp_dir) - if errors: - self.logger.warning("{n} files were excluded from the aggregation.".format(n=len(errors))) - for f, e in errors.items(): - if f not in self.excluded_files: - self.excluded_files[f] = set(e) - else: - self.excluded_files[f].update(e) + self._handle_errors(errors) product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback) product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD @@ -299,7 +297,7 @@ def preprocess(self): # TODO: Include the list of excluded files as another table in the notification email (instead of the log) if self.excluded_files: - self.logger.warning("Files exluded from aggregations:") + self.logger.warning("Files exluded from some of the products generated:") for f, e in self.excluded_files.items(): self.logger.warning("'{f}': {e}".format(f=f, e=list(e))) From afb18a2cf8f77e0f8d3a1e3894dde6dfff140df8 Mon Sep 17 00:00:00 2001 From: mhidas Date: Thu, 6 Feb 2020 17:20:18 +1100 Subject: [PATCH 05/10] add option to select which products to generate (default all) --- aodndata/moorings/products_handler.py | 16 ++++++++++++++-- .../moorings/test_product.json_manifest | 3 ++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py index 6940e84e..829023d7 100644 --- a/aodndata/moorings/products_handler.py +++ b/aodndata/moorings/products_handler.py 
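A standalone sketch of the exclusion bookkeeping factored out in patch 04 (the errors dict shown here is hypothetical; real values come back from the aggregator functions):

    excluded_files = {}
    errors = {'bad_file.nc': ['TIME not in file'], 'worse_file.nc': ['no variables to aggregate']}
    for f, e in errors.items():
        if f not in excluded_files:
            excluded_files[f] = set(e)
        else:
            excluded_files[f].update(e)
    # excluded_files accumulates the reasons per input file across all product runs,
    # so a single summary warning can be logged in preprocess()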
From afb18a2cf8f77e0f8d3a1e3894dde6dfff140df8 Mon Sep 17 00:00:00 2001
From: mhidas
Date: Thu, 6 Feb 2020 17:20:18 +1100
Subject: [PATCH 05/10] add option to select which products to generate
 (default all)

---
 aodndata/moorings/products_handler.py   | 16 ++++++++++++++--
 .../moorings/test_product.json_manifest |  3 ++-
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py
index 6940e84e..829023d7 100644
--- a/aodndata/moorings/products_handler.py
+++ b/aodndata/moorings/products_handler.py
@@ -17,6 +17,7 @@

 PRODUCT_TYPE_PATTERN = re.compile(r'FV0[12]_([^_]+)_END')
+VALID_PRODUCTS = {'aggregated', 'hourly'}


 def get_product_type(file_path):
@@ -109,6 +110,7 @@ def __init__(self, *args, **kwargs):
         self.allowed_extensions = ['.json_manifest', '.nc', '.zip']
         self.product_site_code = None
         self.product_variables = None
+        self.products_to_create = VALID_PRODUCTS
         self.input_file_collection = None
         self.input_file_variables = None
         self.excluded_files = dict()
@@ -126,6 +128,14 @@ def _read_manifest(self):
             raise InvalidFileContentError(
                 "manifest file '{self.input_file}' missing information (site_code, variables)".format(self=self)
             )
+        if 'products' in manifest:
+            invalid_products = set(manifest['products']) - VALID_PRODUCTS
+            if invalid_products:
+                raise InvalidFileContentError(
+                    "invalid product(s) {invalid_products} requested "
+                    "in manifest file '{self.input_file}'".format(invalid_products=invalid_products, self=self)
+                )
+            self.products_to_create = set(manifest['products'])

     def get_wfs_features(self, filter_list, propertyname='*'):
         """Query the file index WFS layer with the given filters and return a list of features.
@@ -292,8 +302,10 @@ def preprocess(self):

         # TODO: Run compliance checks and remove non-compliant files from the input list (log them).

-        self._make_aggregated_timeseries()
-        self._make_hourly_timeseries()
+        if 'aggregated' in self.products_to_create:
+            self._make_aggregated_timeseries()
+        if 'hourly' in self.products_to_create:
+            self._make_hourly_timeseries()

         # TODO: Include the list of excluded files as another table in the notification email (instead of the log)
         if self.excluded_files:
diff --git a/test_aodndata/moorings/test_product.json_manifest b/test_aodndata/moorings/test_product.json_manifest
index c0ce9f46..8e038654 100644
--- a/test_aodndata/moorings/test_product.json_manifest
+++ b/test_aodndata/moorings/test_product.json_manifest
@@ -1,4 +1,5 @@
 {
     "site_code": "NRSROT",
-    "variables": ["TEMP", "PSAL", "CHLF"]
+    "variables": ["TEMP", "PSAL", "CHLF"],
+    "products":["aggregated", "hourly"]
 }

From 9356aa25d0242292670cd63efc75a81dba3dc649 Mon Sep 17 00:00:00 2001
From: mhidas
Date: Wed, 19 Feb 2020 10:32:14 +1100
Subject: [PATCH 06/10] remove obsolete comment

---
 aodndata/moorings/products_handler.py | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py
index 829023d7..0ab9e042 100644
--- a/aodndata/moorings/products_handler.py
+++ b/aodndata/moorings/products_handler.py
@@ -146,11 +146,6 @@ def get_wfs_features(self, filter_list, propertyname='*'):
         """
         ogc_filter = ogc_filter_to_string(And(filter_list))
-
-        # Note I need to access _wfs_broker to be able to use query_urls_for_layer() with a filter,
-        # as the corresponding StateQuery method doesn't accept additional kwargs.
-        # TODO: find out why this calls getCapabilities twice (and takes 40s even when response mocked with httpretty)
-        # TODO: replace ._wfs_broker.getfeature_dict() with .getfeature_dict() once aodncore has been updated
         wfs_response = self.state_query.query_wfs_getfeature_dict(typename=[self.FILE_INDEX_LAYER],
                                                                   filter=ogc_filter,
                                                                   propertyname=propertyname

From b351b033f2751e0ea8abf0b9f115962a5a50ccd3 Mon Sep 17 00:00:00 2001
From: mhidas
Date: Wed, 19 Feb 2020 11:28:11 +1100
Subject: [PATCH 07/10] add more test cases for MooringsProductsHandler

---
 test_aodndata/moorings/getFeature_empty.json |  9 +++++
 .../moorings/test_mooringsProductsHandler.py | 40 ++++++++++++++++---
 .../test_product_aggregated.json_manifest    |  5 +++
 .../test_product_bad_var.json_manifest       |  4 ++
 4 files changed, 53 insertions(+), 5 deletions(-)
 create mode 100644 test_aodndata/moorings/getFeature_empty.json
 create mode 100644 test_aodndata/moorings/test_product_aggregated.json_manifest
 create mode 100644 test_aodndata/moorings/test_product_bad_var.json_manifest

diff --git a/test_aodndata/moorings/getFeature_empty.json b/test_aodndata/moorings/getFeature_empty.json
new file mode 100644
index 00000000..b51fa91c
--- /dev/null
+++ b/test_aodndata/moorings/getFeature_empty.json
@@ -0,0 +1,9 @@
+{
+  "type": "FeatureCollection",
+  "features": [],
+  "totalFeatures": 0,
+  "numberMatched": 0,
+  "numberReturned": 0,
+  "timeStamp": "2020-02-19T00:00:00Z",
+  "crs": null
+}
diff --git a/test_aodndata/moorings/test_mooringsProductsHandler.py b/test_aodndata/moorings/test_mooringsProductsHandler.py
index 17864e29..46efc8d0 100644
--- a/test_aodndata/moorings/test_mooringsProductsHandler.py
+++ b/test_aodndata/moorings/test_mooringsProductsHandler.py
@@ -5,6 +5,7 @@

 from aodncore.pipeline import (PipelineFile, PipelineFileCollection,
                                PipelineFilePublishType)
+from aodncore.pipeline.exceptions import InvalidFileContentError
 from aodncore.pipeline.storage import get_storage_broker
 from aodncore.testlib import HandlerTestCase, make_test_file
@@ -12,6 +13,8 @@

 TEST_ROOT = os.path.dirname(__file__)
 GOOD_MANIFEST = os.path.join(TEST_ROOT, 'test_product.json_manifest')
+AGGREGATED_ONLY_MANIFEST = os.path.join(TEST_ROOT, 'test_product_aggregated.json_manifest')
+BAD_VAR_MANIFEST = os.path.join(TEST_ROOT, 'test_product_bad_var.json_manifest')
 PRODUCT_FILE = os.path.join(
     TEST_ROOT,
     'IMOS_ANMN-NRS_TZ_20181213_NRSROT_FV01_TEMP-aggregated-timeseries_END-20190523_C-20191218.nc'
@@ -19,6 +22,7 @@
 )

 GETFEATURE_FILE = os.path.join(TEST_ROOT, 'getFeature.json')
 GETFEATURE_OLD_PRODUCTS_FILE = os.path.join(TEST_ROOT, 'getFeature_old_products.json')
+GETFEATURE_EMPTY_FILE = os.path.join(TEST_ROOT, 'getFeature_empty.json')

 with open(GETFEATURE_FILE) as f:
     TEST_GETFEATURE_JSON = f.read()
@@ -26,6 +30,9 @@
 with open(GETFEATURE_OLD_PRODUCTS_FILE) as f:
     TEST_GETFEATURE_OLD_PRODUCTS_JSON = f.read()

+with open(GETFEATURE_EMPTY_FILE) as f:
+    TEST_GETFEATURE_EMPTY_JSON = f.read()
+
 features = json.loads(TEST_GETFEATURE_JSON)['features']
 INPUT_FILE_COLLECTION = PipelineFileCollection()
 for f in features:
@@ -40,16 +47,15 @@ class TestMooringsProductsHandler(HandlerTestCase):
     def setUp(self):
         self.handler_class = MooringsProductsHandler
-        super(TestMooringsProductsHandler, self).setUp()
+        upload_broker = get_storage_broker(self.config.pipeline_config['global']['upload_uri'])
+        upload_broker.upload(INPUT_FILE_COLLECTION)
+        super().setUp()

     @patch('aodncore.util.wfs.WebFeatureService')
-    def test_good_manifest(self, mock_webfeatureservice):
+    def test_all_products(self, mock_webfeatureservice):
         mock_webfeatureservice().getfeature().getvalue.side_effect = [TEST_GETFEATURE_JSON,
                                                                       TEST_GETFEATURE_OLD_PRODUCTS_JSON]

-        upload_broker = get_storage_broker(self.config.pipeline_config['global']['upload_uri'])
-        upload_broker.upload(INPUT_FILE_COLLECTION)
-
         handler = self.run_handler(GOOD_MANIFEST)
         self.assertCountEqual(INPUT_FILE_COLLECTION.get_attribute_list('dest_path'),
                               handler.input_file_collection.get_attribute_list('dest_path')
@@ -94,6 +100,30 @@ def test_good_manifest(self, mock_webfeatureservice):
         # check input files excluded from the products
         self.assertEqual(len(handler.excluded_files), 1)

+    @patch('aodncore.util.wfs.WebFeatureService')
+    def test_aggregated_only_no_old_files(self, mock_webfeatureservice):
+        mock_webfeatureservice().getfeature().getvalue.side_effect = [TEST_GETFEATURE_JSON,
+                                                                      TEST_GETFEATURE_EMPTY_JSON]
+
+        handler = self.run_handler(AGGREGATED_ONLY_MANIFEST)
+
+        expected_new_products = {'TEMP-aggregated-timeseries',
+                                 'PSAL-aggregated-timeseries',
+                                 'CHLF-aggregated-timeseries',
+                                 }
+
+        self.assertEqual(len(handler.file_collection), len(expected_new_products))
+        for f in handler.file_collection:
+            self.assertTrue(f.is_harvested and f.is_stored)
+            self.assertIs(f.publish_type, PipelineFilePublishType.HARVEST_UPLOAD)
+            self.assertIn(get_product_type(f.name), expected_new_products)
+
+    @patch('aodncore.util.wfs.WebFeatureService')
+    def test_bad_var(self, mock_webfeatureservice):
+        mock_webfeatureservice().getfeature().getvalue.side_effect = [TEST_GETFEATURE_JSON,
+                                                                      TEST_GETFEATURE_OLD_PRODUCTS_JSON]
+        self.run_handler_with_exception(InvalidFileContentError, BAD_VAR_MANIFEST)
+
     def test_publish_product_nc(self):
         handler = self.run_handler(PRODUCT_FILE)
         self.assertEqual(len(handler.file_collection), 1)
diff --git a/test_aodndata/moorings/test_product_aggregated.json_manifest b/test_aodndata/moorings/test_product_aggregated.json_manifest
new file mode 100644
index 00000000..4960692d
--- /dev/null
+++ b/test_aodndata/moorings/test_product_aggregated.json_manifest
@@ -0,0 +1,5 @@
+{
+    "site_code": "NRSROT",
+    "variables": ["TEMP", "PSAL", "CHLF"],
+    "products":["aggregated"]
+}
diff --git a/test_aodndata/moorings/test_product_bad_var.json_manifest b/test_aodndata/moorings/test_product_bad_var.json_manifest
new file mode 100644
index 00000000..28b0d648
--- /dev/null
+++ b/test_aodndata/moorings/test_product_bad_var.json_manifest
@@ -0,0 +1,4 @@
+{
+    "site_code": "NRSROT",
+    "variables": ["TEMP", "PSAL", "CHLF", "DOXY"]
+}
From 0d13ff4da045886d0feb4ee304bc7759b76dc27b Mon Sep 17 00:00:00 2001
From: mhidas
Date: Wed, 19 Feb 2020 11:37:18 +1100
Subject: [PATCH 08/10] add input/output dir and URL prefix args to
 hourly_aggregator call (and update required version of aodntools)

---
 aodndata/moorings/products_handler.py | 13 ++++++++++---
 setup.py                              |  2 +-
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py
index 0ab9e042..66d29fd5 100644
--- a/aodndata/moorings/products_handler.py
+++ b/aodndata/moorings/products_handler.py
@@ -18,6 +18,8 @@

 PRODUCT_TYPE_PATTERN = re.compile(r'FV0[12]_([^_]+)_END')
 VALID_PRODUCTS = {'aggregated', 'hourly'}
+DOWNLOAD_URL_PREFIX = "https://s3-ap-southeast-2.amazonaws.com/imos-data/"
+OPENDAP_URL_PREFIX = "http://thredds.aodn.org.au/thredds/dodsC/"


 def get_product_type(file_path):
@@ -235,8 +237,8 @@ def _make_aggregated_timeseries(self):
             product_url, errors = main_aggregator(input_list, var, self.product_site_code,
                                                   input_dir=self.temp_dir, output_dir=self.products_dir,
-                                                  download_url_prefix="https://s3-ap-southeast-2.amazonaws.com/imos-data/",
-                                                  opendap_url_prefix="http://thredds.aodn.org.au/thredds/dodsC/"
+                                                  download_url_prefix=DOWNLOAD_URL_PREFIX,
+                                                  opendap_url_prefix=OPENDAP_URL_PREFIX
                                                   )
             self._handle_errors(errors)

@@ -255,7 +257,12 @@ def _make_hourly_timeseries(self):

         for qc_flags in self.product_qc_flags:

-            product_url, errors = hourly_aggregator(input_list, self.product_site_code, qc_flags, self.temp_dir)
+            product_url, errors = hourly_aggregator(input_list, self.product_site_code, qc_flags,
+                                                    input_dir=self.temp_dir,
+                                                    output_dir=self.products_dir,
+                                                    download_url_prefix=DOWNLOAD_URL_PREFIX,
+                                                    opendap_url_prefix=OPENDAP_URL_PREFIX
+                                                    )

             self._handle_errors(errors)

diff --git a/setup.py b/setup.py
index f1fe4b5d..255972b2 100644
--- a/setup.py
+++ b/setup.py
@@ -54,7 +54,7 @@

 INSTALL_REQUIRES = [
     'aodncore>=1.0.0',
-    'aodntools>=1.0.0',
+    'aodntools>=1.1.0',
     'cc-plugin-imos>=1.3.0',
     'fiona>=1.8.8',
     'matplotlib>=3.0.3',

From ecebe8c37156fc51bcaaf087a5b7f38aff43802a Mon Sep 17 00:00:00 2001
From: mhidas
Date: Wed, 19 Feb 2020 11:53:39 +1100
Subject: [PATCH 09/10] Bump version to 1.1.0

---
 .bumpversion.cfg    | 3 +--
 aodndata/version.py | 2 +-
 setup.py            | 2 +-
 3 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index 8ff75ea0..0d439e5e 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.0.14
+current_version = 1.1.0
 commit = False
 tag = False
 tag_name = {new_version}
@@ -25,4 +25,3 @@
 [bumpversion:file:setup.py]
 search = version='{current_version}',
 replace = version='{new_version}',
-
diff --git a/aodndata/version.py b/aodndata/version.py
index cc08f086..1a72d32e 100644
--- a/aodndata/version.py
+++ b/aodndata/version.py
@@ -1 +1 @@
-__version__ = '1.0.14'
+__version__ = '1.1.0'
diff --git a/setup.py b/setup.py
index 255972b2..e5a40b97 100644
--- a/setup.py
+++ b/setup.py
@@ -80,7 +80,7 @@

 setup(
     name=PACKAGE_NAME,
-    version='1.0.14',
+    version='1.1.0',
     packages=find_packages(exclude=PACKAGE_EXCLUDES),
     url='https://github.com/aodn',
     license='GPLv3',

From cbe159009e56a0e3e732bc975602d5dee359afb8 Mon Sep 17 00:00:00 2001
From: mhidas
Date: Thu, 20 Feb 2020 15:23:57 +1100
Subject: [PATCH 10/10] changes based on review comments

---
 aodndata/moorings/products_handler.py        | 38 ++++++++++---------
 .../moorings/test_mooringsProductsHandler.py |  5 +++
 2 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py
index 66d29fd5..808a6f31 100644
--- a/aodndata/moorings/products_handler.py
+++ b/aodndata/moorings/products_handler.py
@@ -17,7 +17,6 @@

 PRODUCT_TYPE_PATTERN = re.compile(r'FV0[12]_([^_]+)_END')
-VALID_PRODUCTS = {'aggregated', 'hourly'}
 DOWNLOAD_URL_PREFIX = "https://s3-ap-southeast-2.amazonaws.com/imos-data/"
 OPENDAP_URL_PREFIX = "http://thredds.aodn.org.au/thredds/dodsC/"

@@ -106,17 +105,17 @@ class MooringsProductsHandler(HandlerBase):
     """

     FILE_INDEX_LAYER = 'imos:moorings_all_map'
+    VALID_PRODUCTS = {'aggregated', 'hourly'}

     def __init__(self, *args, **kwargs):
         super(MooringsProductsHandler, self).__init__(*args, **kwargs)
         self.allowed_extensions = ['.json_manifest', '.nc', '.zip']
         self.product_site_code = None
         self.product_variables = None
-        self.products_to_create = VALID_PRODUCTS
+        self.products_to_create = self.VALID_PRODUCTS
         self.input_file_collection = None
         self.input_file_variables = None
         self.excluded_files = dict()
-        self.product_qc_flags = [[1, 2], [0, 1, 2]]

     def _read_manifest(self):
         """Read the manifest file and extract key parameters for product"""
@@ -131,7 +130,7 @@ def _read_manifest(self):
                 "manifest file '{self.input_file}' missing information (site_code, variables)".format(self=self)
             )
         if 'products' in manifest:
-            invalid_products = set(manifest['products']) - VALID_PRODUCTS
+            invalid_products = set(manifest['products']) - self.VALID_PRODUCTS
             if invalid_products:
                 raise InvalidFileContentError(
                     "invalid product(s) {invalid_products} requested "
@@ -191,11 +190,12 @@ def _get_input_files(self):

     def _get_old_product_files(self):
         """Get a list of the currently published product files for the site being processed."""
+        product_data_category = Or([PropertyIsEqualTo(propertyname='data_category', literal='aggregated_timeseries'),
+                                    PropertyIsEqualTo(propertyname='data_category', literal='hourly_timeseries'),
+                                    PropertyIsEqualTo(propertyname='data_category', literal='gridded_timeseries')]
+                                   )
         filter_list = [PropertyIsEqualTo(propertyname='site_code', literal=self.product_site_code),
-                       Or([PropertyIsEqualTo(propertyname='data_category', literal='aggregated_timeseries'),
-                           PropertyIsEqualTo(propertyname='data_category', literal='hourly_timeseries'),
-                           PropertyIsEqualTo(propertyname='data_category', literal='gridded_timeseries')
-                           ])
+                       product_data_category
                       ]
         wfs_features = self.get_wfs_features(filter_list, propertyname=['url'])

@@ -213,7 +213,7 @@ def _get_old_product_files(self):
                                                                       product_url=product_url)
             )

-    def _handle_errors(self, errors):
+    def _log_excluded_files(self, errors):
         """Keep track of any input files that were excluded from the product and log a brief warning."""
         if errors:
             self.logger.warning("{n} files were excluded from the product.".format(n=len(errors)))
@@ -240,7 +240,7 @@ def _make_aggregated_timeseries(self):
                                                   download_url_prefix=DOWNLOAD_URL_PREFIX,
                                                   opendap_url_prefix=OPENDAP_URL_PREFIX
                                                   )
-            self._handle_errors(errors)
+            self._log_excluded_files(errors)

             product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
             product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
@@ -255,7 +255,9 @@ def _make_hourly_timeseries(self):
         input_list = [f.local_path for f in self.input_file_collection]
         self.logger.info("Creating hourly products from {n} input files".format(n=len(input_list)))

-        for qc_flags in self.product_qc_flags:
+        # create two versions of the product, one with only good data (flags 1 & 2),
+        # and one also including non-QC'd data (flag 0)
+        for qc_flags in ((1, 2), (0, 1, 2)):

             product_url, errors = hourly_aggregator(input_list, self.product_site_code, qc_flags,
                                                     input_dir=self.temp_dir,
@@ -264,7 +266,7 @@ def _make_hourly_timeseries(self):
                                                     opendap_url_prefix=OPENDAP_URL_PREFIX
                                                     )

-            self._handle_errors(errors)
+            self._log_excluded_files(errors)

             product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
             product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
@@ -272,15 +274,17 @@ def _make_hourly_timeseries(self):

             self._cleanup_previous_version(product_file.name)

-    def _cleanup_previous_version(self, product_name):
-        """Delete any previously published version(s) of the given product file.
+    def _cleanup_previous_version(self, product_filename):
+        """Identify any previously published version(s) of the given product file and mark them for deletion.
         Ignores cases where the previous version has exactly the same file name, as this will simply be overwritten.

-        :param product_name: Name of the newly generated product
+        :param product_filename: File name of the newly generated product
         """
-        product_type = get_product_type(product_name)
+        product_type = get_product_type(product_filename)
         for old_product_url in self.old_product_files.get(product_type, []):
-            if os.path.basename(old_product_url) != product_name:
+            if os.path.basename(old_product_url) != product_filename:
+                # Add the previous version as a "late deletion". It will be deleted during the handler's `publish`
+                # step after (and only if) all new files have been successfully published.
                 old_file = PipelineFile(old_product_url, dest_path=old_product_url, is_deletion=True,
                                         late_deletion=True, file_update_callback=self._file_update_callback)
diff --git a/test_aodndata/moorings/test_mooringsProductsHandler.py b/test_aodndata/moorings/test_mooringsProductsHandler.py
index 46efc8d0..2005c278 100644
--- a/test_aodndata/moorings/test_mooringsProductsHandler.py
+++ b/test_aodndata/moorings/test_mooringsProductsHandler.py
@@ -11,6 +11,8 @@

 from aodndata.moorings.products_handler import MooringsProductsHandler, MooringsProductClassifier, get_product_type

+
+# Input files used in tests
 TEST_ROOT = os.path.dirname(__file__)
 GOOD_MANIFEST = os.path.join(TEST_ROOT, 'test_product.json_manifest')
 AGGREGATED_ONLY_MANIFEST = os.path.join(TEST_ROOT, 'test_product_aggregated.json_manifest')
 BAD_VAR_MANIFEST = os.path.join(TEST_ROOT, 'test_product_bad_var.json_manifest')
@@ -20,6 +22,7 @@
     'IMOS_ANMN-NRS_TZ_20181213_NRSROT_FV01_TEMP-aggregated-timeseries_END-20190523_C-20191218.nc'
 )

+# Load JSON files used to mock WFS responses
 GETFEATURE_FILE = os.path.join(TEST_ROOT, 'getFeature.json')
 GETFEATURE_OLD_PRODUCTS_FILE = os.path.join(TEST_ROOT, 'getFeature_old_products.json')
 GETFEATURE_EMPTY_FILE = os.path.join(TEST_ROOT, 'getFeature_empty.json')
@@ -33,6 +36,8 @@
 with open(GETFEATURE_EMPTY_FILE) as f:
     TEST_GETFEATURE_EMPTY_JSON = f.read()

+# Create collection of input files for the products
+# These will be uploaded to the mocked equivalent of S3 (where the real input files will be)
 features = json.loads(TEST_GETFEATURE_JSON)['features']
 INPUT_FILE_COLLECTION = PipelineFileCollection()
 for f in features: