
4290 prefect pipeline #266

Open · wants to merge 29 commits into base: master

Changes from 25 commits (29 commits total):
a7be4e5
handle errors
craigrose Sep 8, 2022
f3387d8
init logger tidy up
craigrose Sep 9, 2022
18aa9cb
delete from S3 after download
craigrose Sep 9, 2022
962507c
update git ignore
craigrose Sep 9, 2022
f40bf28
Update Project_Default.xml
craigrose Sep 9, 2022
f16292f
fix test
craigrose Sep 9, 2022
bc49b9b
Merge branch '4290-prefect-pipeline' of github.com:aodn/python-aodnco…
craigrose Sep 9, 2022
e99a1f1
fix test
craigrose Sep 9, 2022
ac035bb
handle errors
craigrose Sep 8, 2022
3656eec
init logger tidy up
craigrose Sep 9, 2022
fce5f15
delete from S3 after download
craigrose Sep 9, 2022
1c061eb
update git ignore
craigrose Sep 9, 2022
bf77b79
fix test
craigrose Sep 9, 2022
4007572
Update Project_Default.xml
craigrose Sep 9, 2022
f666c6c
fix test
craigrose Sep 9, 2022
4bc7df2
Merge branch '4290-prefect-pipeline' of github.com:aodn/python-aodnco…
craigrose Sep 11, 2022
fbf1307
set pyyaml to >=5.4.1
craigrose Sep 12, 2022
d83d3c8
allow s3 with session token
craigrose Sep 13, 2022
f74a2fc
updated config schema with trigger
craigrose Sep 14, 2022
2bed073
add local agent trigger type
craigrose Sep 14, 2022
4fa0838
add local agent trigger type
craigrose Sep 14, 2022
9346e49
add prefect trigger sns message to config schema
craigrose Sep 15, 2022
e36210d
Update README.md
craigrose Sep 30, 2022
bab5410
Update README.md
craigrose Sep 30, 2022
b032feb
Update README.md
craigrose Sep 30, 2022
bb3658a
fix ingest_flow so it works with optimise_flow
craigrose Oct 18, 2022
628eb90
handle zip files
craigrose Jul 11, 2023
9ac8b5c
strip leading / from s3 prefixes
craigrose Jul 13, 2023
1bf123b
strip leading / from s3 prefixes
craigrose Jul 13, 2023
1 change: 1 addition & 0 deletions .gitignore
@@ -103,6 +103,7 @@ ENV/
 .idea/vcs.xml
 .idea/python-aodncore.iml
 .idea/inspectionProfiles/profiles_settings.xml
+.idea/inspectionProfiles/Project_Default.xml

 # Sensitive or high-churn files:
 .idea/**/dataSources/
2 changes: 1 addition & 1 deletion .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default.

26 changes: 24 additions & 2 deletions README.md
@@ -1,4 +1,26 @@
-# AODN Core Pipeline
+# AODN Core Pipeline for Prefect Flows

This branch modifies the original python-aodncore so that the pipeline steps can be used independently of the original state machine. As a result, the steps can be reordered, skipped, added to, or otherwise combined into ad hoc Prefect flows. The changes also make the original state machine loggers available to Prefect, so that fine-grained output can be viewed in the Prefect Cloud interface. This gives a high degree of flexibility in the implementation of pipeline flows.

The branch also includes functions for processing files in S3 buckets (`aodncore/util/s3_util.py`) and some minor changes to use these instead of the local file system.

A new class, `PrefectHandlerBase` (`aodncore/pipeline/prefecthandlerbase.py`), extends `HandlerBase` and should be used in its place when creating pipelines for use in Prefect flows.

In `PrefectHandlerBase` the following `HandlerBase` methods are overridden:

`_set_input_file_attributes` is overridden to use the S3 ETag for the checksum.

`_init_logger` is overridden to use the Prefect logger to log messages from aodncore to the Prefect Cloud UI.

`_resolve` is overridden so that resolve runners can move files from the landing S3 bucket to the local filesystem instead of copying them. This is currently only supported by the `SingleFileResolveRunner`.

The Prefect pipelines are not intended to perform the harvesting step; that belongs in additional Prefect flows downstream of the data ingestion. The harvesting step is bypassed by a newly created `NoHarvestRunner` class in `aodncore/pipeline/steps/harvest.py`, which performs the file upload without actually harvesting.

For additional details and notes on possible future work, see https://github.com/aodn/backlog/issues/4290.
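
As an illustration of how this fits together, a handler built on `PrefectHandlerBase` might be driven from a Prefect flow roughly as follows. This is a sketch only, not part of this PR: `MyHandler` is a hypothetical subclass, the Prefect 2 `flow`/`get_run_logger` API is assumed, and exactly what `init_logger` expects as its logger function is not shown in this diff.

```python
from prefect import flow, get_run_logger

from my_project.handlers import MyHandler  # hypothetical PrefectHandlerBase subclass


@flow
def ingest_flow(input_file: str):
    handler = MyHandler(input_file)
    # wire aodncore's pipeline logging into the Prefect run logger, so that
    # step output appears in the Prefect Cloud UI (assumed argument shape)
    handler.init_logger(get_run_logger())
    handler.run()
```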

# Original AODN Core Pipeline

*Note: This branch overrides the behaviours described below. This section is only here for reference to the original AODN core pipeline.*

The `aodncore` package provides the core & generic functionality for all data ingestion pipelines at the AODN. This can be customised and extended for specific data-streams in the [`aodndata`](https://github.com/aodn/python-aodndata) package.

@@ -9,4 +31,4 @@ The package provides the base class for each pipeline handler, aodncore.pipeline

Project documentation is hosted at: https://aodn.github.io/python-aodncore/index.html
## Licensing
This project is licensed under the terms of the GNU GPLv3 license.
60 changes: 60 additions & 0 deletions aodncore/pipeline/prefecthandlerbase.py
@@ -0,0 +1,60 @@
import logging
import os

from aodncore.pipeline import HandlerBase, FileType
from aodncore.pipeline.exceptions import InvalidInputFileError
from aodncore.pipeline.log import get_pipeline_logger
from aodncore.pipeline.steps import get_resolve_runner
from aodncore.util import ensure_regex_list


class PrefectHandlerBase(HandlerBase):

    def _set_input_file_attributes(self):
        """Override HandlerBase"""

        try:
            self._file_checksum = self.etag

Review comment (Contributor): I can't see where this etag attribute is set?

        except (IOError, OSError) as e:
            self.logger.exception(e)
            raise InvalidInputFileError(e)
        self.logger.sysinfo("get_file_checksum -> '{self.file_checksum}'".format(self=self))

        self._file_basename = os.path.basename(self.input_file)
        self.logger.sysinfo("file_basename -> '{self._file_basename}'".format(self=self))
        _, self._file_extension = os.path.splitext(self.input_file)
        self.logger.sysinfo("file_extension -> '{self._file_extension}'".format(self=self))
        self._file_type = FileType.get_type_from_extension(self.file_extension)
        self.logger.sysinfo("file_type -> {self._file_type}".format(self=self))

    def init_logger(self, logger_function):
        self._init_logger(logger_function)

    def _init_logger(self, logger_function):
        logger = get_pipeline_logger(None, logger_function=logger_function)

        # turn down logging for noisy libraries to WARN, unless overridden in pipeline config 'liblevel' key
        liblevel = getattr(self.config, 'pipeline_config', {}).get('logging', {}).get('liblevel', 'WARN')
        for lib in ('botocore', 'paramiko', 's3transfer', 'transitions'):
            logging.getLogger(lib).setLevel(liblevel)

        self._logger = logger
        self._celery_task_id = None
        self._celery_task_name = 'NO_TASK'

    def _resolve(self):
        resolve_runner = get_resolve_runner(self.input_file, self.collection_dir, self.config, self.logger,
                                            self.resolve_params)
        self.logger.sysinfo("get_resolve_runner -> {resolve_runner}".format(resolve_runner=resolve_runner))
        resolved_files = resolve_runner.run(move=True)

        resolved_files.set_file_update_callback(self._file_update_callback)

        # if include_regexes is not defined, default to including all files when setting publish types
        include_regexes = self.include_regexes if self.include_regexes else ensure_regex_list([r'.*'])
        resolved_files.set_publish_types_from_regexes(include_regexes, self.exclude_regexes,
                                                      self.default_addition_publish_type,
                                                      self.default_deletion_publish_type)

        self.file_collection.update(resolved_files)
21 changes: 20 additions & 1 deletion aodncore/pipeline/schema.py
@@ -147,6 +147,7 @@
                 'type': 'array',
                 'items': {'type': 'string'}
             },
+            'landing_bucket': {'type': 'string'},
             'archive_uri': {'type': 'string'},
             'error_uri': {'type': 'string'},
             'opendap_root': {'type': 'string'},
@@ -231,15 +232,33 @@
             },
             'required': ['incoming_dir', 'logger_name', 'task_namespace'],
             'additionalProperties': False
         },
+        "trigger": {
+            'type': 'object',
+            'properties': {
+                "trigger_type": {'$ref': '#/definitions/triggers'},
+                "sns_topic_arn": {'type': 'string'},
+                "sns_message": {'type': 'string'},
+                "prefect_api_key": {'type': 'string'},
+                "prefect_queue_id": {'type': 'string'},
+                "prefect_deployment_id": {'type': 'string'},
+                "prefect_account_id": {'type': 'string'},
+                "prefect_workspace_id": {'type': 'string'},
+                "end_flow_function": {'type': 'string'}
+            },
+            'additionalProperties': False
+        }
     },
     # TODO: add 'harvester' to the required list - once it has been added to the chef build
     'required': ['global', 'logging', 'mail', 'talend', 'templating', 'watch', 'harvester'],
     'additionalProperties': False,
     'definitions': {
         'loggingLevel': {
             'type': 'string',
             'enum': ['CRITICAL', 'FATAL', 'ERROR', 'WARNING', 'WARN', 'INFO', 'SYSINFO', 'DEBUG', 'NOTSET']
         },
+        'triggers': {
+            'type': 'string',
+            'enum': ['EVENT']
+        }
     }
 }
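
For illustration, a pipeline config fragment that would validate against the new `trigger` section might look like the following sketch. Every value is a placeholder rather than anything taken from this PR, and `trigger_type` is limited to `'EVENT'` by the `triggers` enum:

```python
# hypothetical "trigger" section of a pipeline config; all values are placeholders
trigger_section = {
    "trigger": {
        "trigger_type": "EVENT",
        "sns_topic_arn": "arn:aws:sns:ap-southeast-2:123456789012:example-topic",
        "sns_message": "example-message",
        "prefect_account_id": "11111111-1111-1111-1111-111111111111",
        "prefect_workspace_id": "22222222-2222-2222-2222-222222222222",
        "prefect_deployment_id": "33333333-3333-3333-3333-333333333333",
        "prefect_queue_id": "44444444-4444-4444-4444-444444444444",
        "prefect_api_key": "pnu_placeholder",
        "end_flow_function": "my_flows.finalise"  # placeholder dotted path
    }
}
```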
19 changes: 19 additions & 0 deletions aodncore/pipeline/steps/harvest.py
@@ -58,6 +58,8 @@ def get_harvester_runner(harvester_name, store_runner, harvest_params, tmp_base_
         return TalendHarvesterRunner(store_runner, harvest_params, tmp_base_dir, config, logger)
     elif harvester_name == 'csv':
         return CsvHarvesterRunner(store_runner, harvest_params, config, logger)
+    elif harvester_name is None:
+        return NoHarvestRunner(store_runner, config, logger)
     else:
         raise InvalidHarvesterError("invalid harvester '{name}'".format(name=harvester_name))

@@ -567,5 +569,22 @@ def build_runsheet(self, pf):
         return True


+class NoHarvestRunner(BaseHarvesterRunner):
+    """:py:class:`BaseHarvesterRunner` implementation to store files without harvest."""
+
+    def __init__(self, storage_broker, config, logger):
+        super().__init__(config, logger)
+        self.storage_broker = storage_broker
+        self.config = self._config
+        self.logger = self._logger
+        self.unexpected_pipeline_files = []
+
+    def run(self, pipeline_files):
+        files_to_upload = pipeline_files.filter_by_bool_attribute('pending_store_addition')
+        if files_to_upload:
+            self.storage_broker.upload(pipeline_files=files_to_upload)
+
+
 validate_triggerevent = validate_type(TriggerEvent)
 validate_harvestermap = validate_type(HarvesterMap)
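
A rough sketch of how the new runner is selected through the factory above; the surrounding objects (`store_runner`, `harvest_params`, `tmp_base_dir`, `config`, `logger`, `pipeline_files`) are assumed to exist in the calling context:

```python
# with harvester_name=None the factory now returns a NoHarvestRunner, which
# uploads files flagged 'pending_store_addition' without running a harvester
runner = get_harvester_runner(None, store_runner, harvest_params, tmp_base_dir, config, logger)
runner.run(pipeline_files)
```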
11 changes: 8 additions & 3 deletions aodncore/pipeline/steps/resolve.py
@@ -34,7 +34,8 @@
 from ..exceptions import InvalidFileFormatError
 from ..files import PipelineFile, PipelineFileCollection, RemotePipelineFile
 from ..schema import validate_json_manifest
-from ...util import extract_gzip, extract_zip, list_regular_files, is_gzip_file, is_zip_file, safe_copy_file
+from ...util import extract_gzip, extract_zip, list_regular_files, is_gzip_file, is_zip_file, safe_copy_file, \
+    safe_move_file

 __all__ = [
     'get_resolve_runner',
@@ -98,10 +99,14 @@ def run(self):


 class SingleFileResolveRunner(BaseResolveRunner):
-    def run(self):
+    def run(self, move=False):
         name = os.path.basename(self.input_file)
         temp_location = os.path.join(self.output_dir, name)
-        safe_copy_file(self.input_file, temp_location)
+        if move:
+            safe_move_file(self.input_file, temp_location)
+        else:
+            safe_copy_file(self.input_file, temp_location)


         self._collection.add(temp_location)
         return self._collection
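
For context, the overridden `PrefectHandlerBase._resolve` shown earlier exercises this roughly as follows (a sketch with assumed surrounding variables; only `SingleFileResolveRunner` currently accepts `move=True`):

```python
# resolve a single landed file by moving it into the output directory rather
# than copying it, cleaning up the landing location as a side effect
runner = get_resolve_runner(input_file, output_dir, config, logger, resolve_params)
collection = runner.run(move=True)
```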
1 change: 1 addition & 0 deletions aodncore/util/__init__.py
@@ -13,6 +13,7 @@
 from .process import SystemProcess
 from .wfs import DEFAULT_WFS_VERSION, WfsBroker
 from .ff import get_field_type, get_tableschema_descriptor
+from .s3_util import *

 __all__ = [
     'CaptureStdIO',
21 changes: 15 additions & 6 deletions aodncore/util/fileops.py
@@ -18,6 +18,8 @@
 import magic
 import netCDF4

+from aodncore.util.s3_util import *

 __all__ = [
     'TemporaryDirectory',
     'extract_gzip',
@@ -330,7 +332,7 @@ def rm_rf(path):
         raise  # pragma: no cover


-def safe_copy_file(source, destination, overwrite=False):
+def safe_copy_file(source, destination, overwrite=False, move=False):
     """Copy a file atomically by copying first to a temporary file in the same directory (and therefore filesystem) as
     the intended destination, before performing a rename (which is atomic)

@@ -339,19 +341,23 @@ def safe_copy_file(source, destination, overwrite=False, move=False):
     :param overwrite: set to True to allow existing destination file to be overwritten
     :return: None
     """
-    if not os.path.exists(source):
+
+    if not os.path.exists(source) and not is_s3(source):
         raise OSError("source file '{source}' does not exist".format(source=source))
     if source == destination:
-        raise OSError("source file and destination file can't refer the to same file")
+        raise OSError("source file and destination file can't refer to the same file")
     if not overwrite and os.path.exists(destination):
         raise OSError("destination file '{destination}' already exists".format(destination=destination))

     temp_destination_name = None
     try:
         with tempfile.NamedTemporaryFile(mode='wb', dir=os.path.dirname(destination), delete=False) as temp_destination:
             temp_destination_name = temp_destination.name
-            with open(source, 'rb') as f:
-                shutil.copyfileobj(f, temp_destination)
+            if is_s3(source):
+                download_object(get_s3_bucket(source), get_s3_key(source), temp_destination_name)
+            else:
+                with open(source, 'rb') as f:
+                    shutil.copyfileobj(f, temp_destination)
         os.rename(temp_destination_name, destination)
     finally:
         try:
@@ -371,7 +377,10 @@ def safe_move_file(src, dst, overwrite=False):
     :return: None
     """
     safe_copy_file(src, dst, overwrite)
-    os.remove(src)
+    if is_s3(src):
+        delete_object(get_s3_bucket(src), get_s3_key(src))
+    else:
+        os.remove(src)


 def validate_dir_writable(path):
64 changes: 64 additions & 0 deletions aodncore/util/s3_util.py
@@ -0,0 +1,64 @@
from urllib.parse import urlparse
import boto3

Review comment (Contributor): Some functionality for handling objects on S3 is already implemented as an S3StorageBroker class that supports query, download, upload and delete operations. Could that be reused instead of duplicating the functionality here?


__all__ = [
    "move_object",
    "list_1000_objects",
    "download_object",
    "is_s3",
    "get_s3_bucket",
    "get_s3_key",
    "delete_object",
    "set_s3"
]

s3 = None


def set_s3(credentials=None):
    global s3
    if credentials:
        s3 = boto3.resource('s3', aws_session_token=credentials['SessionToken'])
    else:
        s3 = boto3.resource('s3')


def move_object(key, source_bucket, dest_bucket):
    # Move objects between buckets
    copy_source = {
        'Bucket': source_bucket,
        'Key': key
    }
    s3.meta.client.copy(copy_source, dest_bucket, key)
    delete_object(source_bucket, key)

    return True


def list_1000_objects(bucket, prefix):
    response = s3.meta.client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    objects = response.get('Contents')
    return objects


def download_object(bucket, key, destination):
    s3.Object(bucket, key).download_file(destination)


def delete_object(bucket, key):
    s3.Object(bucket, key).delete()


def is_s3(url):
    parsed_url = urlparse(url)
    return parsed_url.scheme == 's3'


def get_s3_bucket(url):
    parsed_url = urlparse(url)
    return parsed_url.netloc


def get_s3_key(url):
    parsed_url = urlparse(url)
    return parsed_url.path.strip('/')
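
Taken together with the `fileops.py` changes above, a minimal usage sketch. The bucket and key are hypothetical and default AWS credentials are assumed; note that the module-level `s3` resource is `None` until `set_s3()` is called, so it must be initialised first:

```python
from aodncore.util import get_s3_bucket, get_s3_key, is_s3, safe_move_file, set_s3

set_s3()  # initialise the module-level boto3 resource with default credentials

src = "s3://example-landing-bucket/incoming/data.nc"  # hypothetical object

assert is_s3(src)
assert get_s3_bucket(src) == "example-landing-bucket"
assert get_s3_key(src) == "incoming/data.nc"

# safe_move_file now recognises an s3:// source: the object is downloaded
# atomically via a temporary file, then deleted from the bucket
safe_move_file(src, "/tmp/data.nc")
```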
2 changes: 1 addition & 1 deletion aodncore_demo.ipynb
@@ -509,4 +509,4 @@
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
2 changes: 1 addition & 1 deletion setup.py
@@ -16,7 +16,7 @@
     'tableschema>=1.19.4',
     'transitions>=0.7.1',
     'psycopg2-binary==2.8.6',
-    'PyYAML==5.3.1'
+    'PyYAML>=5.4.1'
 ]

 TESTS_REQUIRE = [
2 changes: 1 addition & 1 deletion test_aodncore/util/test_fileops.py
@@ -251,7 +251,7 @@ def test_safe_copy_file(self):
         with open(temp_source_file_path, 'w') as f:
             f.write(u'foobar')

-        with self.assertRaisesRegex(OSError, r"source file and destination file can't refer the to same file"):
+        with self.assertRaisesRegex(OSError, r"source file and destination file can't refer to the same file"):
             safe_copy_file(temp_source_file_path, temp_source_file_path)

         safe_copy_file(temp_source_file_path, temp_dest_file_path)