Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create hourly product in moorings products pipeline #206

Merged
merged 11 commits into from
Feb 24, 2020
47 changes: 29 additions & 18 deletions aodndata/moorings/products_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,23 @@
from aodndata.moorings.classifiers import MooringsFileClassifier


AGGREGATED_VARIABLE_PATTERN = re.compile(r'FV01_([A-Z0-9-]+)-aggregated')
PRODUCT_TYPE_PATTERN = re.compile(r'FV0[12]_([^_]+)_END')


def get_product_type(file_path):
    """Return a product type label for the given file (extracted from the file name).

    The label is the single underscore-free field found between the
    ``FV01_``/``FV02_`` marker and the ``_END`` marker in the file name,
    for example "PSAL-aggregated-timeseries", or "hourly-timeseries".

    :param file_path: str path or name of file (only the base name is examined)
    :returns: str product type label
    :raises InvalidFileNameError: if the file name does not match PRODUCT_TYPE_PATTERN
    """
    # Match against the base name only, so directory components of a path
    # can never contribute to (or spoil) the match.
    file_name = os.path.basename(file_path)
    name_match = PRODUCT_TYPE_PATTERN.search(file_name)
    if not name_match:
        raise InvalidFileNameError(
            "Could not extract product type from '{file_name}'".format(file_name=file_name)
        )
    return name_match.group(1)


class MooringsProductClassifier(MooringsFileClassifier):
Expand Down Expand Up @@ -174,21 +190,16 @@ def _get_old_product_files(self):
self.old_product_files = {}
for f in wfs_features:
product_url = f['properties']['url']
var_match = AGGREGATED_VARIABLE_PATTERN.search(product_url)
if not var_match:
raise InvalidFileNameError(
"Could not determine variable of interest for '{product_url}'".format(product_url=product_url)
)
variable_of_interest = var_match.group(1).replace('-', '_')
if variable_of_interest not in self.old_product_files:
self.old_product_files[variable_of_interest] = [product_url]
product_type = get_product_type(product_url)
if product_type not in self.old_product_files:
self.old_product_files[product_type] = [product_url]
else:
self.old_product_files[variable_of_interest].append(product_url)
self.old_product_files[product_type].append(product_url)

self.logger.info(
"Old file for {variable_of_interest}: '{product_url}'".format(variable_of_interest=variable_of_interest,
product_url=product_url)
)
"Old file for {product_type}: '{product_url}'".format(product_type=product_type,
product_url=product_url)
)

def _make_aggregated_timeseries(self):
"""For each variable, generate product and add to file_collection."""
Expand Down Expand Up @@ -219,16 +230,16 @@ def _make_aggregated_timeseries(self):
product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
self.file_collection.add(product_file)

self._cleanup_previous_version(product_file.name, var)
self._cleanup_previous_version(product_file.name)

ocehugo marked this conversation as resolved.
Show resolved Hide resolved
def _cleanup_previous_version(self, product_name, var):
"""Delete any previously published version(s) of the product for this variable file.
def _cleanup_previous_version(self, product_name):
"""Delete any previously published version(s) of the given product file.
Ignores cases where the previous version has exactly the same file name, as this will simply be overwritten.

:param product_name: Name of the newly generated product
:param var: Name of the variable of interest
"""
for old_product_url in self.old_product_files.get(var, []):
product_type = get_product_type(product_name)
for old_product_url in self.old_product_files.get(product_type, []):
if os.path.basename(old_product_url) != product_name:
mhidas marked this conversation as resolved.
Show resolved Hide resolved
old_file = PipelineFile(old_product_url, dest_path=old_product_url, is_deletion=True,
late_deletion=True, file_update_callback=self._file_update_callback)
Expand Down