From a7be4e5bc1f57d11994ac26227dacb015f5bd0c8 Mon Sep 17 00:00:00 2001 From: cmrose Date: Thu, 8 Sep 2022 18:51:38 +1000 Subject: [PATCH 01/27] handle errors --- .idea/inspectionProfiles/Project_Default.xml | 6 ++- aodncore/pipeline/prefecthandlerbase.py | 45 ++++++++++++++++++ aodncore/pipeline/schema.py | 1 + aodncore/pipeline/steps/harvest.py | 19 ++++++++ aodncore/util/fileops.py | 16 +++++-- aodncore/util/s3_util.py | 49 ++++++++++++++++++++ 6 files changed, 130 insertions(+), 6 deletions(-) create mode 100644 aodncore/pipeline/prefecthandlerbase.py create mode 100644 aodncore/util/s3_util.py diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index 19f2474e..d1bf7724 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -4,10 +4,14 @@ diff --git a/aodncore/pipeline/prefecthandlerbase.py b/aodncore/pipeline/prefecthandlerbase.py new file mode 100644 index 00000000..b5a08acd --- /dev/null +++ b/aodncore/pipeline/prefecthandlerbase.py @@ -0,0 +1,45 @@ +import logging +import os + +from aodncore.pipeline import HandlerBase, FileType +from aodncore.pipeline.exceptions import InvalidInputFileError +from aodncore.pipeline.handlerbase import FALLBACK_LOG_LEVEL, FALLBACK_LOG_FORMAT +from aodncore.pipeline.log import get_pipeline_logger + + +class PrefectHandlerBase(HandlerBase): + + def _set_input_file_attributes(self): + """ Override HandlerBase""" + + try: + self._file_checksum = self.etag + except (IOError, OSError) as e: + self.logger.exception(e) + raise InvalidInputFileError(e) + self.logger.sysinfo("get_file_checksum -> '{self.file_checksum}'".format(self=self)) + + self._file_basename = os.path.basename(self.input_file) + self.logger.sysinfo("file_basename -> '{self._file_basename}'".format(self=self)) + _, self._file_extension = os.path.splitext(self.input_file) + self.logger.sysinfo("file_extension -> '{self._file_extension}'".format(self=self)) + self._file_type = FileType.get_type_from_extension(self.file_extension) + self.logger.sysinfo("file_type -> {self._file_type}".format(self=self)) + + def init_logger(self, logger_function): + self._init_logger(logger_function) + + def _init_logger(self, logger_function): + + logging.basicConfig(level=FALLBACK_LOG_LEVEL, format=FALLBACK_LOG_FORMAT) + logger = get_pipeline_logger('', logger_function=logger_function) + + # turn down logging for noisy libraries to WARN, unless overridden in pipeline config 'liblevel' key + liblevel = getattr(self.config, 'pipeline_config', {}).get('logging', {}).get('liblevel', 'WARN') + for lib in ('botocore', 'paramiko', 's3transfer', 'transitions'): + logging.getLogger(lib).setLevel(liblevel) + + self._logger = logger + self._celery_task_id = None + self._celery_task_name = 'NO_TASK' + self._pipeline_name = 'NO_PIPELINE' diff --git a/aodncore/pipeline/schema.py b/aodncore/pipeline/schema.py index 5fb80a5f..a6d15a25 100644 --- a/aodncore/pipeline/schema.py +++ b/aodncore/pipeline/schema.py @@ -147,6 +147,7 @@ 'type': 'array', 'items': {'type': 'string'} }, + 'landing_bucket': {'type': 'string'}, 'archive_uri': {'type': 'string'}, 'error_uri': {'type': 'string'}, 'opendap_root': {'type': 'string'}, diff --git a/aodncore/pipeline/steps/harvest.py b/aodncore/pipeline/steps/harvest.py index d3d19b98..890f3fa9 100644 --- a/aodncore/pipeline/steps/harvest.py +++ b/aodncore/pipeline/steps/harvest.py @@ -58,6 +58,8 @@ def get_harvester_runner(harvester_name, store_runner, harvest_params, tmp_base_ return TalendHarvesterRunner(store_runner, harvest_params, tmp_base_dir, config, logger) elif harvester_name == 'csv': return CsvHarvesterRunner(store_runner, harvest_params, config, logger) + elif harvester_name is None: + return NoHarvestRunner(store_runner, config, logger) else: raise InvalidHarvesterError("invalid harvester '{name}'".format(name=harvester_name)) @@ -567,5 +569,22 @@ def build_runsheet(self, pf): return True +class NoHarvestRunner(BaseHarvesterRunner): + """:py:class:`BaseHarvesterRunner` implementation to store files without harvest.""" + + def __init__(self, storage_broker, config, logger): + super().__init__(config, logger) + self.storage_broker = storage_broker + self.config = self._config + self.logger = self._logger + self.unexpected_pipeline_files = [] + + def run(self, pipeline_files): + + files_to_upload = pipeline_files.filter_by_bool_attribute('pending_store_addition') + if files_to_upload: + self.storage_broker.upload(pipeline_files=files_to_upload) + + validate_triggerevent = validate_type(TriggerEvent) validate_harvestermap = validate_type(HarvesterMap) diff --git a/aodncore/util/fileops.py b/aodncore/util/fileops.py index c2a54b86..f0e278ac 100644 --- a/aodncore/util/fileops.py +++ b/aodncore/util/fileops.py @@ -18,6 +18,9 @@ import magic import netCDF4 + +from aodncore.util.s3_util import * + __all__ = [ 'TemporaryDirectory', 'extract_gzip', @@ -329,7 +332,6 @@ def rm_rf(path): elif e.errno != errno.ENOENT: raise # pragma: no cover - def safe_copy_file(source, destination, overwrite=False): """Copy a file atomically by copying first to a temporary file in the same directory (and therefore filesystem) as the intended destination, before performing a rename (which is atomic) @@ -339,10 +341,11 @@ def safe_copy_file(source, destination, overwrite=False): :param overwrite: set to True to allow existing destination file to be overwritten :return: None """ - if not os.path.exists(source): + + if not os.path.exists(source) and not is_s3(source): raise OSError("source file '{source}' does not exist".format(source=source)) if source == destination: - raise OSError("source file and destination file can't refer the to same file") + raise OSError("source file and destination file can't refer to the same file") if not overwrite and os.path.exists(destination): raise OSError("destination file '{destination}' already exists".format(destination=destination)) @@ -350,8 +353,11 @@ def safe_copy_file(source, destination, overwrite=False): try: with tempfile.NamedTemporaryFile(mode='wb', dir=os.path.dirname(destination), delete=False) as temp_destination: temp_destination_name = temp_destination.name - with open(source, 'rb') as f: - shutil.copyfileobj(f, temp_destination) + if is_s3(source): + download_object(get_s3_bucket(source), get_s3_key(source), temp_destination_name) + else: + with open(source, 'rb') as f: + shutil.copyfileobj(f, temp_destination) os.rename(temp_destination_name, destination) finally: try: diff --git a/aodncore/util/s3_util.py b/aodncore/util/s3_util.py new file mode 100644 index 00000000..10189c90 --- /dev/null +++ b/aodncore/util/s3_util.py @@ -0,0 +1,49 @@ +from urllib.parse import urlparse +import boto3 + +__all__ = [ + "move_object", + "list_1000_objects", + "download_object", + "is_s3", + "get_s3_bucket", + "get_s3_key" +] + +s3 = boto3.resource('s3') + + +def move_object(key, source_bucket, dest_bucket): + copy_source = { + 'Bucket': source_bucket, + 'Key': key + } + s3.meta.client.copy(copy_source, dest_bucket, key) + s3.Object(source_bucket, key).delete() + + return True + + +def list_1000_objects(bucket, prefix): + response = s3.meta.client.list_objects_v2(Bucket=bucket, Prefix=prefix) + objects = response.get('Contents') + return objects + + +def download_object(bucket, key, destination): + s3.Object(bucket, key).download_file(destination) + + +def is_s3(url): + parsed_url = urlparse(url) + return parsed_url.scheme == 's3' + + +def get_s3_bucket(url): + parsed_url = urlparse(url) + return parsed_url.netloc + + +def get_s3_key(url): + parsed_url = urlparse(url) + return parsed_url.path.strip('/') From f3387d84833c5c3f0fd15190142b13660b5567e4 Mon Sep 17 00:00:00 2001 From: cmrose Date: Fri, 9 Sep 2022 12:57:33 +1000 Subject: [PATCH 02/27] init logger tidy up --- aodncore/pipeline/prefecthandlerbase.py | 4 +--- aodncore_demo.ipynb | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/aodncore/pipeline/prefecthandlerbase.py b/aodncore/pipeline/prefecthandlerbase.py index b5a08acd..a6b0b3d8 100644 --- a/aodncore/pipeline/prefecthandlerbase.py +++ b/aodncore/pipeline/prefecthandlerbase.py @@ -31,8 +31,7 @@ def init_logger(self, logger_function): def _init_logger(self, logger_function): - logging.basicConfig(level=FALLBACK_LOG_LEVEL, format=FALLBACK_LOG_FORMAT) - logger = get_pipeline_logger('', logger_function=logger_function) + logger = get_pipeline_logger(None, logger_function=logger_function) # turn down logging for noisy libraries to WARN, unless overridden in pipeline config 'liblevel' key liblevel = getattr(self.config, 'pipeline_config', {}).get('logging', {}).get('liblevel', 'WARN') @@ -42,4 +41,3 @@ def _init_logger(self, logger_function): self._logger = logger self._celery_task_id = None self._celery_task_name = 'NO_TASK' - self._pipeline_name = 'NO_PIPELINE' diff --git a/aodncore_demo.ipynb b/aodncore_demo.ipynb index e08b0141..eb4ecd85 100644 --- a/aodncore_demo.ipynb +++ b/aodncore_demo.ipynb @@ -509,4 +509,4 @@ }, "nbformat": 4, "nbformat_minor": 2 -} +} \ No newline at end of file From 18aa9cbabb24d1d2a2809c84e91a51c0ff268e62 Mon Sep 17 00:00:00 2001 From: cmrose Date: Fri, 9 Sep 2022 15:24:28 +1000 Subject: [PATCH 03/27] delete from S3 after download --- aodncore/pipeline/prefecthandlerbase.py | 19 ++++++++++++++++++- aodncore/pipeline/steps/resolve.py | 11 ++++++++--- aodncore/util/fileops.py | 12 +++++++++--- aodncore/util/s3_util.py | 7 ++++++- 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/aodncore/pipeline/prefecthandlerbase.py b/aodncore/pipeline/prefecthandlerbase.py index a6b0b3d8..bb7728c9 100644 --- a/aodncore/pipeline/prefecthandlerbase.py +++ b/aodncore/pipeline/prefecthandlerbase.py @@ -3,8 +3,9 @@ from aodncore.pipeline import HandlerBase, FileType from aodncore.pipeline.exceptions import InvalidInputFileError -from aodncore.pipeline.handlerbase import FALLBACK_LOG_LEVEL, FALLBACK_LOG_FORMAT from aodncore.pipeline.log import get_pipeline_logger +from aodncore.pipeline.steps import get_resolve_runner +from aodncore.util import ensure_regex_list class PrefectHandlerBase(HandlerBase): @@ -41,3 +42,19 @@ def _init_logger(self, logger_function): self._logger = logger self._celery_task_id = None self._celery_task_name = 'NO_TASK' + + def _resolve(self): + resolve_runner = get_resolve_runner(self.input_file, self.collection_dir, self.config, self.logger, + self.resolve_params) + self.logger.sysinfo("get_resolve_runner -> {resolve_runner}".format(resolve_runner=resolve_runner)) + resolved_files = resolve_runner.run(move=True) + + resolved_files.set_file_update_callback(self._file_update_callback) + + # if include_regexes is not defined, default to including all files when setting publish types + include_regexes = self.include_regexes if self.include_regexes else ensure_regex_list([r'.*']) + resolved_files.set_publish_types_from_regexes(include_regexes, self.exclude_regexes, + self.default_addition_publish_type, + self.default_deletion_publish_type) + + self.file_collection.update(resolved_files) diff --git a/aodncore/pipeline/steps/resolve.py b/aodncore/pipeline/steps/resolve.py index 444e3f2c..b3858393 100644 --- a/aodncore/pipeline/steps/resolve.py +++ b/aodncore/pipeline/steps/resolve.py @@ -34,7 +34,8 @@ from ..exceptions import InvalidFileFormatError from ..files import PipelineFile, PipelineFileCollection, RemotePipelineFile from ..schema import validate_json_manifest -from ...util import extract_gzip, extract_zip, list_regular_files, is_gzip_file, is_zip_file, safe_copy_file +from ...util import extract_gzip, extract_zip, list_regular_files, is_gzip_file, is_zip_file, safe_copy_file, \ + safe_move_file __all__ = [ 'get_resolve_runner', @@ -98,10 +99,14 @@ def run(self): class SingleFileResolveRunner(BaseResolveRunner): - def run(self): + def run(self, move=False): name = os.path.basename(self.input_file) temp_location = os.path.join(self.output_dir, name) - safe_copy_file(self.input_file, temp_location) + if move: + safe_move_file(self.input_file, temp_location) + else: + safe_copy_file(self.input_file, temp_location) + self._collection.add(temp_location) return self._collection diff --git a/aodncore/util/fileops.py b/aodncore/util/fileops.py index f0e278ac..28b2b6dc 100644 --- a/aodncore/util/fileops.py +++ b/aodncore/util/fileops.py @@ -17,7 +17,7 @@ import magic import netCDF4 - +from botocore.exceptions import ClientError from aodncore.util.s3_util import * @@ -50,6 +50,8 @@ 'validate_file_writable' ] +from aodncore.util.s3_util import delete_object + # allow for consistent sorting of filesystem directory listings locale.setlocale(locale.LC_ALL, 'C') filesystem_sort_key = cmp_to_key(locale.strcoll) @@ -332,7 +334,8 @@ def rm_rf(path): elif e.errno != errno.ENOENT: raise # pragma: no cover -def safe_copy_file(source, destination, overwrite=False): + +def safe_copy_file(source, destination, overwrite=False, move=False): """Copy a file atomically by copying first to a temporary file in the same directory (and therefore filesystem) as the intended destination, before performing a rename (which is atomic) @@ -377,7 +380,10 @@ def safe_move_file(src, dst, overwrite=False): :return: None """ safe_copy_file(src, dst, overwrite) - os.remove(src) + if is_s3(src): + delete_object(get_s3_bucket(src), get_s3_key(src)) + else: + os.remove(src) def validate_dir_writable(path): diff --git a/aodncore/util/s3_util.py b/aodncore/util/s3_util.py index 10189c90..f35d0095 100644 --- a/aodncore/util/s3_util.py +++ b/aodncore/util/s3_util.py @@ -14,12 +14,13 @@ def move_object(key, source_bucket, dest_bucket): + # Move objects between buckets copy_source = { 'Bucket': source_bucket, 'Key': key } s3.meta.client.copy(copy_source, dest_bucket, key) - s3.Object(source_bucket, key).delete() + delete_object(source_bucket, key) return True @@ -34,6 +35,10 @@ def download_object(bucket, key, destination): s3.Object(bucket, key).download_file(destination) +def delete_object(bucket, key): + s3.Object(bucket, key).delete() + + def is_s3(url): parsed_url = urlparse(url) return parsed_url.scheme == 's3' From 962507c8f7107e1ec4b1a59b9e1d278ba7cebad3 Mon Sep 17 00:00:00 2001 From: cmrose Date: Fri, 9 Sep 2022 15:32:08 +1000 Subject: [PATCH 04/27] update git ignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 6fa1e073..2ec748b7 100644 --- a/.gitignore +++ b/.gitignore @@ -97,6 +97,7 @@ ENV/ .idea/vcs.xml .idea/python-aodncore.iml .idea/inspectionProfiles/profiles_settings.xml +.idea/inspectionProfiles/Project_Default.xml # Sensitive or high-churn files: .idea/**/dataSources/ From f40bf28de14e289bdb86f01cbf3c3f3baeb6c5c3 Mon Sep 17 00:00:00 2001 From: craigrose <40220935+craigrose@users.noreply.github.com> Date: Fri, 9 Sep 2022 15:35:35 +1000 Subject: [PATCH 05/27] Update Project_Default.xml --- .idea/inspectionProfiles/Project_Default.xml | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index d1bf7724..ea1b0b06 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -4,14 +4,10 @@ @@ -37,4 +33,4 @@ - \ No newline at end of file + From f16292f279c792d4dcad6629c0b963464ea47510 Mon Sep 17 00:00:00 2001 From: cmrose Date: Fri, 9 Sep 2022 15:51:20 +1000 Subject: [PATCH 06/27] fix test --- test_aodncore/util/test_fileops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_aodncore/util/test_fileops.py b/test_aodncore/util/test_fileops.py index 9cc9d968..2b98e663 100644 --- a/test_aodncore/util/test_fileops.py +++ b/test_aodncore/util/test_fileops.py @@ -251,7 +251,7 @@ def test_safe_copy_file(self): with open(temp_source_file_path, 'w') as f: f.write(u'foobar') - with self.assertRaisesRegex(OSError, r"source file and destination file can't refer the to same file"): + with self.assertRaisesRegex(OSError, r"source file and destination file can't refer to the same file"): safe_copy_file(temp_source_file_path, temp_source_file_path) safe_copy_file(temp_source_file_path, temp_dest_file_path) From e99a1f1cb958cb40b50850a0d852d8c823679be1 Mon Sep 17 00:00:00 2001 From: cmrose Date: Fri, 9 Sep 2022 17:01:28 +1000 Subject: [PATCH 07/27] fix test --- aodncore/util/__init__.py | 1 + aodncore/util/fileops.py | 3 --- aodncore/util/s3_util.py | 3 ++- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/aodncore/util/__init__.py b/aodncore/util/__init__.py index bace3327..ca404a15 100644 --- a/aodncore/util/__init__.py +++ b/aodncore/util/__init__.py @@ -13,6 +13,7 @@ from .process import SystemProcess from .wfs import DEFAULT_WFS_VERSION, WfsBroker from .ff import get_field_type, get_tableschema_descriptor +from .s3_util import * __all__ = [ 'CaptureStdIO', diff --git a/aodncore/util/fileops.py b/aodncore/util/fileops.py index 28b2b6dc..01351858 100644 --- a/aodncore/util/fileops.py +++ b/aodncore/util/fileops.py @@ -17,7 +17,6 @@ import magic import netCDF4 -from botocore.exceptions import ClientError from aodncore.util.s3_util import * @@ -50,8 +49,6 @@ 'validate_file_writable' ] -from aodncore.util.s3_util import delete_object - # allow for consistent sorting of filesystem directory listings locale.setlocale(locale.LC_ALL, 'C') filesystem_sort_key = cmp_to_key(locale.strcoll) diff --git a/aodncore/util/s3_util.py b/aodncore/util/s3_util.py index f35d0095..2ee7e80f 100644 --- a/aodncore/util/s3_util.py +++ b/aodncore/util/s3_util.py @@ -7,7 +7,8 @@ "download_object", "is_s3", "get_s3_bucket", - "get_s3_key" + "get_s3_key", + "delete_object" ] s3 = boto3.resource('s3') From ac035bb76b9a85dd08ddef0e70ed88266951104b Mon Sep 17 00:00:00 2001 From: cmrose Date: Thu, 8 Sep 2022 18:51:38 +1000 Subject: [PATCH 08/27] handle errors --- .idea/inspectionProfiles/Project_Default.xml | 6 ++- aodncore/pipeline/prefecthandlerbase.py | 45 ++++++++++++++++++ aodncore/pipeline/schema.py | 1 + aodncore/pipeline/steps/harvest.py | 19 ++++++++ aodncore/util/fileops.py | 16 +++++-- aodncore/util/s3_util.py | 49 ++++++++++++++++++++ 6 files changed, 130 insertions(+), 6 deletions(-) create mode 100644 aodncore/pipeline/prefecthandlerbase.py create mode 100644 aodncore/util/s3_util.py diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index 19f2474e..d1bf7724 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -4,10 +4,14 @@ diff --git a/aodncore/pipeline/prefecthandlerbase.py b/aodncore/pipeline/prefecthandlerbase.py new file mode 100644 index 00000000..b5a08acd --- /dev/null +++ b/aodncore/pipeline/prefecthandlerbase.py @@ -0,0 +1,45 @@ +import logging +import os + +from aodncore.pipeline import HandlerBase, FileType +from aodncore.pipeline.exceptions import InvalidInputFileError +from aodncore.pipeline.handlerbase import FALLBACK_LOG_LEVEL, FALLBACK_LOG_FORMAT +from aodncore.pipeline.log import get_pipeline_logger + + +class PrefectHandlerBase(HandlerBase): + + def _set_input_file_attributes(self): + """ Override HandlerBase""" + + try: + self._file_checksum = self.etag + except (IOError, OSError) as e: + self.logger.exception(e) + raise InvalidInputFileError(e) + self.logger.sysinfo("get_file_checksum -> '{self.file_checksum}'".format(self=self)) + + self._file_basename = os.path.basename(self.input_file) + self.logger.sysinfo("file_basename -> '{self._file_basename}'".format(self=self)) + _, self._file_extension = os.path.splitext(self.input_file) + self.logger.sysinfo("file_extension -> '{self._file_extension}'".format(self=self)) + self._file_type = FileType.get_type_from_extension(self.file_extension) + self.logger.sysinfo("file_type -> {self._file_type}".format(self=self)) + + def init_logger(self, logger_function): + self._init_logger(logger_function) + + def _init_logger(self, logger_function): + + logging.basicConfig(level=FALLBACK_LOG_LEVEL, format=FALLBACK_LOG_FORMAT) + logger = get_pipeline_logger('', logger_function=logger_function) + + # turn down logging for noisy libraries to WARN, unless overridden in pipeline config 'liblevel' key + liblevel = getattr(self.config, 'pipeline_config', {}).get('logging', {}).get('liblevel', 'WARN') + for lib in ('botocore', 'paramiko', 's3transfer', 'transitions'): + logging.getLogger(lib).setLevel(liblevel) + + self._logger = logger + self._celery_task_id = None + self._celery_task_name = 'NO_TASK' + self._pipeline_name = 'NO_PIPELINE' diff --git a/aodncore/pipeline/schema.py b/aodncore/pipeline/schema.py index 5fb80a5f..a6d15a25 100644 --- a/aodncore/pipeline/schema.py +++ b/aodncore/pipeline/schema.py @@ -147,6 +147,7 @@ 'type': 'array', 'items': {'type': 'string'} }, + 'landing_bucket': {'type': 'string'}, 'archive_uri': {'type': 'string'}, 'error_uri': {'type': 'string'}, 'opendap_root': {'type': 'string'}, diff --git a/aodncore/pipeline/steps/harvest.py b/aodncore/pipeline/steps/harvest.py index d3d19b98..890f3fa9 100644 --- a/aodncore/pipeline/steps/harvest.py +++ b/aodncore/pipeline/steps/harvest.py @@ -58,6 +58,8 @@ def get_harvester_runner(harvester_name, store_runner, harvest_params, tmp_base_ return TalendHarvesterRunner(store_runner, harvest_params, tmp_base_dir, config, logger) elif harvester_name == 'csv': return CsvHarvesterRunner(store_runner, harvest_params, config, logger) + elif harvester_name is None: + return NoHarvestRunner(store_runner, config, logger) else: raise InvalidHarvesterError("invalid harvester '{name}'".format(name=harvester_name)) @@ -567,5 +569,22 @@ def build_runsheet(self, pf): return True +class NoHarvestRunner(BaseHarvesterRunner): + """:py:class:`BaseHarvesterRunner` implementation to store files without harvest.""" + + def __init__(self, storage_broker, config, logger): + super().__init__(config, logger) + self.storage_broker = storage_broker + self.config = self._config + self.logger = self._logger + self.unexpected_pipeline_files = [] + + def run(self, pipeline_files): + + files_to_upload = pipeline_files.filter_by_bool_attribute('pending_store_addition') + if files_to_upload: + self.storage_broker.upload(pipeline_files=files_to_upload) + + validate_triggerevent = validate_type(TriggerEvent) validate_harvestermap = validate_type(HarvesterMap) diff --git a/aodncore/util/fileops.py b/aodncore/util/fileops.py index c2a54b86..f0e278ac 100644 --- a/aodncore/util/fileops.py +++ b/aodncore/util/fileops.py @@ -18,6 +18,9 @@ import magic import netCDF4 + +from aodncore.util.s3_util import * + __all__ = [ 'TemporaryDirectory', 'extract_gzip', @@ -329,7 +332,6 @@ def rm_rf(path): elif e.errno != errno.ENOENT: raise # pragma: no cover - def safe_copy_file(source, destination, overwrite=False): """Copy a file atomically by copying first to a temporary file in the same directory (and therefore filesystem) as the intended destination, before performing a rename (which is atomic) @@ -339,10 +341,11 @@ def safe_copy_file(source, destination, overwrite=False): :param overwrite: set to True to allow existing destination file to be overwritten :return: None """ - if not os.path.exists(source): + + if not os.path.exists(source) and not is_s3(source): raise OSError("source file '{source}' does not exist".format(source=source)) if source == destination: - raise OSError("source file and destination file can't refer the to same file") + raise OSError("source file and destination file can't refer to the same file") if not overwrite and os.path.exists(destination): raise OSError("destination file '{destination}' already exists".format(destination=destination)) @@ -350,8 +353,11 @@ def safe_copy_file(source, destination, overwrite=False): try: with tempfile.NamedTemporaryFile(mode='wb', dir=os.path.dirname(destination), delete=False) as temp_destination: temp_destination_name = temp_destination.name - with open(source, 'rb') as f: - shutil.copyfileobj(f, temp_destination) + if is_s3(source): + download_object(get_s3_bucket(source), get_s3_key(source), temp_destination_name) + else: + with open(source, 'rb') as f: + shutil.copyfileobj(f, temp_destination) os.rename(temp_destination_name, destination) finally: try: diff --git a/aodncore/util/s3_util.py b/aodncore/util/s3_util.py new file mode 100644 index 00000000..10189c90 --- /dev/null +++ b/aodncore/util/s3_util.py @@ -0,0 +1,49 @@ +from urllib.parse import urlparse +import boto3 + +__all__ = [ + "move_object", + "list_1000_objects", + "download_object", + "is_s3", + "get_s3_bucket", + "get_s3_key" +] + +s3 = boto3.resource('s3') + + +def move_object(key, source_bucket, dest_bucket): + copy_source = { + 'Bucket': source_bucket, + 'Key': key + } + s3.meta.client.copy(copy_source, dest_bucket, key) + s3.Object(source_bucket, key).delete() + + return True + + +def list_1000_objects(bucket, prefix): + response = s3.meta.client.list_objects_v2(Bucket=bucket, Prefix=prefix) + objects = response.get('Contents') + return objects + + +def download_object(bucket, key, destination): + s3.Object(bucket, key).download_file(destination) + + +def is_s3(url): + parsed_url = urlparse(url) + return parsed_url.scheme == 's3' + + +def get_s3_bucket(url): + parsed_url = urlparse(url) + return parsed_url.netloc + + +def get_s3_key(url): + parsed_url = urlparse(url) + return parsed_url.path.strip('/') From 3656eec5067462136272a342224239b5fc075f19 Mon Sep 17 00:00:00 2001 From: cmrose Date: Fri, 9 Sep 2022 12:57:33 +1000 Subject: [PATCH 09/27] init logger tidy up --- aodncore/pipeline/prefecthandlerbase.py | 4 +--- aodncore_demo.ipynb | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/aodncore/pipeline/prefecthandlerbase.py b/aodncore/pipeline/prefecthandlerbase.py index b5a08acd..a6b0b3d8 100644 --- a/aodncore/pipeline/prefecthandlerbase.py +++ b/aodncore/pipeline/prefecthandlerbase.py @@ -31,8 +31,7 @@ def init_logger(self, logger_function): def _init_logger(self, logger_function): - logging.basicConfig(level=FALLBACK_LOG_LEVEL, format=FALLBACK_LOG_FORMAT) - logger = get_pipeline_logger('', logger_function=logger_function) + logger = get_pipeline_logger(None, logger_function=logger_function) # turn down logging for noisy libraries to WARN, unless overridden in pipeline config 'liblevel' key liblevel = getattr(self.config, 'pipeline_config', {}).get('logging', {}).get('liblevel', 'WARN') @@ -42,4 +41,3 @@ def _init_logger(self, logger_function): self._logger = logger self._celery_task_id = None self._celery_task_name = 'NO_TASK' - self._pipeline_name = 'NO_PIPELINE' diff --git a/aodncore_demo.ipynb b/aodncore_demo.ipynb index e08b0141..eb4ecd85 100644 --- a/aodncore_demo.ipynb +++ b/aodncore_demo.ipynb @@ -509,4 +509,4 @@ }, "nbformat": 4, "nbformat_minor": 2 -} +} \ No newline at end of file From fce5f1540b611478d8a68d15a9e1b307d7651f63 Mon Sep 17 00:00:00 2001 From: cmrose Date: Fri, 9 Sep 2022 15:24:28 +1000 Subject: [PATCH 10/27] delete from S3 after download --- aodncore/pipeline/prefecthandlerbase.py | 19 ++++++++++++++++++- aodncore/pipeline/steps/resolve.py | 11 ++++++++--- aodncore/util/fileops.py | 12 +++++++++--- aodncore/util/s3_util.py | 7 ++++++- 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/aodncore/pipeline/prefecthandlerbase.py b/aodncore/pipeline/prefecthandlerbase.py index a6b0b3d8..bb7728c9 100644 --- a/aodncore/pipeline/prefecthandlerbase.py +++ b/aodncore/pipeline/prefecthandlerbase.py @@ -3,8 +3,9 @@ from aodncore.pipeline import HandlerBase, FileType from aodncore.pipeline.exceptions import InvalidInputFileError -from aodncore.pipeline.handlerbase import FALLBACK_LOG_LEVEL, FALLBACK_LOG_FORMAT from aodncore.pipeline.log import get_pipeline_logger +from aodncore.pipeline.steps import get_resolve_runner +from aodncore.util import ensure_regex_list class PrefectHandlerBase(HandlerBase): @@ -41,3 +42,19 @@ def _init_logger(self, logger_function): self._logger = logger self._celery_task_id = None self._celery_task_name = 'NO_TASK' + + def _resolve(self): + resolve_runner = get_resolve_runner(self.input_file, self.collection_dir, self.config, self.logger, + self.resolve_params) + self.logger.sysinfo("get_resolve_runner -> {resolve_runner}".format(resolve_runner=resolve_runner)) + resolved_files = resolve_runner.run(move=True) + + resolved_files.set_file_update_callback(self._file_update_callback) + + # if include_regexes is not defined, default to including all files when setting publish types + include_regexes = self.include_regexes if self.include_regexes else ensure_regex_list([r'.*']) + resolved_files.set_publish_types_from_regexes(include_regexes, self.exclude_regexes, + self.default_addition_publish_type, + self.default_deletion_publish_type) + + self.file_collection.update(resolved_files) diff --git a/aodncore/pipeline/steps/resolve.py b/aodncore/pipeline/steps/resolve.py index 444e3f2c..b3858393 100644 --- a/aodncore/pipeline/steps/resolve.py +++ b/aodncore/pipeline/steps/resolve.py @@ -34,7 +34,8 @@ from ..exceptions import InvalidFileFormatError from ..files import PipelineFile, PipelineFileCollection, RemotePipelineFile from ..schema import validate_json_manifest -from ...util import extract_gzip, extract_zip, list_regular_files, is_gzip_file, is_zip_file, safe_copy_file +from ...util import extract_gzip, extract_zip, list_regular_files, is_gzip_file, is_zip_file, safe_copy_file, \ + safe_move_file __all__ = [ 'get_resolve_runner', @@ -98,10 +99,14 @@ def run(self): class SingleFileResolveRunner(BaseResolveRunner): - def run(self): + def run(self, move=False): name = os.path.basename(self.input_file) temp_location = os.path.join(self.output_dir, name) - safe_copy_file(self.input_file, temp_location) + if move: + safe_move_file(self.input_file, temp_location) + else: + safe_copy_file(self.input_file, temp_location) + self._collection.add(temp_location) return self._collection diff --git a/aodncore/util/fileops.py b/aodncore/util/fileops.py index f0e278ac..28b2b6dc 100644 --- a/aodncore/util/fileops.py +++ b/aodncore/util/fileops.py @@ -17,7 +17,7 @@ import magic import netCDF4 - +from botocore.exceptions import ClientError from aodncore.util.s3_util import * @@ -50,6 +50,8 @@ 'validate_file_writable' ] +from aodncore.util.s3_util import delete_object + # allow for consistent sorting of filesystem directory listings locale.setlocale(locale.LC_ALL, 'C') filesystem_sort_key = cmp_to_key(locale.strcoll) @@ -332,7 +334,8 @@ def rm_rf(path): elif e.errno != errno.ENOENT: raise # pragma: no cover -def safe_copy_file(source, destination, overwrite=False): + +def safe_copy_file(source, destination, overwrite=False, move=False): """Copy a file atomically by copying first to a temporary file in the same directory (and therefore filesystem) as the intended destination, before performing a rename (which is atomic) @@ -377,7 +380,10 @@ def safe_move_file(src, dst, overwrite=False): :return: None """ safe_copy_file(src, dst, overwrite) - os.remove(src) + if is_s3(src): + delete_object(get_s3_bucket(src), get_s3_key(src)) + else: + os.remove(src) def validate_dir_writable(path): diff --git a/aodncore/util/s3_util.py b/aodncore/util/s3_util.py index 10189c90..f35d0095 100644 --- a/aodncore/util/s3_util.py +++ b/aodncore/util/s3_util.py @@ -14,12 +14,13 @@ def move_object(key, source_bucket, dest_bucket): + # Move objects between buckets copy_source = { 'Bucket': source_bucket, 'Key': key } s3.meta.client.copy(copy_source, dest_bucket, key) - s3.Object(source_bucket, key).delete() + delete_object(source_bucket, key) return True @@ -34,6 +35,10 @@ def download_object(bucket, key, destination): s3.Object(bucket, key).download_file(destination) +def delete_object(bucket, key): + s3.Object(bucket, key).delete() + + def is_s3(url): parsed_url = urlparse(url) return parsed_url.scheme == 's3' From 1c061eb37c75ef6af1245534bfaf89f1c3d0f7ba Mon Sep 17 00:00:00 2001 From: cmrose Date: Fri, 9 Sep 2022 15:32:08 +1000 Subject: [PATCH 11/27] update git ignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 890719c2..0086748f 100644 --- a/.gitignore +++ b/.gitignore @@ -103,6 +103,7 @@ ENV/ .idea/vcs.xml .idea/python-aodncore.iml .idea/inspectionProfiles/profiles_settings.xml +.idea/inspectionProfiles/Project_Default.xml # Sensitive or high-churn files: .idea/**/dataSources/ From bf77b7939692ad00adf9fbd46992b5601233584c Mon Sep 17 00:00:00 2001 From: cmrose Date: Fri, 9 Sep 2022 15:51:20 +1000 Subject: [PATCH 12/27] fix test --- test_aodncore/util/test_fileops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_aodncore/util/test_fileops.py b/test_aodncore/util/test_fileops.py index 9cc9d968..2b98e663 100644 --- a/test_aodncore/util/test_fileops.py +++ b/test_aodncore/util/test_fileops.py @@ -251,7 +251,7 @@ def test_safe_copy_file(self): with open(temp_source_file_path, 'w') as f: f.write(u'foobar') - with self.assertRaisesRegex(OSError, r"source file and destination file can't refer the to same file"): + with self.assertRaisesRegex(OSError, r"source file and destination file can't refer to the same file"): safe_copy_file(temp_source_file_path, temp_source_file_path) safe_copy_file(temp_source_file_path, temp_dest_file_path) From 4007572110a4c38699032431081322d879e5865d Mon Sep 17 00:00:00 2001 From: craigrose <40220935+craigrose@users.noreply.github.com> Date: Fri, 9 Sep 2022 15:35:35 +1000 Subject: [PATCH 13/27] Update Project_Default.xml --- .idea/inspectionProfiles/Project_Default.xml | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index d1bf7724..ea1b0b06 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -4,14 +4,10 @@ @@ -37,4 +33,4 @@ - \ No newline at end of file + From f666c6c5c7d39a78537c1bffdbe7d2369606989d Mon Sep 17 00:00:00 2001 From: cmrose Date: Fri, 9 Sep 2022 17:01:28 +1000 Subject: [PATCH 14/27] fix test --- aodncore/util/__init__.py | 1 + aodncore/util/fileops.py | 3 --- aodncore/util/s3_util.py | 3 ++- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/aodncore/util/__init__.py b/aodncore/util/__init__.py index bace3327..ca404a15 100644 --- a/aodncore/util/__init__.py +++ b/aodncore/util/__init__.py @@ -13,6 +13,7 @@ from .process import SystemProcess from .wfs import DEFAULT_WFS_VERSION, WfsBroker from .ff import get_field_type, get_tableschema_descriptor +from .s3_util import * __all__ = [ 'CaptureStdIO', diff --git a/aodncore/util/fileops.py b/aodncore/util/fileops.py index 28b2b6dc..01351858 100644 --- a/aodncore/util/fileops.py +++ b/aodncore/util/fileops.py @@ -17,7 +17,6 @@ import magic import netCDF4 -from botocore.exceptions import ClientError from aodncore.util.s3_util import * @@ -50,8 +49,6 @@ 'validate_file_writable' ] -from aodncore.util.s3_util import delete_object - # allow for consistent sorting of filesystem directory listings locale.setlocale(locale.LC_ALL, 'C') filesystem_sort_key = cmp_to_key(locale.strcoll) diff --git a/aodncore/util/s3_util.py b/aodncore/util/s3_util.py index f35d0095..2ee7e80f 100644 --- a/aodncore/util/s3_util.py +++ b/aodncore/util/s3_util.py @@ -7,7 +7,8 @@ "download_object", "is_s3", "get_s3_bucket", - "get_s3_key" + "get_s3_key", + "delete_object" ] s3 = boto3.resource('s3') From fbf1307b3078146945a4eaf6fe3e8cc4537a413f Mon Sep 17 00:00:00 2001 From: cmrose Date: Mon, 12 Sep 2022 14:23:54 +1000 Subject: [PATCH 15/27] set pyyaml to >=5.4.1 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 61a0f888..1354b7ad 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ 'tableschema>=1.19.4', 'transitions>=0.7.1', 'psycopg2-binary==2.8.6', - 'PyYAML==5.3.1' + 'PyYAML>=5.4.1' ] TESTS_REQUIRE = [ From d83d3c8ebf5853b15e1aa5a2fed45518adacdfd5 Mon Sep 17 00:00:00 2001 From: cmrose Date: Tue, 13 Sep 2022 15:56:50 +1000 Subject: [PATCH 16/27] allow s3 with session token --- aodncore/util/s3_util.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/aodncore/util/s3_util.py b/aodncore/util/s3_util.py index 2ee7e80f..bad7a79f 100644 --- a/aodncore/util/s3_util.py +++ b/aodncore/util/s3_util.py @@ -8,10 +8,19 @@ "is_s3", "get_s3_bucket", "get_s3_key", - "delete_object" + "delete_object", + "set_s3" ] -s3 = boto3.resource('s3') +s3 = None + +def set_s3(credentials=None): + global s3 + if credentials: + s3 = boto3.resource('s3', aws_session_token=credentials['SessionToken']) + else: + s3 = boto3.resource('s3') + def move_object(key, source_bucket, dest_bucket): From f74a2fc618e2334b1062788bc2dab898e79e42d6 Mon Sep 17 00:00:00 2001 From: cmrose Date: Wed, 14 Sep 2022 12:31:52 +1000 Subject: [PATCH 17/27] updated config schema with trigger --- aodncore/pipeline/schema.py | 19 ++++++++++++++++++- aodncore/util/s3_util.py | 2 +- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/aodncore/pipeline/schema.py b/aodncore/pipeline/schema.py index a6d15a25..f9f76d9e 100644 --- a/aodncore/pipeline/schema.py +++ b/aodncore/pipeline/schema.py @@ -232,15 +232,32 @@ }, 'required': ['incoming_dir', 'logger_name', 'task_namespace'], 'additionalProperties': False + }, + "trigger": { + 'type': 'object', + 'properties': { + "trigger_type": {'$ref': '#/definitions/triggers'}, + "sns_topic_arn": {'type': 'string'}, + "prefect_api_key": {'type': 'string'}, + "prefect_queue_id": {'type': 'string'}, + "prefect_deployment_id": {'type': 'string'}, + "prefect_account_id": {'type': 'string'}, + "prefect_workspace_id": {'type': 'string'}, + "end_flow_function": {'type': 'string'} + }, + 'additionalProperties': False } }, - # TODO: add 'harvester' to the required list - once it has been added to the chef build 'required': ['global', 'logging', 'mail', 'talend', 'templating', 'watch', 'harvester'], 'additionalProperties': False, 'definitions': { 'loggingLevel': { 'type': 'string', 'enum': ['CRITICAL', 'FATAL', 'ERROR', 'WARNING', 'WARN', 'INFO', 'SYSINFO', 'DEBUG', 'NOTSET'] + }, + 'triggers': { + 'type': 'string', + 'enum': ['EVENT'] } } } diff --git a/aodncore/util/s3_util.py b/aodncore/util/s3_util.py index bad7a79f..2db2f162 100644 --- a/aodncore/util/s3_util.py +++ b/aodncore/util/s3_util.py @@ -14,6 +14,7 @@ s3 = None + def set_s3(credentials=None): global s3 if credentials: @@ -22,7 +23,6 @@ def set_s3(credentials=None): s3 = boto3.resource('s3') - def move_object(key, source_bucket, dest_bucket): # Move objects between buckets copy_source = { From 2bed073cdac3d33668a0ca4de16c225c93d14078 Mon Sep 17 00:00:00 2001 From: cmrose Date: Wed, 14 Sep 2022 15:25:12 +1000 Subject: [PATCH 18/27] add local agent trigger type --- aodncore/pipeline/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aodncore/pipeline/schema.py b/aodncore/pipeline/schema.py index f9f76d9e..011be886 100644 --- a/aodncore/pipeline/schema.py +++ b/aodncore/pipeline/schema.py @@ -257,7 +257,7 @@ }, 'triggers': { 'type': 'string', - 'enum': ['EVENT'] + 'enum': ['EVENT', 'LOCAL_AGENT'] } } } From 4fa08387ff708273a34df2e56ee9e842f3e48a87 Mon Sep 17 00:00:00 2001 From: cmrose Date: Wed, 14 Sep 2022 15:56:55 +1000 Subject: [PATCH 19/27] add local agent trigger type --- aodncore/pipeline/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aodncore/pipeline/schema.py b/aodncore/pipeline/schema.py index 011be886..f9f76d9e 100644 --- a/aodncore/pipeline/schema.py +++ b/aodncore/pipeline/schema.py @@ -257,7 +257,7 @@ }, 'triggers': { 'type': 'string', - 'enum': ['EVENT', 'LOCAL_AGENT'] + 'enum': ['EVENT'] } } } From 9346e49cd981edcc4fb8e7f02a6aaf0b35b759a0 Mon Sep 17 00:00:00 2001 From: cmrose Date: Thu, 15 Sep 2022 11:41:27 +1000 Subject: [PATCH 20/27] add prefect trigger sns message to config schema --- aodncore/pipeline/schema.py | 1 + 1 file changed, 1 insertion(+) diff --git a/aodncore/pipeline/schema.py b/aodncore/pipeline/schema.py index f9f76d9e..0dd5f72d 100644 --- a/aodncore/pipeline/schema.py +++ b/aodncore/pipeline/schema.py @@ -238,6 +238,7 @@ 'properties': { "trigger_type": {'$ref': '#/definitions/triggers'}, "sns_topic_arn": {'type': 'string'}, + "sns_message": {'type': 'string'}, "prefect_api_key": {'type': 'string'}, "prefect_queue_id": {'type': 'string'}, "prefect_deployment_id": {'type': 'string'}, From e36210da826996fc533534f9e3d144842dff793d Mon Sep 17 00:00:00 2001 From: craigrose <40220935+craigrose@users.noreply.github.com> Date: Fri, 30 Sep 2022 10:50:41 +1000 Subject: [PATCH 21/27] Update README.md --- README.md | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 01701047..f5edc2ab 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,24 @@ -# AODN Core Pipeline +# AODN Core Pipeline for Prefect Flows + +This branch modifies the original python-aodncore and allows the pipeline steps to be used independently of the original state machine. As a result the steps can be reordered, skipped, added to or otherwise combined into adhoc Prefect flows. The changes also make the original state machine loggers available to Prefect so that fine grained output can be viewed in the Prefect Cloud interface. This results in a high degree of flexibility in the implementation of pipeline flows. + +The branch also includes functions for processing files in s3 buckets (aodncore/util/s3_util.py) and some minor changes to use these instead of the local file system. + +A new class, aodncore/pipeline/prefecthandlerbase.py. PrefectHandlerBase extends HandlerBase and should be used instead of HandlerBase when creating pipelines for use in Prefect flows. + +In PrefectHandlerBase the following HandlerBase methods are overridden: + + _set_input_file_attributes is overridden to use the s3 ETag for the checksum + _init_logger is overridden to use the Prefect logger to log messages from aodncore to the Prefect Cloud UI + _resolve is overridden so that resolve runners can move files from the landing s3 to the local filesystem instead of copying. This is currently only available for the SingleFileResolveRunner. + +It is not intended for the Prefect pipelines to do the harvesting step. This would be part of additional Prefect flows downstream of the data ingestion. We have avoided the harvesting step by simply using a newly created NoHarvestRunner class in aodncore/pipeline/steps/harvest.py which will do the file upload without actually harvesting. + +For additional details and notes on future possible work see https://github.com/aodn/backlog/issues/4290. + +# Original AODN Core Pipeline. + +*Note: This branch overrides the behaviours described below. This section is only here for reference to the original AODN core pipeline.* The `aodncore` package provides the core & generic functionality for all data ingestion pipelines at the AODN. This can be customised and extended for specific data-streams in the [`aodndata`](https://github.com/aodn/python-aodndata) package. @@ -9,4 +29,4 @@ The package provides the base class for each pipeline handler, aodncore.pipeline Project documentation is hosted at: https://aodn.github.io/python-aodncore/index.html ## Licensing -This project is licensed under the terms of the GNU GPLv3 license. \ No newline at end of file +This project is licensed under the terms of the GNU GPLv3 license. From bab541046e924d79e40dbbfff0bca27ab8fda878 Mon Sep 17 00:00:00 2001 From: craigrose <40220935+craigrose@users.noreply.github.com> Date: Fri, 30 Sep 2022 10:52:24 +1000 Subject: [PATCH 22/27] Update README.md --- README.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index f5edc2ab..ae3845bf 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,11 @@ A new class, aodncore/pipeline/prefecthandlerbase.py. PrefectHandlerBase extends In PrefectHandlerBase the following HandlerBase methods are overridden: - _set_input_file_attributes is overridden to use the s3 ETag for the checksum - _init_logger is overridden to use the Prefect logger to log messages from aodncore to the Prefect Cloud UI - _resolve is overridden so that resolve runners can move files from the landing s3 to the local filesystem instead of copying. This is currently only available for the SingleFileResolveRunner. +`_set_input_file_attributes` is overridden to use the s3 ETag for the checksum + +`_init_logger` is overridden to use the Prefect logger to log messages from aodncore to the Prefect Cloud UI. + +`_resolve` is overridden so that resolve runners can move files from the landing s3 to the local filesystem instead of copying. This is currently only available for the SingleFileResolveRunner. It is not intended for the Prefect pipelines to do the harvesting step. This would be part of additional Prefect flows downstream of the data ingestion. We have avoided the harvesting step by simply using a newly created NoHarvestRunner class in aodncore/pipeline/steps/harvest.py which will do the file upload without actually harvesting. From b032feb38f03bd5d69feb5fbd02676824b908f28 Mon Sep 17 00:00:00 2001 From: craigrose <40220935+craigrose@users.noreply.github.com> Date: Fri, 30 Sep 2022 10:54:46 +1000 Subject: [PATCH 23/27] Update README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index ae3845bf..3e1bb32a 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ This branch modifies the original python-aodncore and allows the pipeline steps The branch also includes functions for processing files in s3 buckets (aodncore/util/s3_util.py) and some minor changes to use these instead of the local file system. -A new class, aodncore/pipeline/prefecthandlerbase.py. PrefectHandlerBase extends HandlerBase and should be used instead of HandlerBase when creating pipelines for use in Prefect flows. +A new class, aodncore/pipeline/prefecthandlerbase.py. `PrefectHandlerBase` extends `HandlerBase` and should be used instead of `HandlerBase` when creating pipelines for use in Prefect flows. In PrefectHandlerBase the following HandlerBase methods are overridden: @@ -12,9 +12,9 @@ In PrefectHandlerBase the following HandlerBase methods are overridden: `_init_logger` is overridden to use the Prefect logger to log messages from aodncore to the Prefect Cloud UI. -`_resolve` is overridden so that resolve runners can move files from the landing s3 to the local filesystem instead of copying. This is currently only available for the SingleFileResolveRunner. +`_resolve` is overridden so that resolve runners can move files from the landing s3 to the local filesystem instead of copying. This is currently only available for the `SingleFileResolveRunner`. -It is not intended for the Prefect pipelines to do the harvesting step. This would be part of additional Prefect flows downstream of the data ingestion. We have avoided the harvesting step by simply using a newly created NoHarvestRunner class in aodncore/pipeline/steps/harvest.py which will do the file upload without actually harvesting. +It is not intended for the Prefect pipelines to do the harvesting step. This would be part of additional Prefect flows downstream of the data ingestion. We have avoided the harvesting step by simply using a newly created `NoHarvestRunner` class in `aodncore/pipeline/steps/harvest.py` which will do the file upload without actually harvesting. For additional details and notes on future possible work see https://github.com/aodn/backlog/issues/4290. From bb3658ac23b62809ce62ec04442673531388ac41 Mon Sep 17 00:00:00 2001 From: cmrose Date: Tue, 18 Oct 2022 12:05:28 +1100 Subject: [PATCH 24/27] fix ingest_flow so it works with optimise_flow --- aodncore/pipeline/schema.py | 1 + 1 file changed, 1 insertion(+) diff --git a/aodncore/pipeline/schema.py b/aodncore/pipeline/schema.py index 0dd5f72d..6c8023dd 100644 --- a/aodncore/pipeline/schema.py +++ b/aodncore/pipeline/schema.py @@ -150,6 +150,7 @@ 'landing_bucket': {'type': 'string'}, 'archive_uri': {'type': 'string'}, 'error_uri': {'type': 'string'}, + 'optimised_uri': {'type': 'string'}, 'opendap_root': {'type': 'string'}, 'processing_dir': {'type': 'string'}, 'tmp_dir': {'type': 'string'}, From 628eb90fa84bbe616bbe4e36bb7b16eec928c2aa Mon Sep 17 00:00:00 2001 From: cmrose Date: Tue, 11 Jul 2023 17:14:35 +1000 Subject: [PATCH 25/27] handle zip files --- aodncore/pipeline/prefecthandlerbase.py | 6 +++++- aodncore/pipeline/steps/resolve.py | 12 +++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/aodncore/pipeline/prefecthandlerbase.py b/aodncore/pipeline/prefecthandlerbase.py index bb7728c9..b866a1c8 100644 --- a/aodncore/pipeline/prefecthandlerbase.py +++ b/aodncore/pipeline/prefecthandlerbase.py @@ -5,6 +5,7 @@ from aodncore.pipeline.exceptions import InvalidInputFileError from aodncore.pipeline.log import get_pipeline_logger from aodncore.pipeline.steps import get_resolve_runner +from aodncore.pipeline.steps.resolve import SingleFileResolveRunner from aodncore.util import ensure_regex_list @@ -47,7 +48,10 @@ def _resolve(self): resolve_runner = get_resolve_runner(self.input_file, self.collection_dir, self.config, self.logger, self.resolve_params) self.logger.sysinfo("get_resolve_runner -> {resolve_runner}".format(resolve_runner=resolve_runner)) - resolved_files = resolve_runner.run(move=True) + if isinstance(resolve_runner, SingleFileResolveRunner): + resolved_files = resolve_runner.run(move=True) + else: + resolved_files = resolve_runner.run() resolved_files.set_file_update_callback(self._file_update_callback) diff --git a/aodncore/pipeline/steps/resolve.py b/aodncore/pipeline/steps/resolve.py index b3858393..d36de5ad 100644 --- a/aodncore/pipeline/steps/resolve.py +++ b/aodncore/pipeline/steps/resolve.py @@ -35,7 +35,7 @@ from ..files import PipelineFile, PipelineFileCollection, RemotePipelineFile from ..schema import validate_json_manifest from ...util import extract_gzip, extract_zip, list_regular_files, is_gzip_file, is_zip_file, safe_copy_file, \ - safe_move_file + safe_move_file, is_s3, TemporaryDirectory __all__ = [ 'get_resolve_runner', @@ -126,11 +126,21 @@ def run(self): class ZipFileResolveRunner(BaseResolveRunner): def run(self): + + if is_s3(self.input_file): + removeS3Download = True + temp = f'/tmp/{os.path.basename(self.input_file)}' + safe_move_file(self.input_file, temp) + self.input_file = temp + if not is_zip_file(self.input_file): raise InvalidFileFormatError("input_file must be a valid ZIP file") extract_zip(self.input_file, self.output_dir) + if removeS3Download: + os.remove(self.input_file) + for f in list_regular_files(self.output_dir, recursive=True): self._collection.add(f) return self._collection From 9ac8b5c5c49269ff73f384f92811a67b031b0b91 Mon Sep 17 00:00:00 2001 From: cmrose Date: Thu, 13 Jul 2023 16:19:49 +1000 Subject: [PATCH 26/27] strip leading / from s3 prefixes --- aodncore/pipeline/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aodncore/pipeline/storage.py b/aodncore/pipeline/storage.py index b920c4fe..e35fefc5 100644 --- a/aodncore/pipeline/storage.py +++ b/aodncore/pipeline/storage.py @@ -324,7 +324,7 @@ def __init__(self, bucket, prefix): super().__init__() self.bucket = bucket - self.prefix = prefix + self.prefix = prefix.strip('/') self.s3_client = boto3.client('s3') From 1bf123b1e2d5167fd9187b5ea9a185b0a3300365 Mon Sep 17 00:00:00 2001 From: cmrose Date: Thu, 13 Jul 2023 16:25:21 +1000 Subject: [PATCH 27/27] strip leading / from s3 prefixes --- aodncore/pipeline/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aodncore/pipeline/storage.py b/aodncore/pipeline/storage.py index e35fefc5..5a916928 100644 --- a/aodncore/pipeline/storage.py +++ b/aodncore/pipeline/storage.py @@ -324,7 +324,7 @@ def __init__(self, bucket, prefix): super().__init__() self.bucket = bucket - self.prefix = prefix.strip('/') + self.prefix = prefix.lstrip('/') self.s3_client = boto3.client('s3')