Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create hourly product in moorings products pipeline #206

Merged
merged 11 commits into from
Feb 24, 2020
114 changes: 83 additions & 31 deletions aodndata/moorings/products_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import re

from owslib.fes import PropertyIsEqualTo, PropertyIsNotEqualTo, And
from owslib.fes import PropertyIsEqualTo, PropertyIsNotEqualTo, And, Or

from aodncore.pipeline import HandlerBase, PipelineFilePublishType, PipelineFile, FileType
from aodncore.pipeline.exceptions import (InvalidFileContentError, InvalidFileNameError, InvalidFileFormatError,
Expand All @@ -11,11 +11,29 @@
from aodncore.util.wfs import ogc_filter_to_string

from aodntools.timeseries_products.aggregated_timeseries import main_aggregator
from aodntools.timeseries_products.hourly_timeseries import hourly_aggregator

from aodndata.moorings.classifiers import MooringsFileClassifier


AGGREGATED_VARIABLE_PATTERN = re.compile(r'FV01_([A-Z0-9-]+)-aggregated')
PRODUCT_TYPE_PATTERN = re.compile(r'FV0[12]_([^_]+)_END')
VALID_PRODUCTS = {'aggregated', 'hourly'}


def get_product_type(file_path):
"""Return a product type label for the given file (extracted from the file name).
For example "PSAL-aggregated-timeseries", or "hourly-timeseries".

:param file_path: str path or name of file
:returns: str product type label
"""
file_name = os.path.basename(file_path)
name_match = PRODUCT_TYPE_PATTERN.search(file_name)
if not name_match:
raise InvalidFileNameError(
"Could not extract produt type from '{file_name}'".format(file_name=file_name)
)
return name_match.group(1)


class MooringsProductClassifier(MooringsFileClassifier):
Expand Down Expand Up @@ -92,9 +110,11 @@ def __init__(self, *args, **kwargs):
self.allowed_extensions = ['.json_manifest', '.nc', '.zip']
self.product_site_code = None
self.product_variables = None
self.products_to_create = VALID_PRODUCTS
mhidas marked this conversation as resolved.
Show resolved Hide resolved
self.input_file_collection = None
self.input_file_variables = None
self.excluded_files = dict()
self.product_qc_flags = [[1, 2], [0, 1, 2]]

mhidas marked this conversation as resolved.
Show resolved Hide resolved
def _read_manifest(self):
"""Read the manifest file and extract key parameters for product"""
Expand All @@ -108,6 +128,14 @@ def _read_manifest(self):
raise InvalidFileContentError(
"manifest file '{self.input_file}' missing information (site_code, variables)".format(self=self)
)
if 'products' in manifest:
mhidas marked this conversation as resolved.
Show resolved Hide resolved
invalid_products = set(manifest['products']) - VALID_PRODUCTS
if invalid_products:
raise InvalidFileContentError(
"invalid product(s) {invalid_products} requested "
"in manifest file '{self.input_file}'".format(invalid_products=invalid_products, self=self)
)
self.products_to_create = set(manifest['products'])

def get_wfs_features(self, filter_list, propertyname='*'):
"""Query the file index WFS layer with the given filters and return a list of features.
Expand Down Expand Up @@ -164,34 +192,42 @@ def _get_input_files(self):
# TODO: Replace temp_dir above with cache_dir?

def _get_old_product_files(self):
"""Get a list of the currently published aggregated_timeseries files for the site being processed."""
"""Get a list of the currently published product files for the site being processed."""

filter_list = [PropertyIsEqualTo(propertyname='site_code', literal=self.product_site_code),
mhidas marked this conversation as resolved.
Show resolved Hide resolved
PropertyIsEqualTo(propertyname='data_category', literal='aggregated_timeseries')
Or([PropertyIsEqualTo(propertyname='data_category', literal='aggregated_timeseries'),
PropertyIsEqualTo(propertyname='data_category', literal='hourly_timeseries'),
PropertyIsEqualTo(propertyname='data_category', literal='gridded_timeseries')
])
]
wfs_features = self.get_wfs_features(filter_list, propertyname=['url'])

self.old_product_files = {}
for f in wfs_features:
product_url = f['properties']['url']
var_match = AGGREGATED_VARIABLE_PATTERN.search(product_url)
if not var_match:
raise InvalidFileNameError(
"Could not determine variable of interest for '{product_url}'".format(product_url=product_url)
)
variable_of_interest = var_match.group(1).replace('-', '_')
if variable_of_interest not in self.old_product_files:
self.old_product_files[variable_of_interest] = [product_url]
product_type = get_product_type(product_url)
if product_type not in self.old_product_files:
self.old_product_files[product_type] = [product_url]
else:
self.old_product_files[variable_of_interest].append(product_url)
self.old_product_files[product_type].append(product_url)

self.logger.info(
"Old file for {variable_of_interest}: '{product_url}'".format(variable_of_interest=variable_of_interest,
product_url=product_url)
)
"Old file for {product_type}: '{product_url}'".format(product_type=product_type,
product_url=product_url)
)

def _handle_errors(self, errors):
mhidas marked this conversation as resolved.
Show resolved Hide resolved
"""Keep track of any input files that were excluded from the product and log a brief warning."""
if errors:
self.logger.warning("{n} files were excluded from the product.".format(n=len(errors)))
for f, e in errors.items():
if f not in self.excluded_files:
self.excluded_files[f] = set(e)
else:
self.excluded_files[f].update(e)

def _make_aggregated_timeseries(self):
"""For each variable, generate product and add to file_collection."""
"""For each variable, generate aggregated timeseries product and add to file_collection."""

for var in self.product_variables:
# Filter input_list to the files relevant for this var
Expand All @@ -207,28 +243,41 @@ def _make_aggregated_timeseries(self):
download_url_prefix="https://s3-ap-southeast-2.amazonaws.com/imos-data/",
opendap_url_prefix="http://thredds.aodn.org.au/thredds/dodsC/"
)
if errors:
self.logger.warning("{n} files were excluded from the aggregation.".format(n=len(errors)))
for f, e in errors.items():
if f not in self.excluded_files:
self.excluded_files[f] = set(e)
else:
self.excluded_files[f].update(e)
self._handle_errors(errors)

product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
self.file_collection.add(product_file)

self._cleanup_previous_version(product_file.name)

def _make_hourly_timeseries(self):
"""Generate hourly products for the site and add to file_collection."""

# Filter input_list to the files relevant for this var
input_list = [f.local_path for f in self.input_file_collection]
self.logger.info("Creating hourly products from {n} input files".format(n=len(input_list)))

for qc_flags in self.product_qc_flags:

product_url, errors = hourly_aggregator(input_list, self.product_site_code, qc_flags, self.temp_dir)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume product_url is different given the qc_flags arguments!?


self._handle_errors(errors)

product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
self.file_collection.add(product_file)

self._cleanup_previous_version(product_file.name, var)
self._cleanup_previous_version(product_file.name)

ocehugo marked this conversation as resolved.
Show resolved Hide resolved
def _cleanup_previous_version(self, product_name, var):
"""Delete any previously published version(s) of the product for this variable file.
def _cleanup_previous_version(self, product_name):
"""Delete any previously published version(s) of the given product file.
Ignores cases where the previous version has exactly the same file name, as this will simply be overwritten.

:param product_name: Name of the newly generated product
:param var: Name of the variable of interest
"""
for old_product_url in self.old_product_files.get(var, []):
product_type = get_product_type(product_name)
for old_product_url in self.old_product_files.get(product_type, []):
if os.path.basename(old_product_url) != product_name:
mhidas marked this conversation as resolved.
Show resolved Hide resolved
old_file = PipelineFile(old_product_url, dest_path=old_product_url, is_deletion=True,
late_deletion=True, file_update_callback=self._file_update_callback)
Expand All @@ -253,11 +302,14 @@ def preprocess(self):

# TODO: Run compliance checks and remove non-compliant files from the input list (log them).

self._make_aggregated_timeseries()
if 'aggregated' in self.products_to_create:
self._make_aggregated_timeseries()
if 'hourly' in self.products_to_create:
self._make_hourly_timeseries()

# TODO: Include the list of excluded files as another table in the notification email (instead of the log)
if self.excluded_files:
self.logger.warning("Files exluded from aggregations:")
self.logger.warning("Files exluded from some of the products generated:")
for f, e in self.excluded_files.items():
self.logger.warning("'{f}': {e}".format(f=f, e=list(e)))

Expand Down
14 changes: 11 additions & 3 deletions test_aodndata/moorings/getFeature_old_products.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,19 @@
"properties": {
"url": "IMOS/ANMN/NRS/NRSROT/aggregated_timeseries/IMOS_ANMN-NRS_TZ_20081120_NRSROT_FV01_DOX1-2-aggregated-timeseries_END-20190523_C-20190819.nc"
}
},
{
"type": "Feature",
"id": "moorings_all_map.fid--44e8da32_16d0014d48c_7b20",
"geometry": null,
"properties": {
"url": "IMOS/ANMN/NRS/NRSROT/hourly_timeseries/IMOS_ANMN-NRS_STZ_20081120_NRSROT_FV02_hourly-timeseries_END-20190523_C-20191010.nc"
}
}
],
"totalFeatures": 3,
"numberMatched": 3,
"numberReturned": 3,
"totalFeatures": 4,
"numberMatched": 4,
"numberReturned": 4,
"timeStamp": "2019-12-05T06:33:53.380Z",
"crs": null
}
48 changes: 36 additions & 12 deletions test_aodndata/moorings/test_mooringsProductsHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from aodncore.pipeline.storage import get_storage_broker
from aodncore.testlib import HandlerTestCase, make_test_file

