diff --git a/.gitignore b/.gitignore
index 890719c2..0086748f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -103,6 +103,7 @@ ENV/
 .idea/vcs.xml
 .idea/python-aodncore.iml
 .idea/inspectionProfiles/profiles_settings.xml
+.idea/inspectionProfiles/Project_Default.xml
 
 # Sensitive or high-churn files:
 .idea/**/dataSources/
diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index 19f2474e..ea1b0b06 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -33,4 +33,4 @@
-
\ No newline at end of file
+
diff --git a/README.md b/README.md
index 01701047..3e1bb32a 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,26 @@
-# AODN Core Pipeline
+# AODN Core Pipeline for Prefect Flows
+
+This branch modifies the original python-aodncore so that the pipeline steps can be used independently of the original state machine. As a result, the steps can be reordered, skipped, added to, or otherwise combined into ad hoc Prefect flows. The changes also make the original state machine loggers available to Prefect, so that fine-grained output can be viewed in the Prefect Cloud interface. This allows a high degree of flexibility in how pipeline flows are implemented.
+
+The branch also includes functions for processing files in S3 buckets (aodncore/util/s3_util.py) and some minor changes to use these instead of the local file system.
+
+A new class, `PrefectHandlerBase` (aodncore/pipeline/prefecthandlerbase.py), extends `HandlerBase` and should be used in its place when creating pipelines for use in Prefect flows.
+
+`PrefectHandlerBase` overrides the following `HandlerBase` methods:
+
+`_set_input_file_attributes` is overridden to use the S3 ETag as the file checksum.
+
+`_init_logger` is overridden to use the Prefect logger, so that messages from aodncore appear in the Prefect Cloud UI.
+
+`_resolve` is overridden so that resolve runners can move files from the landing S3 bucket to the local filesystem instead of copying them. This is currently only available for the `SingleFileResolveRunner`.
+
+The Prefect pipelines are not intended to perform the harvesting step; that belongs in additional Prefect flows downstream of the data ingestion. The harvesting step is skipped by using the new `NoHarvestRunner` class in `aodncore/pipeline/steps/harvest.py`, which uploads the files without actually harvesting them.
+
+For additional details and notes on possible future work, see https://github.com/aodn/backlog/issues/4290.
+
+# Original AODN Core Pipeline
+
+*Note: this branch overrides the behaviours described below. This section is retained only as a reference to the original AODN core pipeline.*
 
 The `aodncore` package provides the core & generic functionality for all data ingestion pipelines at the AODN. This can be customised and extended for specific data-streams in the [`aodndata`](https://github.com/aodn/python-aodndata) package.
 
@@ -9,4 +31,4 @@ The package provides the base class for each pipeline handler, aodncore.pipeline
 Project documentation is hosted at: https://aodn.github.io/python-aodncore/index.html
 
 ## Licensing
-This project is licensed under the terms of the GNU GPLv3 license.
\ No newline at end of file
+This project is licensed under the terms of the GNU GPLv3 license.
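To make the intended usage concrete, here is a rough sketch of how a handler built on `PrefectHandlerBase` (introduced in the next file) might be driven from a Prefect 2.x flow. The handler subclass, task, flow and input URL below are hypothetical and are not part of this diff; constructor arguments other than the input file are omitted, and it is assumed that the Prefect run logger is an acceptable `logger_function` for `init_logger`:

```python
from prefect import flow, task, get_run_logger

from aodncore.pipeline.prefecthandlerbase import PrefectHandlerBase


class MyStreamHandler(PrefectHandlerBase):
    """Hypothetical handler for a single data stream."""


@task
def resolve_input(input_url):
    # Construct the handler as any HandlerBase subclass would be constructed
    # (pipeline config and other keyword arguments are omitted in this sketch).
    handler = MyStreamHandler(input_url)

    # Route aodncore's log output to the Prefect run logger so it appears in
    # Prefect Cloud (assumes the run logger is a valid logger_function here).
    handler.init_logger(get_run_logger())

    # Steps can now be called individually, in whatever order the flow needs,
    # e.g. the overridden _resolve(), which moves the file out of the landing bucket.
    handler._resolve()
    return handler.file_collection


@flow
def ingest_file(input_url):
    files = resolve_input(input_url)
    # ...further tasks (checks, upload via NoHarvestRunner, notifications) would follow.
```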
diff --git a/aodncore/pipeline/prefecthandlerbase.py b/aodncore/pipeline/prefecthandlerbase.py
new file mode 100644
index 00000000..b866a1c8
--- /dev/null
+++ b/aodncore/pipeline/prefecthandlerbase.py
@@ -0,0 +1,64 @@
+import logging
+import os
+
+from aodncore.pipeline import HandlerBase, FileType
+from aodncore.pipeline.exceptions import InvalidInputFileError
+from aodncore.pipeline.log import get_pipeline_logger
+from aodncore.pipeline.steps import get_resolve_runner
+from aodncore.pipeline.steps.resolve import SingleFileResolveRunner
+from aodncore.util import ensure_regex_list
+
+
+class PrefectHandlerBase(HandlerBase):
+
+    def _set_input_file_attributes(self):
+        """ Override HandlerBase"""
+
+        try:
+            self._file_checksum = self.etag
+        except (IOError, OSError) as e:
+            self.logger.exception(e)
+            raise InvalidInputFileError(e)
+        self.logger.sysinfo("get_file_checksum -> '{self.file_checksum}'".format(self=self))
+
+        self._file_basename = os.path.basename(self.input_file)
+        self.logger.sysinfo("file_basename -> '{self._file_basename}'".format(self=self))
+        _, self._file_extension = os.path.splitext(self.input_file)
+        self.logger.sysinfo("file_extension -> '{self._file_extension}'".format(self=self))
+        self._file_type = FileType.get_type_from_extension(self.file_extension)
+        self.logger.sysinfo("file_type -> {self._file_type}".format(self=self))
+
+    def init_logger(self, logger_function):
+        self._init_logger(logger_function)
+
+    def _init_logger(self, logger_function):
+
+        logger = get_pipeline_logger(None, logger_function=logger_function)
+
+        # turn down logging for noisy libraries to WARN, unless overridden in pipeline config 'liblevel' key
+        liblevel = getattr(self.config, 'pipeline_config', {}).get('logging', {}).get('liblevel', 'WARN')
+        for lib in ('botocore', 'paramiko', 's3transfer', 'transitions'):
+            logging.getLogger(lib).setLevel(liblevel)
+
+        self._logger = logger
+        self._celery_task_id = None
+        self._celery_task_name = 'NO_TASK'
+
+    def _resolve(self):
+        resolve_runner = get_resolve_runner(self.input_file, self.collection_dir, self.config, self.logger,
+                                            self.resolve_params)
+        self.logger.sysinfo("get_resolve_runner -> {resolve_runner}".format(resolve_runner=resolve_runner))
+        if isinstance(resolve_runner, SingleFileResolveRunner):
+            resolved_files = resolve_runner.run(move=True)
+        else:
+            resolved_files = resolve_runner.run()
+
+        resolved_files.set_file_update_callback(self._file_update_callback)
+
+        # if include_regexes is not defined, default to including all files when setting publish types
+        include_regexes = self.include_regexes if self.include_regexes else ensure_regex_list([r'.*'])
+        resolved_files.set_publish_types_from_regexes(include_regexes, self.exclude_regexes,
+                                                      self.default_addition_publish_type,
+                                                      self.default_deletion_publish_type)
+
+        self.file_collection.update(resolved_files)
diff --git a/aodncore/pipeline/schema.py b/aodncore/pipeline/schema.py
index 5fb80a5f..6c8023dd 100644
--- a/aodncore/pipeline/schema.py
+++ b/aodncore/pipeline/schema.py
@@ -147,8 +147,10 @@
                 'type': 'array',
                 'items': {'type': 'string'}
             },
+            'landing_bucket': {'type': 'string'},
             'archive_uri': {'type': 'string'},
             'error_uri': {'type': 'string'},
+            'optimised_uri': {'type': 'string'},
             'opendap_root': {'type': 'string'},
             'processing_dir': {'type': 'string'},
             'tmp_dir': {'type': 'string'},
@@ -231,15 +233,33 @@
             },
             'required': ['incoming_dir', 'logger_name', 'task_namespace'],
             'additionalProperties': False
+        },
+        "trigger": {
+            'type': 'object',
+            'properties': {
+                "trigger_type": {'$ref': '#/definitions/triggers'},
+                "sns_topic_arn": {'type': 'string'},
+                "sns_message": {'type': 'string'},
+                "prefect_api_key": {'type': 'string'},
+                "prefect_queue_id": {'type': 'string'},
+                "prefect_deployment_id": {'type': 'string'},
+                "prefect_account_id": {'type': 'string'},
+                "prefect_workspace_id": {'type': 'string'},
+                "end_flow_function": {'type': 'string'}
+            },
+            'additionalProperties': False
+        }
     },
-    # TODO: add 'harvester' to the required list - once it has been added to the chef build
     'required': ['global', 'logging', 'mail', 'talend', 'templating', 'watch', 'harvester'],
     'additionalProperties': False,
     'definitions': {
         'loggingLevel': {
             'type': 'string',
             'enum': ['CRITICAL', 'FATAL', 'ERROR', 'WARNING', 'WARN', 'INFO', 'SYSINFO', 'DEBUG', 'NOTSET']
+        },
+        'triggers': {
+            'type': 'string',
+            'enum': ['EVENT']
+        }
     }
 }
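For illustration, a pipeline config accepted by the extended schema could now include sections along the lines sketched below. All values are placeholders (not real buckets, ARNs or Prefect IDs), and only the newly added keys are shown:

```python
# Hypothetical fragment of a pipeline config dict, showing only the keys added above.
config_fragment = {
    'global': {
        # ...existing 'global' settings unchanged...
        'landing_bucket': 'example-landing-bucket',
        'optimised_uri': 's3://example-optimised-bucket/optimised',
    },
    'trigger': {
        'trigger_type': 'EVENT',  # must be a value from the new 'triggers' enum
        'sns_topic_arn': 'arn:aws:sns:ap-southeast-2:123456789012:example-topic',
        'prefect_account_id': '00000000-0000-0000-0000-000000000000',
        'prefect_workspace_id': '00000000-0000-0000-0000-000000000000',
        'prefect_deployment_id': '00000000-0000-0000-0000-000000000000',
    },
}
```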
diff --git a/aodncore/pipeline/steps/harvest.py b/aodncore/pipeline/steps/harvest.py
index d3d19b98..890f3fa9 100644
--- a/aodncore/pipeline/steps/harvest.py
+++ b/aodncore/pipeline/steps/harvest.py
@@ -58,6 +58,8 @@ def get_harvester_runner(harvester_name, store_runner, harvest_params, tmp_base_
         return TalendHarvesterRunner(store_runner, harvest_params, tmp_base_dir, config, logger)
     elif harvester_name == 'csv':
         return CsvHarvesterRunner(store_runner, harvest_params, config, logger)
+    elif harvester_name is None:
+        return NoHarvestRunner(store_runner, config, logger)
     else:
         raise InvalidHarvesterError("invalid harvester '{name}'".format(name=harvester_name))
 
@@ -567,5 +569,22 @@ def build_runsheet(self, pf):
         return True
 
 
+class NoHarvestRunner(BaseHarvesterRunner):
+    """:py:class:`BaseHarvesterRunner` implementation to store files without harvest."""
+
+    def __init__(self, storage_broker, config, logger):
+        super().__init__(config, logger)
+        self.storage_broker = storage_broker
+        self.config = self._config
+        self.logger = self._logger
+        self.unexpected_pipeline_files = []
+
+    def run(self, pipeline_files):
+
+        files_to_upload = pipeline_files.filter_by_bool_attribute('pending_store_addition')
+        if files_to_upload:
+            self.storage_broker.upload(pipeline_files=files_to_upload)
+
+
 validate_triggerevent = validate_type(TriggerEvent)
 validate_harvestermap = validate_type(HarvesterMap)
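In other words, a caller that passes no harvester name now gets a runner that only performs the storage step. A minimal sketch, with the storage broker, config, logger and file collection assumed to come from the calling handler or flow task:

```python
from aodncore.pipeline.steps.harvest import get_harvester_runner, NoHarvestRunner


def store_without_harvesting(storage_broker, pipeline_files, config, logger):
    """Hypothetical helper: upload pending files using the new NoHarvestRunner."""
    # With harvester_name=None the factory now returns the upload-only runner.
    runner = get_harvester_runner(None, storage_broker, {}, '/tmp', config, logger)
    assert isinstance(runner, NoHarvestRunner)

    # run() uploads files flagged 'pending_store_addition' without harvesting them.
    runner.run(pipeline_files)
```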
diff --git a/aodncore/pipeline/steps/resolve.py b/aodncore/pipeline/steps/resolve.py
index 444e3f2c..d36de5ad 100644
--- a/aodncore/pipeline/steps/resolve.py
+++ b/aodncore/pipeline/steps/resolve.py
@@ -34,7 +34,8 @@
 from ..exceptions import InvalidFileFormatError
 from ..files import PipelineFile, PipelineFileCollection, RemotePipelineFile
 from ..schema import validate_json_manifest
-from ...util import extract_gzip, extract_zip, list_regular_files, is_gzip_file, is_zip_file, safe_copy_file
+from ...util import extract_gzip, extract_zip, list_regular_files, is_gzip_file, is_zip_file, safe_copy_file, \
+    safe_move_file, is_s3, TemporaryDirectory
 
 __all__ = [
     'get_resolve_runner',
@@ -98,10 +99,14 @@ def run(self):
 
 
 class SingleFileResolveRunner(BaseResolveRunner):
-    def run(self):
+    def run(self, move=False):
         name = os.path.basename(self.input_file)
         temp_location = os.path.join(self.output_dir, name)
-        safe_copy_file(self.input_file, temp_location)
+        if move:
+            safe_move_file(self.input_file, temp_location)
+        else:
+            safe_copy_file(self.input_file, temp_location)
+
         self._collection.add(temp_location)
         return self._collection
 
@@ -121,11 +126,22 @@ def run(self):
 
 class ZipFileResolveRunner(BaseResolveRunner):
     def run(self):
+        # flag a temporary local download from S3 so it can be cleaned up after extraction
+        removeS3Download = False
+
+        if is_s3(self.input_file):
+            removeS3Download = True
+            temp = f'/tmp/{os.path.basename(self.input_file)}'
+            safe_move_file(self.input_file, temp)
+            self.input_file = temp
+
         if not is_zip_file(self.input_file):
             raise InvalidFileFormatError("input_file must be a valid ZIP file")
 
         extract_zip(self.input_file, self.output_dir)
 
+        if removeS3Download:
+            os.remove(self.input_file)
+
         for f in list_regular_files(self.output_dir, recursive=True):
             self._collection.add(f)
         return self._collection
diff --git a/aodncore/pipeline/storage.py b/aodncore/pipeline/storage.py
index b920c4fe..5a916928 100644
--- a/aodncore/pipeline/storage.py
+++ b/aodncore/pipeline/storage.py
@@ -324,7 +324,7 @@ def __init__(self, bucket, prefix):
         super().__init__()
 
         self.bucket = bucket
-        self.prefix = prefix
+        self.prefix = prefix.lstrip('/')
 
         self.s3_client = boto3.client('s3')
diff --git a/aodncore/util/__init__.py b/aodncore/util/__init__.py
index bace3327..ca404a15 100644
--- a/aodncore/util/__init__.py
+++ b/aodncore/util/__init__.py
@@ -13,6 +13,7 @@
 from .process import SystemProcess
 from .wfs import DEFAULT_WFS_VERSION, WfsBroker
 from .ff import get_field_type, get_tableschema_descriptor
+from .s3_util import *
 
 __all__ = [
     'CaptureStdIO',
diff --git a/aodncore/util/fileops.py b/aodncore/util/fileops.py
index c2a54b86..01351858 100644
--- a/aodncore/util/fileops.py
+++ b/aodncore/util/fileops.py
@@ -18,6 +18,8 @@
 import magic
 import netCDF4
 
+from aodncore.util.s3_util import *
+
 __all__ = [
     'TemporaryDirectory',
     'extract_gzip',
@@ -330,7 +332,7 @@ def rm_rf(path):
         raise  # pragma: no cover
 
 
-def safe_copy_file(source, destination, overwrite=False):
+def safe_copy_file(source, destination, overwrite=False, move=False):
     """Copy a file atomically by copying first to a temporary file in the same directory (and therefore filesystem)
         as the intended destination, before performing a rename (which is atomic)
 
@@ -339,10 +341,11 @@
     :param overwrite: set to True to allow existing destination file to be overwritten
     :return: None
     """
-    if not os.path.exists(source):
+
+    if not os.path.exists(source) and not is_s3(source):
         raise OSError("source file '{source}' does not exist".format(source=source))
     if source == destination:
-        raise OSError("source file and destination file can't refer the to same file")
+        raise OSError("source file and destination file can't refer to the same file")
     if not overwrite and os.path.exists(destination):
         raise OSError("destination file '{destination}' already exists".format(destination=destination))
 
@@ -350,8 +353,11 @@
     try:
         with tempfile.NamedTemporaryFile(mode='wb', dir=os.path.dirname(destination), delete=False) as temp_destination:
             temp_destination_name = temp_destination.name
-            with open(source, 'rb') as f:
-                shutil.copyfileobj(f, temp_destination)
+            if is_s3(source):
+                download_object(get_s3_bucket(source), get_s3_key(source), temp_destination_name)
+            else:
+                with open(source, 'rb') as f:
+                    shutil.copyfileobj(f, temp_destination)
         os.rename(temp_destination_name, destination)
     finally:
         try:
@@ -371,7 +377,10 @@
     :return: None
     """
     safe_copy_file(src, dst, overwrite)
-    os.remove(src)
+    if is_s3(src):
+        delete_object(get_s3_bucket(src), get_s3_key(src))
+    else:
+        os.remove(src)
 
 
 def validate_dir_writable(path):
diff --git a/aodncore/util/s3_util.py b/aodncore/util/s3_util.py
new file mode 100644
index 00000000..2db2f162
--- /dev/null
+++ b/aodncore/util/s3_util.py
@@ -0,0 +1,64 @@
+from urllib.parse import urlparse
+import boto3
+
+__all__ = [
+    "move_object",
+    "list_1000_objects",
+    "download_object",
+    "is_s3",
+    "get_s3_bucket",
+    "get_s3_key",
+    "delete_object",
+    "set_s3"
+]
+
+s3 = None
+
+
+def set_s3(credentials=None):
+    global s3
+    if credentials:
+        s3 = boto3.resource('s3', aws_session_token=credentials['SessionToken'])
+    else:
+        s3 = boto3.resource('s3')
+
+
+def move_object(key, source_bucket, dest_bucket):
+    # Move objects between buckets
+    copy_source = {
+        'Bucket': source_bucket,
+        'Key': key
+    }
+    s3.meta.client.copy(copy_source, dest_bucket, key)
+    delete_object(source_bucket, key)
+
+    return True
+
+
+def list_1000_objects(bucket, prefix):
+    response = s3.meta.client.list_objects_v2(Bucket=bucket, Prefix=prefix)
+    objects = response.get('Contents')
+    return objects
+
+
+def download_object(bucket, key, destination):
+    s3.Object(bucket, key).download_file(destination)
+
+
+def delete_object(bucket, key):
+    s3.Object(bucket, key).delete()
+
+
+def is_s3(url):
+    parsed_url = urlparse(url)
+    return parsed_url.scheme == 's3'
+
+
+def get_s3_bucket(url):
+    parsed_url = urlparse(url)
+    return parsed_url.netloc
+
+
+def get_s3_key(url):
+    parsed_url = urlparse(url)
+    return parsed_url.path.strip('/')
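A short sketch of how these helpers combine with the now S3-aware `safe_copy_file`. The bucket and key are placeholders, and `set_s3()` must be called first because the download goes through the module-level boto3 resource:

```python
from aodncore.util import set_s3, is_s3, get_s3_bucket, get_s3_key, safe_copy_file

set_s3()  # initialise the module-level boto3 S3 resource (default credentials)

src = 's3://example-landing-bucket/incoming/data.nc'  # placeholder object URL
assert is_s3(src)
assert get_s3_bucket(src) == 'example-landing-bucket'
assert get_s3_key(src) == 'incoming/data.nc'

# With an s3:// source, safe_copy_file downloads the object to a temporary file
# alongside the destination and then renames it into place atomically.
safe_copy_file(src, '/tmp/data.nc')
```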
diff --git a/aodncore_demo.ipynb b/aodncore_demo.ipynb
index e08b0141..eb4ecd85 100644
--- a/aodncore_demo.ipynb
+++ b/aodncore_demo.ipynb
@@ -509,4 +509,4 @@
 },
 "nbformat": 4,
 "nbformat_minor": 2
-}
+}
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 61a0f888..1354b7ad 100644
--- a/setup.py
+++ b/setup.py
@@ -16,7 +16,7 @@
     'tableschema>=1.19.4',
     'transitions>=0.7.1',
     'psycopg2-binary==2.8.6',
-    'PyYAML==5.3.1'
+    'PyYAML>=5.4.1'
 ]
 
 TESTS_REQUIRE = [
diff --git a/test_aodncore/util/test_fileops.py b/test_aodncore/util/test_fileops.py
index 9cc9d968..2b98e663 100644
--- a/test_aodncore/util/test_fileops.py
+++ b/test_aodncore/util/test_fileops.py
@@ -251,7 +251,7 @@ def test_safe_copy_file(self):
         with open(temp_source_file_path, 'w') as f:
             f.write(u'foobar')
 
-        with self.assertRaisesRegex(OSError, r"source file and destination file can't refer the to same file"):
+        with self.assertRaisesRegex(OSError, r"source file and destination file can't refer to the same file"):
             safe_copy_file(temp_source_file_path, temp_source_file_path)
 
         safe_copy_file(temp_source_file_path, temp_dest_file_path)