from aodndata.moorings.products_handler import MooringsProductsHandler, MooringsProductClassifier
from aodndata.moorings.products_handler import MooringsProductsHandler, MooringsProductClassifier, get_product_type

TEST_ROOT = os.path.dirname(__file__)
GOOD_MANIFEST = os.path.join(TEST_ROOT, 'test_product.json_manifest')
Expand Down Expand Up @@ -54,20 +54,44 @@ def test_good_manifest(self, mock_webfeatureservice):
self.assertCountEqual(INPUT_FILE_COLLECTION.get_attribute_list('dest_path'),
handler.input_file_collection.get_attribute_list('dest_path')
)
self.assertEqual(len(handler.file_collection), 5)

published_files = handler.file_collection.filter_by_attribute_id('publish_type',
PipelineFilePublishType.HARVEST_UPLOAD)
self.assertEqual(len(published_files), 3)
for f in published_files:
self.assertTrue(f.is_harvested and f.is_stored)

deleted_files = handler.file_collection.filter_by_attribute_id('publish_type',
PipelineFilePublishType.DELETE_UNHARVEST)
self.assertEqual(len(deleted_files), 2)
for f in deleted_files:
expected_new_products = {'TEMP-aggregated-timeseries',
'PSAL-aggregated-timeseries',
'CHLF-aggregated-timeseries',
'hourly-timeseries',
'hourly-timeseries-including-non-QC'
mhidas marked this conversation as resolved.
Show resolved Hide resolved
}
expected_deleted_products = {'TEMP-aggregated-timeseries',
'PSAL-aggregated-timeseries',
'hourly-timeseries',
}

self.assertEqual(len(handler.file_collection), len(expected_new_products) + len(expected_deleted_products))
for f in handler.file_collection:
self.assertTrue(f.is_harvested and f.is_stored)

# check new product files
published_files = (handler.file_collection
.filter_by_attribute_id('publish_type', PipelineFilePublishType.HARVEST_UPLOAD)
.get_attribute_list('name')
)
self.assertEqual(len(published_files), len(expected_new_products))
published_products = {get_product_type(f) for f in published_files}
self.assertSetEqual(published_products, expected_new_products)

# check deletion of previous versions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part can also be a separate one

deleted_files = (handler.file_collection
.filter_by_attribute_id('publish_type', PipelineFilePublishType.DELETE_UNHARVEST)
.get_attribute_list('name')
)
self.assertEqual(len(deleted_files), len(expected_deleted_products))
deleted_products = {get_product_type(f) for f in deleted_files}
self.assertSetEqual(deleted_products, expected_deleted_products)

# published and deleted files should never have the same name!
self.assertEqual(set(), set(published_files) & set(deleted_files))

# check input files excluded from the products
self.assertEqual(len(handler.excluded_files), 1)

def test_publish_product_nc(self):
Expand Down
3 changes: 2 additions & 1 deletion test_aodndata/moorings/test_product.json_manifest
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"site_code": "NRSROT",
"variables": ["TEMP", "PSAL", "CHLF"]
"variables": ["TEMP", "PSAL", "CHLF"],
"products":["aggregated", "hourly"]
}